Changed the utility threads to sleep using a custom pipe/select-based sleep function
so they can be terminated quickly when the driver shuts down.
This commit is contained in:
parent
5ccd73c2ea
commit
09081f000b
1 changed files with 95 additions and 32 deletions
|
@ -11,6 +11,10 @@
|
|||
#include <stdint.h>
|
||||
#include <time.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/select.h>
|
||||
|
||||
#include "hive_hash.h"
|
||||
#include "bdberl_drv.h"
|
||||
|
@ -19,7 +23,7 @@
|
|||
/**
|
||||
* Function prototypes
|
||||
*/
|
||||
static int open_database(const char* name, DBTYPE type, unsigned flags, PortData* data, int* errno);
|
||||
static int open_database(const char* name, DBTYPE type, unsigned int flags, PortData* data, int* dbref_res);
|
||||
static int close_database(int dbref, unsigned flags, PortData* data);
|
||||
static int delete_database(const char* name);
|
||||
|
||||
|
@ -106,6 +110,12 @@ static ErlDrvTid G_CHECKPOINT_THREAD = 0;
|
|||
static unsigned int G_CHECKPOINT_ACTIVE = 1;
|
||||
static unsigned int G_CHECKPOINT_INTERVAL = 60 * 60; /* Seconds between checkpoints */
|
||||
|
||||
/**
|
||||
* Pipe used to wake up the various monitors. Instead of just sleeping
|
||||
* they wait for the read fd of the pipe to become readable. When it is time to
|
||||
* shutdown, the driver closes the write fd and waits for the threads to be joined.
|
||||
*/
|
||||
static int G_BDBERL_PIPE[2] = {-1, -1};
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -188,6 +198,9 @@ DRIVER_INIT(bdberl_drv)
|
|||
|
||||
if (G_DB_ENV_ERROR == 0)
|
||||
{
|
||||
// Pipe for signalling the utility threads all is over.
|
||||
assert(0 == pipe(G_BDBERL_PIPE));
|
||||
|
||||
// Use the BDBERL_MAX_DBS environment value to determine the max # of
|
||||
// databases to permit the VM to open at once. Defaults to 1024.
|
||||
G_DATABASES_SIZE = 1024;
|
||||
|
@ -336,6 +349,10 @@ static void bdberl_drv_finish()
|
|||
bdberl_tpool_stop(G_TPOOL_GENERAL);
|
||||
bdberl_tpool_stop(G_TPOOL_TXNS);
|
||||
|
||||
// Close the writer fd on the pipe to signal finish to the utility threads
|
||||
close(G_BDBERL_PIPE[1]);
|
||||
G_BDBERL_PIPE[1] = -1;
|
||||
|
||||
// Signal the trickle write thread to shutdown
|
||||
G_TRICKLE_ACTIVE = 0;
|
||||
erl_drv_thread_join(G_TRICKLE_THREAD, 0);
|
||||
|
@ -348,6 +365,10 @@ static void bdberl_drv_finish()
|
|||
G_CHECKPOINT_ACTIVE = 0;
|
||||
erl_drv_thread_join(G_CHECKPOINT_THREAD, 0);
|
||||
|
||||
// Close the reader fd on the pipe now utility threads are closed
|
||||
close(G_BDBERL_PIPE[0]);
|
||||
G_BDBERL_PIPE[0] = -1;
|
||||
|
||||
// Cleanup and shut down the BDB environment. Note that we assume
|
||||
// all ports have been released and thus all databases/txns/etc are also gone.
|
||||
G_DB_ENV->close(G_DB_ENV, 0);
|
||||
|
@ -1101,6 +1122,8 @@ static void do_async_cursor_get(void* arg)
|
|||
// the transaction
|
||||
if (rc && rc != DB_NOTFOUND)
|
||||
{
|
||||
DBG("cursor flags=%d rc=%d\n", flags, rc);
|
||||
|
||||
d->cursor->close(d->cursor);
|
||||
d->cursor = 0;
|
||||
if (d->txn)
|
||||
|
@ -1349,6 +1372,70 @@ static int alloc_dbref()
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Utility thread sleep - returns true if being signalled to exit
|
||||
* otherwise false if timeout exceeded.
|
||||
*/
|
||||
int util_thread_usleep(unsigned int usecs)
|
||||
{
|
||||
fd_set fds;
|
||||
struct timeval sleep_until;
|
||||
struct timeval sleep_for;
|
||||
struct timeval now;
|
||||
struct timeval tv;
|
||||
int done;
|
||||
int nfds = (G_BDBERL_PIPE[0] > G_BDBERL_PIPE[1] ? G_BDBERL_PIPE[0] : G_BDBERL_PIPE[1]) + 1;
|
||||
|
||||
memset(&sleep_for, 0, sizeof(sleep_for));
|
||||
sleep_for.tv_sec = usecs / 1000000;
|
||||
sleep_for.tv_usec = usecs % 1000000;
|
||||
|
||||
gettimeofday(&now, NULL);
|
||||
timeradd(&now, &sleep_for, &sleep_until);
|
||||
|
||||
do
|
||||
{
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(G_BDBERL_PIPE[0], &fds); // read fd of pipe
|
||||
|
||||
// Check if we have slept long enough
|
||||
gettimeofday(&now, NULL);
|
||||
if (timercmp(&now, &sleep_until, >))
|
||||
{
|
||||
done = 1;
|
||||
}
|
||||
else // take a nap
|
||||
{
|
||||
// work out the remaining time to sleep on the fd for - make sure that this time
|
||||
// is less than or equal to the original sleep time requested, just in
|
||||
// case the system time is being adjusted. If the adjustment would result
|
||||
// in a longer wait then cap it at the sleep_for time.
|
||||
timersub(&sleep_until, &now, &tv);
|
||||
if (timercmp(&tv, &sleep_for, >))
|
||||
{
|
||||
memcpy(&tv, &sleep_for, sizeof(tv));
|
||||
}
|
||||
|
||||
done = 1;
|
||||
if (-1 == select(nfds, &fds, NULL, NULL, &tv))
|
||||
{
|
||||
if (EINTR == errno) // a signal woke up select, back to sleep for us
|
||||
{
|
||||
done = 0;
|
||||
}
|
||||
// any other signals can return to the caller to fail safe as it
|
||||
// doesn't matter if the util threads get woken up more often
|
||||
}
|
||||
else if (FD_ISSET(G_BDBERL_PIPE[0], &fds))
|
||||
{
|
||||
done = 1;
|
||||
}
|
||||
}
|
||||
} while (!done);
|
||||
|
||||
return FD_ISSET(G_BDBERL_PIPE[0], &fds);
|
||||
}
|
||||
|
||||
/**
|
||||
* Thread function that runs the deadlock checker periodically
|
||||
*/
|
||||
|
@ -1364,8 +1451,7 @@ static void* deadlock_check(void* arg)
|
|||
DBG("Rejected deadlocks: %d\r\n", count);
|
||||
}
|
||||
|
||||
// TODO: Use nanosleep
|
||||
usleep(G_DEADLOCK_CHECK_INTERVAL * 1000);
|
||||
util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000);
|
||||
}
|
||||
|
||||
DBG("Deadlock checker exiting.\r\n");
|
||||
|
@ -1377,25 +1463,14 @@ static void* deadlock_check(void* arg)
|
|||
*/
|
||||
static void* trickle_write(void* arg)
|
||||
{
|
||||
int elapsed_secs = 0;
|
||||
while(G_TRICKLE_ACTIVE)
|
||||
{
|
||||
if (elapsed_secs == G_TRICKLE_INTERVAL)
|
||||
{
|
||||
// Enough time has passed -- time to run the trickle operation again
|
||||
int pages_wrote = 0;
|
||||
G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
|
||||
DBG("Wrote %d pages to achieve %d trickle\r\n", pages_wrote, G_TRICKLE_PERCENTAGE);
|
||||
// Enough time has passed -- time to run the trickle operation again
|
||||
int pages_wrote = 0;
|
||||
G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
|
||||
DBG("Wrote %d pages to achieve %d trickle\r\n", pages_wrote, G_TRICKLE_PERCENTAGE);
|
||||
|
||||
// Reset the counter
|
||||
elapsed_secs = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: Use nanosleep
|
||||
usleep(1000 * 1000); /* Sleep for 1 second */
|
||||
elapsed_secs++;
|
||||
}
|
||||
util_thread_usleep(G_TRICKLE_INTERVAL * 1000000);
|
||||
}
|
||||
|
||||
DBG("Trickle writer exiting.\r\n");
|
||||
|
@ -1409,11 +1484,8 @@ static void* txn_checkpoint(void* arg)
|
|||
{
|
||||
DBG("Checkpoint interval: %d seconds\r\n", G_CHECKPOINT_INTERVAL);
|
||||
|
||||
int elapsed_secs = 0;
|
||||
while (G_CHECKPOINT_ACTIVE)
|
||||
{
|
||||
if (elapsed_secs == G_CHECKPOINT_INTERVAL)
|
||||
{
|
||||
G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0);
|
||||
G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE);
|
||||
|
||||
|
@ -1421,16 +1493,7 @@ static void* txn_checkpoint(void* arg)
|
|||
time_t tm = time(NULL);
|
||||
printf("Transaction checkpoint complete at %s\r\n", ctime(&tm));
|
||||
#endif
|
||||
|
||||
// Reset the counter
|
||||
elapsed_secs = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: Use nanosleep
|
||||
usleep(1000 * 1000); /* Sleep for 1 second */
|
||||
elapsed_secs++;
|
||||
}
|
||||
util_thread_usleep(G_CHECKPOINT_INTERVAL * 1000000);
|
||||
}
|
||||
|
||||
DBG("Checkpointer exiting.\r\n");
|
||||
|
|
Loading…
Reference in a new issue