Rework trickle/checkpoint to use same thread and avoid tromping on each other; add SNMP reporting to the logger

This commit is contained in:
Dave Smith 2009-04-24 09:33:01 -06:00
parent 7bad64484d
commit 41f7213235
2 changed files with 141 additions and 70 deletions

View file

@ -47,11 +47,11 @@ static int alloc_dbref();
static void* zalloc(unsigned int size);
static void* deadlock_check(void* arg);
static void* trickle_write(void* arg);
static void* txn_checkpoint(void* arg);
static void* checkpointer(void* arg);
static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg);
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg);
static void send_log_message(ErlDrvTermData* msg, int elements);
/**
* Global instance of DB_ENV; only a single one exists per O/S process.
@ -98,7 +98,6 @@ static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between che
* Trickle writer for dirty pages. We run a single thread per VM to perform background
* trickling of dirty pages to disk. G_TRICKLE_INTERVAL is the time between runs in seconds.
*/
static ErlDrvTid G_TRICKLE_THREAD = 0;
static unsigned int G_TRICKLE_ACTIVE = 1;
static unsigned int G_TRICKLE_INTERVAL = 60 * 5; /* Seconds between trickle writes */
static unsigned int G_TRICKLE_PERCENTAGE = 50; /* Desired % of clean pages in cache */
@ -265,10 +264,6 @@ DRIVER_INIT(bdberl_drv)
erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD,
&deadlock_check, 0, 0);
// Startup trickle write thread
erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD,
&trickle_write, 0, 0);
// Use the BDBERL_CHECKPOINT_TIME environment value to determine the
// interval between transaction checkpoints. Defaults to 1 hour.
char* cp_int_str = getenv("BDBERL_CHECKPOINT_TIME"); /* TODO: Use erl_drv_getenv */
@ -282,8 +277,8 @@ DRIVER_INIT(bdberl_drv)
}
// Startup checkpoint thread
erl_drv_thread_create("bdberl_drv_txn_checkpoint", &G_CHECKPOINT_THREAD,
&txn_checkpoint, 0, 0);
erl_drv_thread_create("bdberl_drv_checkpointer", &G_CHECKPOINT_THREAD,
&checkpointer, 0, 0);
// Startup our thread pools
// TODO: Make configurable/adjustable
@ -415,9 +410,6 @@ static void bdberl_drv_finish()
close(G_BDBERL_PIPE[1]);
G_BDBERL_PIPE[1] = -1;
// Wait for the trickle write thread to shutdown
erl_drv_thread_join(G_TRICKLE_THREAD, 0);
// Wait for the deadlock checker to shutdown -- then wait for it
erl_drv_thread_join(G_DEADLOCK_THREAD, 0);
@ -1536,82 +1528,90 @@ static void* deadlock_check(void* arg)
}
/**
* Thread function that trickle writes dirty pages to disk
* Thread function that does trickle writes or checkpointing at fixed intervals.
*/
static void* trickle_write(void* arg)
static void* checkpointer(void* arg)
{
while(G_TRICKLE_ACTIVE)
{
// Enough time has passed -- time to run the trickle operation again
int pages_wrote = 0;
int rc = G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
if (0 != rc)
{
DBG("memp_trickle returned %s(%d)\n", db_strerror(rc), rc);
}
DBG("Wrote %d pages to achieve %d trickle\r\n", pages_wrote, G_TRICKLE_PERCENTAGE);
time_t last_checkpoint_time = time(0);
time_t last_trickle_time = time(0);
util_thread_usleep(G_TRICKLE_INTERVAL * 1000000);
}
DBG("Trickle writer exiting.\r\n");
return 0;
}
/**
* Thread function that flushes transaction logs to the backing store
*/
static void* txn_checkpoint(void* arg)
{
DBG("Checkpoint interval: %d seconds\r\n", G_CHECKPOINT_INTERVAL);
printf("Starting checkpointer\n");
while (G_CHECKPOINT_ACTIVE)
{
int rc = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0);
if (0 != rc)
{
DBG("txn_checkpoint returned %s(%d)\n", db_strerror(rc), rc);
}
rc = G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE);
if (0 != rc)
{
DBG("log_archive returned %s(%d)\n", db_strerror(rc), rc);
}
time_t now = time(0);
if (now - last_checkpoint_time > G_CHECKPOINT_INTERVAL)
{
// Time to checkpoint and cleanup log files
int checkpoint_rc = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0);
#ifdef DEBUG
time_t tm = time(NULL);
printf("Transaction checkpoint complete at %s\r\n", ctime(&tm));
#endif
util_thread_usleep(G_CHECKPOINT_INTERVAL * 1000000);
// Mark the time before starting log_archive so we can know how long it took
time_t log_now = time(0);
int log_rc = G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE);
time_t finish_now = time(0);
// Bundle up the results and elapsed time into a message for the logger
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_checkpoint_stats"),
ERL_DRV_UINT, log_now - now, /* Elapsed seconds for checkpoint */
ERL_DRV_UINT, finish_now - log_now, /* Elapsed seconds for log_archive */
ERL_DRV_INT, checkpoint_rc, /* Return code of checkpoint */
ERL_DRV_INT, log_rc, /* Return code of log_archive */
ERL_DRV_TUPLE, 5};
send_log_message(response, sizeof(response));
// Note the time of this checkpoint completion
last_checkpoint_time = finish_now;
}
else if (now - last_trickle_time > G_TRICKLE_INTERVAL)
{
// Time to run the trickle operation again
int pages_wrote = 0;
int rc = G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
time_t finish_now = time(0);
// Bundle up the results and elapsed time into a message for the logger
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_trickle_stats"),
ERL_DRV_UINT, finish_now - now, /* Elapsed seconds for trickle */
ERL_DRV_UINT, pages_wrote, /* Number of pages flushed */
ERL_DRV_INT, rc, /* Return code of memp_trickle */
ERL_DRV_TUPLE, 4};
send_log_message(response, sizeof(response));
// Note the time of this trickle completion
last_trickle_time = finish_now;
}
// Always sleep for one second
util_thread_usleep(1000000);
}
DBG("Checkpointer exiting.\r\n");
return 0;
}
/**
 * Wraps msg in a {bdb_error_log, Msg} term and hands it to send_log_message().
 *
 * The term is delivered through send_log_message() only. That helper performs
 * the G_LOG_PORT check and the G_LOG_RWLOCK handling, so this function neither
 * takes the lock nor calls driver_send_term() itself; doing both here as well
 * caused every error message to be sent twice.
 *
 * dbenv and errpfx are not used.
 */
static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg)
{
    ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_error_log"),
                                  ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
                                  ERL_DRV_TUPLE, 2};

    /* send_log_message() takes the array size in bytes, not the element count */
    send_log_message(response, sizeof(response));
}
/**
 * Wraps msg in a {bdb_info_log, Msg} term and hands it to send_log_message().
 *
 * No lock is taken here: send_log_message() acquires and releases
 * G_LOG_RWLOCK around the send. A READ_LOCK in this function had no matching
 * READ_UNLOCK and would leave the lock held on return.
 *
 * dbenv is not used.
 */
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
{
    ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_info_log"),
                                  ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
                                  ERL_DRV_TUPLE, 2};

    /* send_log_message() takes the array size in bytes, not the element count */
    send_log_message(response, sizeof(response));
}
/**
 * Sends a pre-built driver term to G_LOG_PID through G_LOG_PORT, if a log
 * port is set.
 *
 * msg      - array of ErlDrvTermData describing the term to send
 * elements - size of that array in BYTES (callers pass sizeof(array)); it is
 *            divided by sizeof(msg[0]) to obtain the element count that
 *            driver_send_term() expects
 *
 * G_LOG_PORT is tested while the read lock is held, so the port that was
 * checked is the port that is used for the send, and every READ_LOCK is
 * paired with exactly one READ_UNLOCK. The term is sent exactly once.
 */
static void send_log_message(ErlDrvTermData* msg, int elements)
{
    READ_LOCK(G_LOG_RWLOCK);
    if (G_LOG_PORT)
    {
        driver_send_term(G_LOG_PORT, G_LOG_PID, msg, elements / sizeof(msg[0]));
    }
    READ_UNLOCK(G_LOG_RWLOCK);
}

View file

@ -18,6 +18,12 @@
-record(state, {}).
%% Macro for setting a counter: writes Value to the volatile SNMP variable Key
%% via snmp_generic:variable_set/2.
-define(SNMP_SET(Key, Value), (snmp_generic:variable_set({Key, volatile}, Value))).
%% Macro for incrementing a counter: adds 1 to the volatile SNMP variable Key
%% via snmp_generic:variable_inc/2.
-define(SNMP_INC(Key), (snmp_generic:variable_inc({Key, volatile}, 1))).
%% ====================================================================
%% API
%% ====================================================================
@ -32,7 +38,16 @@ start_link() ->
%% init/1 callback: registers this process with bdberl as the logger, loads
%% the BDBERL-MIB when the SNMP agent is running, and returns an empty #state{}.
init([]) ->
    %% Start up the logger -- automatically initializes a port for this
    %% PID.
    %% NOTE(review): register_logger/0 is called twice below (once unchecked,
    %% once asserting ok) -- confirm whether the first call should remain.
    bdberl:register_logger(),
    ok = bdberl:register_logger(),
    %% If SNMP is available, load our MIBs
    case is_snmp_running() of
        true ->
            load_mibs(['BDBERL-MIB']);
        false ->
            %% error_logger exports warning_msg/1; there is no warn_msg/1, so
            %% the previous call raised undef whenever SNMP was not running.
            error_logger:warning_msg("SNMP is not running; bdberl Stats will not be published.\n")
    end,
    {ok, #state{}}.
handle_call(_Request, _From, State) ->
@ -47,6 +62,39 @@ handle_info({bdb_error_log, Msg}, State) ->
%% Informational log line: forwarded to error_logger as an info message.
handle_info({bdb_info_log, Msg}, State) ->
error_logger:info_msg("BDB Info: ~s\n", [Msg]),
{noreply, State};
%% Checkpoint report where both return codes are 0. The tuple layout is
%% {bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, CheckpointRc, ArchiveRc}:
%% elapsed seconds for the checkpoint, elapsed seconds for log_archive, and the
%% return code of each. Counters are only updated when SNMP is running.
handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) ->
case is_snmp_running() of
true ->
?SNMP_INC(bdbCheckpoints),
?SNMP_SET(bdbCheckpointRuntimeSecs, CheckpointSecs),
?SNMP_SET(bdbArchiveRuntimeSecs, ArchiveSecs);
false ->
ok
end,
{noreply, State};
%% Checkpoint report where at least one return code is non-zero (the clause
%% above already matched the 0/0 case): log both codes; no SNMP update.
handle_info({bdb_checkpoint_stats, _CheckpointSecs, _ArchiveSecs, CheckpointRc, ArchiveRc}, State) ->
error_logger:error_msg("BDB Checkpoint error: ~w ~w\n", [CheckpointRc, ArchiveRc]),
{noreply, State};
%% Trickle-write report with return code 0. The tuple layout is
%% {bdb_trickle_stats, ElapsedSecs, Pages, Rc}: elapsed seconds, number of
%% pages flushed, and the return code. Counters are only updated when SNMP is
%% running.
handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) ->
case is_snmp_running() of
true ->
?SNMP_INC(bdbTrickleWrites),
?SNMP_SET(bdbTrickleRuntimeSecs, ElapsedSecs),
?SNMP_SET(bdbTricklePages, Pages);
false ->
ok
end,
{noreply, State};
%% Trickle-write report with a non-zero return code: log it; no SNMP update.
handle_info({bdb_trickle_stats, _ElapsedSecs, _Pages, Rc}, State) ->
error_logger:error_msg("BDB Trickle Write error: ~w\n", [Rc]),
{noreply, State};
%% Catch-all: print any message not matched by an earlier clause and carry on.
handle_info(Msg, State) ->
io:format("Unexpected message: ~p\n", [Msg]),
{noreply, State}.
terminate(_Reason, _State) ->
@ -55,3 +103,26 @@ terminate(_Reason, _State) ->
%% Code-upgrade callback: the state is returned unchanged.
code_change(_FromVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
%% ====================================================================
%% Internal functions
%% ====================================================================
%% True when a process is registered under the name snmpa_local_db,
%% false otherwise.
is_snmp_running() ->
    case whereis(snmpa_local_db) of
        undefined ->
            false;
        _Registered ->
            true
    end.
%%
%% Walk a list of MIB atoms in order. For each one, build the path
%% <priv_dir(bdberl)>/mibs/<Mib>.bin and load it with snmpa:load_mibs/1 unless
%% snmpa:whereis_mib/1 reports that the MIB is already known. Returns ok.
%%
load_mibs(Mibs) ->
    lists:foreach(
      fun(Name) ->
              BinPath = filename:join([code:priv_dir(bdberl), "mibs",
                                       lists:concat([Name, ".bin"])]),
              case snmpa:whereis_mib(Name) of
                  {error, not_found} ->
                      ok = snmpa:load_mibs([BinPath]);
                  {ok, _LoadedFrom} ->
                      ok
              end
      end,
      Mibs).