From 75afb8502a645b45f10df94cfa41cd9066c315a0 Mon Sep 17 00:00:00 2001 From: Phillip Toland Date: Thu, 29 Jan 2009 11:27:37 -0600 Subject: [PATCH] Add background checkpoint thread. --- c_src/bdberl_drv.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 961239d..770ac8f 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -43,6 +43,7 @@ static void* zalloc(unsigned int size); static void* deadlock_check(void* arg); static void* trickle_write(void* arg); +static void* txn_checkpoint(void* arg); static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc); @@ -97,6 +98,16 @@ static unsigned int G_TRICKLE_INTERVAL = 60 * 15; /* Seconds between trickle w static unsigned int G_TRICKLE_PERCENTAGE = 10; /* Desired % of clean pages in cache */ +/** + * Transaction checkpoint monitor. We run a single thread per VM to flush transaction + * logs into the backing data store. G_CHECKPOINT_INTERVAL is the time between runs in seconds. + * TODO The interval should be configurable. + */ +static ErlDrvTid G_CHECKPOINT_THREAD; +static unsigned int G_CHECKPOINT_ACTIVE = 1; +static unsigned int G_CHECKPOINT_INTERVAL = 60 * 60; /* Seconds between checkpoints */ + + /** * */ @@ -197,6 +208,22 @@ DRIVER_INIT(bdberl_drv) erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD, &trickle_write, 0, 0); + // Use the BDBERL_CHECKPOINT_TIME environment value to determine the + // interval between transaction checkpoints. Defaults to 1 hour. + char* cp_int_str = getenv("BDBERL_CHECKPOINT_TIME"); /* TODO: Use erl_drv_getenv */ + if (cp_int_str != 0) + { + G_CHECKPOINT_INTERVAL = atoi(cp_int_str); + if (G_CHECKPOINT_INTERVAL <= 0) + { + G_CHECKPOINT_INTERVAL = 60 * 60; + } + } + + // Startup checkpoint thread + erl_drv_thread_create("bdberl_drv_txn_checkpoint", &G_CHECKPOINT_THREAD, + &txn_checkpoint, 0, 0); + // Startup our thread pools // TODO: Make configurable/adjustable G_TPOOL_GENERAL = bdberl_tpool_start(10); @@ -311,6 +338,10 @@ static void bdberl_drv_finish() G_DEADLOCK_CHECK_ACTIVE = 0; erl_drv_thread_join(G_DEADLOCK_THREAD, 0); + // Signal the checkpointer to shutdown -- then wait for it + G_CHECKPOINT_ACTIVE = 0; + erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); + // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. G_DB_ENV->close(G_DB_ENV, 0); @@ -1309,6 +1340,32 @@ static void* trickle_write(void* arg) return 0; } +/** + * Thread function that flushes transaction logs to the backing store + */ +static void* txn_checkpoint(void* arg) +{ + DBG("Checkpoint interval: %d seconds\n", G_CHECKPOINT_INTERVAL); + while (G_CHECKPOINT_ACTIVE) + { + int ret = 0; + if ((ret = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0)) != 0) + { + G_DB_ENV->err(G_DB_ENV, ret, "checkpoint thread"); + } + +#ifdef DEBUG + time_t tm = time(NULL); + printf("Transaction checkpoint complete at %s\n", ctime(&tm)); +#endif + + sleep(G_CHECKPOINT_INTERVAL); + } + + DBG("Checkpointer exiting.\n"); + return 0; +} + static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc) { if (rc == 0)