Add background checkpoint thread.
This commit is contained in:
parent
068ddec7b2
commit
75afb8502a
1 changed files with 57 additions and 0 deletions
|
@ -43,6 +43,7 @@ static void* zalloc(unsigned int size);
|
||||||
|
|
||||||
static void* deadlock_check(void* arg);
|
static void* deadlock_check(void* arg);
|
||||||
static void* trickle_write(void* arg);
|
static void* trickle_write(void* arg);
|
||||||
|
static void* txn_checkpoint(void* arg);
|
||||||
|
|
||||||
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc);
|
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc);
|
||||||
|
|
||||||
|
@ -97,6 +98,16 @@ static unsigned int G_TRICKLE_INTERVAL = 60 * 15; /* Seconds between trickle w
|
||||||
static unsigned int G_TRICKLE_PERCENTAGE = 10; /* Desired % of clean pages in cache */
|
static unsigned int G_TRICKLE_PERCENTAGE = 10; /* Desired % of clean pages in cache */
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transaction checkpoint monitor. We run a single thread per VM to flush transaction
|
||||||
|
* logs into the backing data store. G_CHECKPOINT_INTERVAL is the time between runs in seconds.
|
||||||
|
* TODO The interval should be configurable.
|
||||||
|
*/
|
||||||
|
static ErlDrvTid G_CHECKPOINT_THREAD;
|
||||||
|
static unsigned int G_CHECKPOINT_ACTIVE = 1;
|
||||||
|
static unsigned int G_CHECKPOINT_INTERVAL = 60 * 60; /* Seconds between checkpoints */
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -197,6 +208,22 @@ DRIVER_INIT(bdberl_drv)
|
||||||
erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD,
|
erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD,
|
||||||
&trickle_write, 0, 0);
|
&trickle_write, 0, 0);
|
||||||
|
|
||||||
|
// Use the BDBERL_CHECKPOINT_TIME environment value to determine the
|
||||||
|
// interval between transaction checkpoints. Defaults to 1 hour.
|
||||||
|
char* cp_int_str = getenv("BDBERL_CHECKPOINT_TIME"); /* TODO: Use erl_drv_getenv */
|
||||||
|
if (cp_int_str != 0)
|
||||||
|
{
|
||||||
|
G_CHECKPOINT_INTERVAL = atoi(cp_int_str);
|
||||||
|
if (G_CHECKPOINT_INTERVAL <= 0)
|
||||||
|
{
|
||||||
|
G_CHECKPOINT_INTERVAL = 60 * 60;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Startup checkpoint thread
|
||||||
|
erl_drv_thread_create("bdberl_drv_txn_checkpoint", &G_CHECKPOINT_THREAD,
|
||||||
|
&txn_checkpoint, 0, 0);
|
||||||
|
|
||||||
// Startup our thread pools
|
// Startup our thread pools
|
||||||
// TODO: Make configurable/adjustable
|
// TODO: Make configurable/adjustable
|
||||||
G_TPOOL_GENERAL = bdberl_tpool_start(10);
|
G_TPOOL_GENERAL = bdberl_tpool_start(10);
|
||||||
|
@ -311,6 +338,10 @@ static void bdberl_drv_finish()
|
||||||
G_DEADLOCK_CHECK_ACTIVE = 0;
|
G_DEADLOCK_CHECK_ACTIVE = 0;
|
||||||
erl_drv_thread_join(G_DEADLOCK_THREAD, 0);
|
erl_drv_thread_join(G_DEADLOCK_THREAD, 0);
|
||||||
|
|
||||||
|
// Signal the checkpointer to shutdown -- then wait for it
|
||||||
|
G_CHECKPOINT_ACTIVE = 0;
|
||||||
|
erl_drv_thread_join(G_CHECKPOINT_THREAD, 0);
|
||||||
|
|
||||||
// Cleanup and shut down the BDB environment. Note that we assume
|
// Cleanup and shut down the BDB environment. Note that we assume
|
||||||
// all ports have been released and thuse all databases/txns/etc are also gone.
|
// all ports have been released and thuse all databases/txns/etc are also gone.
|
||||||
G_DB_ENV->close(G_DB_ENV, 0);
|
G_DB_ENV->close(G_DB_ENV, 0);
|
||||||
|
@ -1309,6 +1340,32 @@ static void* trickle_write(void* arg)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thread function that flushes transaction logs to the backing store
|
||||||
|
*/
|
||||||
|
static void* txn_checkpoint(void* arg)
|
||||||
|
{
|
||||||
|
DBG("Checkpoint interval: %d seconds\n", G_CHECKPOINT_INTERVAL);
|
||||||
|
while (G_CHECKPOINT_ACTIVE)
|
||||||
|
{
|
||||||
|
int ret = 0;
|
||||||
|
if ((ret = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0)) != 0)
|
||||||
|
{
|
||||||
|
G_DB_ENV->err(G_DB_ENV, ret, "checkpoint thread");
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef DEBUG
|
||||||
|
time_t tm = time(NULL);
|
||||||
|
printf("Transaction checkpoint complete at %s\n", ctime(&tm));
|
||||||
|
#endif
|
||||||
|
|
||||||
|
sleep(G_CHECKPOINT_INTERVAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
DBG("Checkpointer exiting.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
||||||
{
|
{
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
|
|
Loading…
Reference in a new issue