diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index d01d0ee..8498dca 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -47,11 +47,11 @@ static int alloc_dbref(); static void* zalloc(unsigned int size); static void* deadlock_check(void* arg); -static void* trickle_write(void* arg); -static void* txn_checkpoint(void* arg); +static void* checkpointer(void* arg); static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg); static void bdb_msgcall(const DB_ENV* dbenv, const char* msg); +static void send_log_message(ErlDrvTermData* msg, int elements); /** * Global instance of DB_ENV; only a single one exists per O/S process. @@ -98,7 +98,6 @@ static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between che * Trickle writer for dirty pages. We run a single thread per VM to perform background * trickling of dirty pages to disk. G_TRICKLE_INTERVAL is the time between runs in seconds. */ -static ErlDrvTid G_TRICKLE_THREAD = 0; static unsigned int G_TRICKLE_ACTIVE = 1; static unsigned int G_TRICKLE_INTERVAL = 60 * 5; /* Seconds between trickle writes */ static unsigned int G_TRICKLE_PERCENTAGE = 50; /* Desired % of clean pages in cache */ @@ -265,10 +264,6 @@ DRIVER_INIT(bdberl_drv) erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD, &deadlock_check, 0, 0); - // Startup trickle write thread - erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD, - &trickle_write, 0, 0); - // Use the BDBERL_CHECKPOINT_TIME environment value to determine the // interval between transaction checkpoints. Defaults to 1 hour. char* cp_int_str = getenv("BDBERL_CHECKPOINT_TIME"); /* TODO: Use erl_drv_getenv */ @@ -282,8 +277,8 @@ DRIVER_INIT(bdberl_drv) } // Startup checkpoint thread - erl_drv_thread_create("bdberl_drv_txn_checkpoint", &G_CHECKPOINT_THREAD, - &txn_checkpoint, 0, 0); + erl_drv_thread_create("bdberl_drv_checkpointer", &G_CHECKPOINT_THREAD, + &checkpointer, 0, 0); // Startup our thread pools // TODO: Make configurable/adjustable @@ -415,9 +410,6 @@ static void bdberl_drv_finish() close(G_BDBERL_PIPE[1]); G_BDBERL_PIPE[1] = -1; - // Wait for the trickle write thread to shutdown - erl_drv_thread_join(G_TRICKLE_THREAD, 0); - // Wait for the deadlock checker to shutdown -- then wait for it erl_drv_thread_join(G_DEADLOCK_THREAD, 0); @@ -1536,82 +1528,90 @@ static void* deadlock_check(void* arg) } /** - * Thread function that trickle writes dirty pages to disk + * Thread function that does trickle writes or checkpointing at fixed intervals. */ -static void* trickle_write(void* arg) +static void* checkpointer(void* arg) { - while(G_TRICKLE_ACTIVE) - { - // Enough time has passed -- time to run the trickle operation again - int pages_wrote = 0; - int rc = G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); - if (0 != rc) - { - DBG("memp_trickle returned %s(%d)\n", db_strerror(rc), rc); - } - DBG("Wrote %d pages to achieve %d trickle\r\n", pages_wrote, G_TRICKLE_PERCENTAGE); + time_t last_checkpoint_time = time(0); + time_t last_trickle_time = time(0); - util_thread_usleep(G_TRICKLE_INTERVAL * 1000000); - } - - DBG("Trickle writer exiting.\r\n"); - return 0; -} - -/** - * Thread function that flushes transaction logs to the backing store - */ -static void* txn_checkpoint(void* arg) -{ - DBG("Checkpoint interval: %d seconds\r\n", G_CHECKPOINT_INTERVAL); + printf("Starting checkpointer\n"); while (G_CHECKPOINT_ACTIVE) { - int rc = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0); - if (0 != rc) - { - DBG("txn_checkpoint returned %s(%d)\n", db_strerror(rc), rc); - } - - rc = G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE); - if (0 != rc) - { - DBG("log_archive returned %s(%d)\n", db_strerror(rc), rc); - } + time_t now = time(0); + if (now - last_checkpoint_time > G_CHECKPOINT_INTERVAL) + { + // Time to checkpoint and cleanup log files + int checkpoint_rc = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0); -#ifdef DEBUG - time_t tm = time(NULL); - printf("Transaction checkpoint complete at %s\r\n", ctime(&tm)); -#endif - util_thread_usleep(G_CHECKPOINT_INTERVAL * 1000000); + // Mark the time before starting log_archive so we can know how long it took + time_t log_now = time(0); + int log_rc = G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE); + time_t finish_now = time(0); + + // Bundle up the results and elapsed time into a message for the logger + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_checkpoint_stats"), + ERL_DRV_UINT, log_now - now, /* Elapsed seconds for checkpoint */ + ERL_DRV_UINT, finish_now - log_now, /* Elapsed seconds for log_archive */ + ERL_DRV_INT, checkpoint_rc, /* Return code of checkpoint */ + ERL_DRV_INT, log_rc, /* Return code of log_archive */ + ERL_DRV_TUPLE, 5}; + send_log_message(response, sizeof(response)); + + // Note the time of this checkpoint completion + last_checkpoint_time = finish_now; + } + else if (now - last_trickle_time > G_TRICKLE_INTERVAL) + { + // Time to run the trickle operation again + int pages_wrote = 0; + int rc = G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); + time_t finish_now = time(0); + + // Bundle up the results and elapsed time into a message for the logger + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_trickle_stats"), + ERL_DRV_UINT, finish_now - now, /* Elapsed seconds for trickle */ + ERL_DRV_UINT, pages_wrote, /* Number of pages flushed */ + ERL_DRV_INT, rc, /* Return code of checkpoint */ + ERL_DRV_TUPLE, 4}; + send_log_message(response, sizeof(response)); + + // Note the time of this trickle completion + last_trickle_time = finish_now; + } + + // Always sleep for one second + util_thread_usleep(1000000); } - DBG("Checkpointer exiting.\r\n"); return 0; } static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg) { - READ_LOCK(G_LOG_RWLOCK); - if (G_LOG_PORT) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_error_log"), - ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg), - ERL_DRV_TUPLE, 2}; - driver_send_term(G_LOG_PORT, G_LOG_PID, response, sizeof(response) / sizeof(response[0])); - } - READ_UNLOCK(G_LOG_RWLOCK); + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_error_log"), + ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg), + ERL_DRV_TUPLE, 2}; + send_log_message(response, sizeof(response)); } static void bdb_msgcall(const DB_ENV* dbenv, const char* msg) { - READ_LOCK(G_LOG_RWLOCK); + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_info_log"), + ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg), + ERL_DRV_TUPLE, 2}; + send_log_message(response, sizeof(response)); +} + +static void send_log_message(ErlDrvTermData* msg, int elements) +{ if (G_LOG_PORT) { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_info_log"), - ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg), - ERL_DRV_TUPLE, 2}; - driver_send_term(G_LOG_PORT, G_LOG_PID, response, sizeof(response) / sizeof(response[0])); + READ_LOCK(G_LOG_RWLOCK); + driver_send_term(G_LOG_PORT, G_LOG_PID, msg, elements / sizeof(msg[0])); + READ_UNLOCK(G_LOG_RWLOCK); } - READ_UNLOCK(G_LOG_RWLOCK); } + + diff --git a/src/bdberl_logger.erl b/src/bdberl_logger.erl index aae46c2..a6a87aa 100644 --- a/src/bdberl_logger.erl +++ b/src/bdberl_logger.erl @@ -18,6 +18,12 @@ -record(state, {}). +%% Macro for setting a counter +-define(SNMP_SET(Key, Value), (snmp_generic:variable_set({Key, volatile}, Value))). + +%% Macro for incrementing a counter +-define(SNMP_INC(Key), (snmp_generic:variable_inc({Key, volatile}, 1))). + %% ==================================================================== %% API %% ==================================================================== @@ -32,7 +38,16 @@ start_link() -> init([]) -> %% Start up the logger -- automatically initializes a port for this %% PID. - bdberl:register_logger(), + ok = bdberl:register_logger(), + + %% If SNMP is available, load our MIBs + case is_snmp_running() of + true -> + load_mibs(['BDBERL-MIB']); + false -> + error_logger:warn_msg("SNMP is not running; bdberl Stats will not be published.\n") + end, + {ok, #state{}}. handle_call(_Request, _From, State) -> @@ -47,6 +62,39 @@ handle_info({bdb_error_log, Msg}, State) -> handle_info({bdb_info_log, Msg}, State) -> error_logger:info_msg("BDB Info: ~s\n", [Msg]), + {noreply, State}; + +handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) -> + case is_snmp_running() of + true -> + ?SNMP_INC(bdbCheckpoints), + ?SNMP_SET(bdbCheckpointRuntimeSecs, CheckpointSecs), + ?SNMP_SET(bdbArchiveRuntimeSecs, ArchiveSecs); + false -> + ok + end, + {noreply, State}; + +handle_info({bdb_checkpoint_stats, _CheckpointSecs, _ArchiveSecs, CheckpointRc, ArchiveRc}, State) -> + error_logger:error_msg("BDB Checkpoint error: ~w ~w\n", [CheckpointRc, ArchiveRc]), + {noreply, State}; + +handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) -> + case is_snmp_running() of + true -> + ?SNMP_INC(bdbTrickleWrites), + ?SNMP_SET(bdbTrickleRuntimeSecs, ElapsedSecs), + ?SNMP_SET(bdbTricklePages, Pages); + false -> + ok + end, + {noreply, State}; +handle_info({bdb_trickle_stats, _ElapsedSecs, _Pages, Rc}, State) -> + error_logger:error_msg("BDB Trickle Write error: ~w\n", [Rc]), + {noreply, State}; + +handle_info(Msg, State) -> + io:format("Unexpected message: ~p\n", [Msg]), {noreply, State}. terminate(_Reason, _State) -> @@ -55,3 +103,26 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + + +%% ==================================================================== +%% Internal functions +%% ==================================================================== + +is_snmp_running() -> + whereis(snmpa_local_db) /= undefined. + +%% +%% Take a list of MIB atoms and load them from priv/ directory, if they aren't already loaded +%% +load_mibs([]) -> + ok; +load_mibs([Mib | Rest]) -> + MibFile = filename:join([code:priv_dir(bdberl), "mibs", lists:concat([Mib, ".bin"])]), + case snmpa:whereis_mib(Mib) of + {ok, _} -> + load_mibs(Rest); + {error, not_found} -> + ok = snmpa:load_mibs([MibFile]), + load_mibs(Rest) + end.