From 09081f000bd9efa38620ceefd6f548272d18919b Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 11 Feb 2009 11:22:44 -0700 Subject: [PATCH] Changed the utility thread sleep function to use a custom sleep based on pipe/select so they can be terminated quickly when the driver shuts down. --- c_src/bdberl_drv.c | 127 +++++++++++++++++++++++++++++++++------------ 1 file changed, 95 insertions(+), 32 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 17e0d9b..a560534 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -11,6 +11,10 @@ #include #include #include +#include +#include +#include +#include #include "hive_hash.h" #include "bdberl_drv.h" @@ -19,7 +23,7 @@ /** * Function prototypes */ -static int open_database(const char* name, DBTYPE type, unsigned flags, PortData* data, int* errno); +static int open_database(const char* name, DBTYPE type, unsigned int flags, PortData* data, int* dbref_res); static int close_database(int dbref, unsigned flags, PortData* data); static int delete_database(const char* name); @@ -106,6 +110,12 @@ static ErlDrvTid G_CHECKPOINT_THREAD = 0; static unsigned int G_CHECKPOINT_ACTIVE = 1; static unsigned int G_CHECKPOINT_INTERVAL = 60 * 60; /* Seconds between checkpoints */ +/** + * Pipe to used to wake up the various monitors. Instead of just sleeping + * they wait for an exceptional condition on the read fd of the pipe. When it is time to + * shutdown, the driver closes the write fd and waits for the threads to be joined. + */ +static int G_BDBERL_PIPE[2] = {-1, -1}; /** * @@ -188,6 +198,9 @@ DRIVER_INIT(bdberl_drv) if (G_DB_ENV_ERROR == 0) { + // Pipe for signalling the utility threads all is over. + assert(0 == pipe(G_BDBERL_PIPE)); + // Use the BDBERL_MAX_DBS environment value to determine the max # of // databases to permit the VM to open at once. Defaults to 1024. G_DATABASES_SIZE = 1024; @@ -336,6 +349,10 @@ static void bdberl_drv_finish() bdberl_tpool_stop(G_TPOOL_GENERAL); bdberl_tpool_stop(G_TPOOL_TXNS); + // Close the writer fd on the pipe to signal finish to the utility threads + close(G_BDBERL_PIPE[1]); + G_BDBERL_PIPE[1] = -1; + // Signal the trickle write thread to shutdown G_TRICKLE_ACTIVE = 0; erl_drv_thread_join(G_TRICKLE_THREAD, 0); @@ -348,6 +365,10 @@ static void bdberl_drv_finish() G_CHECKPOINT_ACTIVE = 0; erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); + // Close the reader fd on the pipe now utility threads are closed + close(G_BDBERL_PIPE[0]); + G_BDBERL_PIPE[0] = -1; + // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. G_DB_ENV->close(G_DB_ENV, 0); @@ -1101,6 +1122,8 @@ static void do_async_cursor_get(void* arg) // the transaction if (rc && rc != DB_NOTFOUND) { + DBG("cursor flags=%d rc=%d\n", flags, rc); + d->cursor->close(d->cursor); d->cursor = 0; if (d->txn) @@ -1349,6 +1372,70 @@ static int alloc_dbref() } +/** + * Utility thread sleep - returns true if being signalled to exit + * otherwise false if timeout exceeded. + */ +int util_thread_usleep(unsigned int usecs) +{ + fd_set fds; + struct timeval sleep_until; + struct timeval sleep_for; + struct timeval now; + struct timeval tv; + int done; + int nfds = (G_BDBERL_PIPE[0] > G_BDBERL_PIPE[1] ? G_BDBERL_PIPE[0] : G_BDBERL_PIPE[1]) + 1; + + memset(&sleep_for, 0, sizeof(sleep_for)); + sleep_for.tv_sec = usecs / 1000000; + sleep_for.tv_usec = usecs % 1000000; + + gettimeofday(&now, NULL); + timeradd(&now, &sleep_for, &sleep_until); + + do + { + FD_ZERO(&fds); + FD_SET(G_BDBERL_PIPE[0], &fds); // read fd of pipe + + // Check if we have slept long enough + gettimeofday(&now, NULL); + if (timercmp(&now, &sleep_until, >)) + { + done = 1; + } + else // take a nap + { + // work out the remaining time to sleep on the fd for - make sure that this time + // is less than or equal to the original sleep time requested, just in + // case the system time is being adjusted. If the adjustment would result + // in a longer wait then cap it at the sleep_for time. + timersub(&sleep_until, &now, &tv); + if (timercmp(&tv, &sleep_for, >)) + { + memcpy(&tv, &sleep_for, sizeof(tv)); + } + + done = 1; + if (-1 == select(nfds, &fds, NULL, NULL, &tv)) + { + if (EINTR == errno) // a signal woke up select, back to sleep for us + { + done = 0; + } + // any other signals can return to the caller to fail safe as it + // doesn't matter if the util threads get woken up more often + } + else if (FD_ISSET(G_BDBERL_PIPE[0], &fds)) + { + done = 1; + } + } + } while (!done); + + return FD_ISSET(G_BDBERL_PIPE[0], &fds); +} + /** * Thread function that runs the deadlock checker periodically */ @@ -1364,8 +1451,7 @@ static void* deadlock_check(void* arg) DBG("Rejected deadlocks: %d\r\n", count); } - // TODO: Use nanosleep - usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); + util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); } DBG("Deadlock checker exiting.\r\n"); @@ -1377,25 +1463,14 @@ static void* deadlock_check(void* arg) */ static void* trickle_write(void* arg) { - int elapsed_secs = 0; while(G_TRICKLE_ACTIVE) { - if (elapsed_secs == G_TRICKLE_INTERVAL) - { - // Enough time has passed -- time to run the trickle operation again - int pages_wrote = 0; - G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); - DBG("Wrote %d pages to achieve %d trickle\r\n", pages_wrote, G_TRICKLE_PERCENTAGE); + // Enough time has passed -- time to run the trickle operation again + int pages_wrote = 0; + G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); + DBG("Wrote %d pages to achieve %d trickle\r\n", pages_wrote, G_TRICKLE_PERCENTAGE); - // Reset the counter - elapsed_secs = 0; - } - else - { - // TODO: Use nanosleep - usleep(1000 * 1000); /* Sleep for 1 second */ - elapsed_secs++; - } + util_thread_usleep(G_TRICKLE_INTERVAL * 1000000); } DBG("Trickle writer exiting.\r\n"); @@ -1409,11 +1484,8 @@ static void* txn_checkpoint(void* arg) { DBG("Checkpoint interval: %d seconds\r\n", G_CHECKPOINT_INTERVAL); - int elapsed_secs = 0; while (G_CHECKPOINT_ACTIVE) { - if (elapsed_secs == G_CHECKPOINT_INTERVAL) - { G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0); G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE); @@ -1421,16 +1493,7 @@ static void* txn_checkpoint(void* arg) time_t tm = time(NULL); printf("Transaction checkpoint complete at %s\r\n", ctime(&tm)); #endif - - // Reset the counter - elapsed_secs = 0; - } - else - { - // TODO: Use nanosleep - usleep(1000 * 1000); /* Sleep for 1 second */ - elapsed_secs++; - } + util_thread_usleep(G_CHECKPOINT_INTERVAL * 1000000); } DBG("Checkpointer exiting.\r\n");