Interstitial commit
This commit is contained in:
parent
700e66ba9d
commit
8fc3cc3701
6 changed files with 214 additions and 92 deletions
|
@ -9,6 +9,8 @@
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
|
||||||
#include "hive_hash.h"
|
#include "hive_hash.h"
|
||||||
#include "bdberl_drv.h"
|
#include "bdberl_drv.h"
|
||||||
|
@ -21,9 +23,7 @@ static int open_database(const char* name, DBTYPE type, PortData* data, int* err
|
||||||
static int close_database(int dbref, PortData* data);
|
static int close_database(int dbref, PortData* data);
|
||||||
|
|
||||||
static void do_async_put(void* arg);
|
static void do_async_put(void* arg);
|
||||||
static void do_async_put_free(void* arg);
|
|
||||||
static void do_async_get(void* arg);
|
static void do_async_get(void* arg);
|
||||||
static void do_async_get_free(void* arg);
|
|
||||||
static void do_async_txnop(void* arg);
|
static void do_async_txnop(void* arg);
|
||||||
|
|
||||||
static int add_dbref(PortData* data, int dbref);
|
static int add_dbref(PortData* data, int dbref);
|
||||||
|
@ -37,6 +37,8 @@ static int alloc_dbref();
|
||||||
|
|
||||||
static void* zalloc(unsigned int size);
|
static void* zalloc(unsigned int size);
|
||||||
|
|
||||||
|
static void signal_port(PortData* d);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Global instance of DB_ENV; only a single one exists per O/S process.
|
* Global instance of DB_ENV; only a single one exists per O/S process.
|
||||||
*/
|
*/
|
||||||
|
@ -69,13 +71,19 @@ static ErlDrvRWLock* G_DATABASES_RWLOCK;
|
||||||
static hive_hash* G_DATABASES_NAMES;
|
static hive_hash* G_DATABASES_NAMES;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static TPool* G_TPOOL_GENERAL;
|
||||||
|
static TPool* G_TPOOL_TXNS;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helpful macros
|
* Helpful macros
|
||||||
*/
|
*/
|
||||||
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
|
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
|
||||||
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
|
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
|
||||||
#define PROMOTE_READ_LOCK(L) { erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L); }
|
#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L)
|
||||||
#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L)
|
#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L)
|
||||||
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
|
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
|
||||||
|
|
||||||
|
@ -120,13 +128,25 @@ DRIVER_INIT(bdberl_drv)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure we can distiguish between lock timeouts and deadlocks
|
// Make sure we can distiguish between lock timeouts and deadlocks
|
||||||
G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1);
|
int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1);
|
||||||
|
printf("TIME_NOT_GRANTED rc: %d\n", rc);
|
||||||
|
rc = G_DB_ENV->set_lk_detect(G_DB_ENV, 1);
|
||||||
|
printf("LOCK DETECT rc: %d\n", rc);
|
||||||
|
rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_TXN_TIMEOUT);
|
||||||
|
printf("DB_SET_TXN_TIMEOUT rc: %d\n", rc);
|
||||||
|
rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_LOCK_TIMEOUT);
|
||||||
|
printf("DB_SET_LOCK_TIMEOUT rc: %d\n", rc);
|
||||||
|
|
||||||
// BDB is setup -- allocate structures for tracking databases
|
// BDB is setup -- allocate structures for tracking databases
|
||||||
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
||||||
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
||||||
G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK");
|
G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK");
|
||||||
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
|
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
|
||||||
|
|
||||||
|
// Startup our thread pools
|
||||||
|
// TODO: Make configurable/adjustable
|
||||||
|
G_TPOOL_GENERAL = bdberl_tpool_start(5);
|
||||||
|
G_TPOOL_TXNS = bdberl_tpool_start(5);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -155,6 +175,13 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||||
// Save handle to the port
|
// Save handle to the port
|
||||||
d->port = port;
|
d->port = port;
|
||||||
|
|
||||||
|
// Setup a pair of pipes for notification purposes
|
||||||
|
assert(pipe(d->pipe_fds) == 0);
|
||||||
|
|
||||||
|
// Make sure both pipes are configured non-blocking
|
||||||
|
assert(fcntl(d->pipe_fds[0], F_SETFL, O_NONBLOCK) == 0);
|
||||||
|
assert(fcntl(d->pipe_fds[1], F_SETFL, O_NONBLOCK) == 0);
|
||||||
|
|
||||||
// Make sure port is running in binary mode
|
// Make sure port is running in binary mode
|
||||||
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
|
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
|
||||||
|
|
||||||
|
@ -164,9 +191,17 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||||
static void bdberl_drv_stop(ErlDrvData handle)
|
static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
{
|
{
|
||||||
PortData* d = (PortData*)handle;
|
PortData* d = (PortData*)handle;
|
||||||
|
printf("%p: stop\n", d->port);
|
||||||
|
|
||||||
// TODO: Investigate if we need to worry about queued up async operations. It seems like the VM
|
// If there is an async job pending, we need to cancel it. The cancel operation will
|
||||||
// SHOULD ensure the port doesn't disappear while something is in the queue...
|
// block until the job has either been removed or has run
|
||||||
|
if (d->async_job)
|
||||||
|
{
|
||||||
|
printf("Cancelling async job for port: %p\n", d->port);
|
||||||
|
bdberl_tpool_cancel(d->async_pool, d->async_job);
|
||||||
|
driver_select(d->port, (ErlDrvEvent)d->pipe_fds[0], DO_READ, 0);
|
||||||
|
printf("Canceled async job for port: %p\n", d->port);
|
||||||
|
}
|
||||||
|
|
||||||
// If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as
|
// If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as
|
||||||
// there doesn't seem to be a to do an async op while stopping the driver.
|
// there doesn't seem to be a to do an async op while stopping the driver.
|
||||||
|
@ -181,18 +216,30 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
close_database(d->dbrefs->dbref, d);
|
close_database(d->dbrefs->dbref, d);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
close(d->pipe_fds[0]);
|
||||||
|
close(d->pipe_fds[1]);
|
||||||
|
printf("Stopped port: %p\n", d->port);
|
||||||
|
|
||||||
// Release the port instance data
|
// Release the port instance data
|
||||||
driver_free(handle);
|
driver_free(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void bdberl_drv_finish()
|
static void bdberl_drv_finish()
|
||||||
{
|
{
|
||||||
// Driver is unloading -- cleanup and shut down the BDB environment. Note that we assume
|
printf("finish\n");
|
||||||
|
// Stop the thread pools
|
||||||
|
bdberl_tpool_stop(G_TPOOL_GENERAL);
|
||||||
|
bdberl_tpool_stop(G_TPOOL_TXNS);
|
||||||
|
|
||||||
|
// Cleanup and shut down the BDB environment. Note that we assume
|
||||||
// all ports have been released and thuse all databases/txns/etc are also gone.
|
// all ports have been released and thuse all databases/txns/etc are also gone.
|
||||||
G_DB_ENV->close(G_DB_ENV, 0);
|
G_DB_ENV->close(G_DB_ENV, 0);
|
||||||
driver_free(G_DATABASES);
|
driver_free(G_DATABASES);
|
||||||
erl_drv_rwlock_destroy(G_DATABASES_RWLOCK);
|
erl_drv_rwlock_destroy(G_DATABASES_RWLOCK);
|
||||||
hive_hash_destroy(G_DATABASES_NAMES);
|
hive_hash_destroy(G_DATABASES_NAMES);
|
||||||
|
|
||||||
|
printf("finished\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
|
@ -243,7 +290,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
}
|
}
|
||||||
case CMD_TXN_BEGIN:
|
case CMD_TXN_BEGIN:
|
||||||
{
|
{
|
||||||
// If an async operaiton is pending, fail
|
// If an async operation is pending, fail
|
||||||
if (d->async_op != CMD_NONE)
|
if (d->async_op != CMD_NONE)
|
||||||
{
|
{
|
||||||
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
|
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
|
||||||
|
@ -252,11 +299,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
// If we already have a txn open, fail
|
// If we already have a txn open, fail
|
||||||
if (d->txn != 0)
|
if (d->txn != 0)
|
||||||
{
|
{
|
||||||
|
printf("ERROR_TXN_OPEN\n");
|
||||||
RETURN_INT(ERROR_TXN_OPEN, outbuf);
|
RETURN_INT(ERROR_TXN_OPEN, outbuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Outbuf is <<Rc:32>>
|
// Outbuf is <<Rc:32>>
|
||||||
int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0);
|
int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0);
|
||||||
|
printf("TXN BEGIN: %d\n", rc);
|
||||||
RETURN_INT(rc, outbuf);
|
RETURN_INT(rc, outbuf);
|
||||||
}
|
}
|
||||||
case CMD_TXN_COMMIT:
|
case CMD_TXN_COMMIT:
|
||||||
|
@ -282,8 +331,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
d->async_op = cmd;
|
d->async_op = cmd;
|
||||||
|
|
||||||
// Schedule async operation to execute the commit/abort
|
// Schedule async operation to execute the commit/abort
|
||||||
unsigned int key = (unsigned int)d->port;
|
d->async_data = adata;
|
||||||
driver_async(d->port, &key, &do_async_txnop, adata, 0);
|
d->async_pool = G_TPOOL_TXNS;
|
||||||
|
d->async_job = bdberl_tpool_run(G_TPOOL_TXNS, &do_async_txnop, adata, 0);
|
||||||
|
|
||||||
|
// Watch for events on the output pipe
|
||||||
|
// TODO: Can we do this just once ?!
|
||||||
|
driver_select(d->port, (ErlDrvEvent)d->pipe_fds[0], DO_READ, 1);
|
||||||
|
|
||||||
// Outbuf is <<Rc:32>>
|
// Outbuf is <<Rc:32>>
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -317,16 +371,25 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
|
|
||||||
// Mark the port as busy and then schedule the appropriate async operation
|
// Mark the port as busy and then schedule the appropriate async operation
|
||||||
d->async_op = cmd;
|
d->async_op = cmd;
|
||||||
unsigned int key = (unsigned int)d->port;
|
|
||||||
|
TPoolJobFunc fn;
|
||||||
if (cmd == CMD_PUT)
|
if (cmd == CMD_PUT)
|
||||||
{
|
{
|
||||||
driver_async(d->port, &key, &do_async_put, thread_data, &do_async_put_free);
|
fn = &do_async_put;
|
||||||
}
|
}
|
||||||
else if (cmd == CMD_GET)
|
else if (cmd == CMD_GET)
|
||||||
{
|
{
|
||||||
driver_async(d->port, &key, &do_async_get, thread_data, &do_async_get_free);
|
fn = &do_async_get;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
d->async_data = adata;
|
||||||
|
d->async_pool = G_TPOOL_GENERAL;
|
||||||
|
d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, fn, adata, 0);
|
||||||
|
|
||||||
|
// Watch for events on the output pipe
|
||||||
|
// TODO: Can we do this just once ?!
|
||||||
|
driver_select(d->port, (ErlDrvEvent)d->pipe_fds[0], DO_READ, 1);
|
||||||
|
|
||||||
// Let caller know that the operation is in progress
|
// Let caller know that the operation is in progress
|
||||||
// Outbuf is: <<0:32>>
|
// Outbuf is: <<0:32>>
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -342,33 +405,23 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data)
|
static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
|
||||||
{
|
{
|
||||||
// TODO: If GET/PUT fails due to DEADLOCK, detect and cleanup txn pointer accordingly
|
|
||||||
printf("ready_async\n");
|
|
||||||
|
|
||||||
PortData* d = (PortData*)handle;
|
PortData* d = (PortData*)handle;
|
||||||
|
printf("%p: ready_input\n", d->port);
|
||||||
|
|
||||||
|
// Empty out the queue
|
||||||
|
int readbuf;
|
||||||
|
while (read((int)event, &readbuf, sizeof(readbuf)) > 0) { ; }
|
||||||
|
driver_select(d->port, event, DO_READ, 0);
|
||||||
|
|
||||||
|
// The async op has completed running on the thread pool -- process the results
|
||||||
switch (d->async_op)
|
switch (d->async_op)
|
||||||
{
|
{
|
||||||
case CMD_PUT:
|
case CMD_PUT:
|
||||||
case CMD_TXN_COMMIT:
|
case CMD_TXN_COMMIT:
|
||||||
case CMD_TXN_ABORT:
|
case CMD_TXN_ABORT:
|
||||||
{
|
{
|
||||||
// Extract return code == if it's zero, send back "ok" to driver process; otherwise
|
|
||||||
// send a {error, Reason} tuple
|
|
||||||
AsyncData* adata = (AsyncData*)thread_data;
|
|
||||||
if (adata->rc == 0)
|
|
||||||
{
|
|
||||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") };
|
|
||||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
|
||||||
ERL_DRV_INT, adata->rc,
|
|
||||||
ERL_DRV_TUPLE, 2};
|
|
||||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this was a commit/abort, clear out the txn handle
|
// If this was a commit/abort, clear out the txn handle
|
||||||
if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT)
|
if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT)
|
||||||
|
@ -376,16 +429,44 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da
|
||||||
d->txn = 0;
|
d->txn = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extract return code == if it's zero, send back "ok" to driver process; otherwise
|
||||||
|
// send a {error, Reason} tuple
|
||||||
|
AsyncData* adata = (AsyncData*)d->async_data;
|
||||||
|
printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc);
|
||||||
|
if (adata->rc == 0)
|
||||||
|
{
|
||||||
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") };
|
||||||
|
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// If we have a transactional PUT and a deadlock occurred, clear out txn ptr
|
||||||
|
if (d->async_op == CMD_GET && adata->port->txn &&
|
||||||
|
(adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED))
|
||||||
|
{
|
||||||
|
d->txn = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||||
|
ERL_DRV_INT, adata->rc,
|
||||||
|
ERL_DRV_TUPLE, 2};
|
||||||
|
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||||
|
}
|
||||||
|
|
||||||
// Cleanup async data and mark the port as not busy
|
// Cleanup async data and mark the port as not busy
|
||||||
driver_free(adata);
|
driver_free(d->async_data);
|
||||||
|
d->async_data = 0;
|
||||||
d->async_op = CMD_NONE;
|
d->async_op = CMD_NONE;
|
||||||
|
d->async_job = 0;
|
||||||
|
d->async_pool = 0;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case CMD_GET:
|
case CMD_GET:
|
||||||
{
|
{
|
||||||
// Extract return code == if it's zero, send back {ok, Payload} or not_found to driver
|
// Extract return code == if it's zero, send back {ok, Payload} or not_found to driver
|
||||||
// process; otherwise send a {error, Reason} tuple
|
// process; otherwise send a {error, Reason} tuple
|
||||||
AsyncData* adata = (AsyncData*)thread_data;
|
AsyncData* adata = (AsyncData*)d->async_data;
|
||||||
|
printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc);
|
||||||
if (adata->rc == 0)
|
if (adata->rc == 0)
|
||||||
{
|
{
|
||||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
|
||||||
|
@ -395,12 +476,18 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da
|
||||||
}
|
}
|
||||||
else if (adata->rc == DB_NOTFOUND)
|
else if (adata->rc == DB_NOTFOUND)
|
||||||
{
|
{
|
||||||
printf("not foudn\n");
|
|
||||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
|
||||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
printf("GET ERROR: %d\n", adata->rc);
|
||||||
|
if (adata->port->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED))
|
||||||
|
{
|
||||||
|
// Transaction has been invalidated -- null it out to avoid multiple txn closure
|
||||||
|
adata->port->txn = 0;
|
||||||
|
}
|
||||||
|
|
||||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||||
ERL_DRV_INT, adata->rc,
|
ERL_DRV_INT, adata->rc,
|
||||||
ERL_DRV_TUPLE, 2};
|
ERL_DRV_TUPLE, 2};
|
||||||
|
@ -408,18 +495,16 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup async data and mark the port as not busy
|
// Cleanup async data and mark the port as not busy
|
||||||
driver_free(adata->payload);
|
driver_free(d->async_data);
|
||||||
driver_free(adata);
|
d->async_data = 0;
|
||||||
d->async_op = CMD_NONE;
|
d->async_op = CMD_NONE;
|
||||||
|
d->async_job = 0;
|
||||||
|
d->async_pool = 0;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
static int open_database(const char* name, DBTYPE type, PortData* data, int* dbref_res)
|
static int open_database(const char* name, DBTYPE type, PortData* data, int* dbref_res)
|
||||||
{
|
{
|
||||||
*dbref_res = -1;
|
*dbref_res = -1;
|
||||||
|
@ -576,8 +661,8 @@ static int close_database(int dbref, PortData* data)
|
||||||
|
|
||||||
static void do_async_put(void* arg)
|
static void do_async_put(void* arg)
|
||||||
{
|
{
|
||||||
printf("do_async_put\n");
|
|
||||||
AsyncData* adata = (AsyncData*)arg;
|
AsyncData* adata = (AsyncData*)arg;
|
||||||
|
printf("%p: do_async_put\n", adata->port->port);
|
||||||
|
|
||||||
// Setup DBTs
|
// Setup DBTs
|
||||||
DBT key;
|
DBT key;
|
||||||
|
@ -596,18 +681,15 @@ static void do_async_put(void* arg)
|
||||||
// All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still
|
// All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still
|
||||||
// be atomic
|
// be atomic
|
||||||
adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0);
|
adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0);
|
||||||
}
|
|
||||||
|
|
||||||
static void do_async_put_free(void* arg)
|
// Enqueue a signal for the port to know that the operation is complete
|
||||||
{
|
signal_port(adata->port);
|
||||||
printf("do_async_put_free\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void do_async_get(void* arg)
|
static void do_async_get(void* arg)
|
||||||
{
|
{
|
||||||
printf("do_async_get\n");
|
|
||||||
AsyncData* adata = (AsyncData*)arg;
|
AsyncData* adata = (AsyncData*)arg;
|
||||||
|
printf("%p: do_async_get\n", adata->port->port);
|
||||||
|
|
||||||
// Setup DBTs
|
// Setup DBTs
|
||||||
DBT key;
|
DBT key;
|
||||||
|
@ -638,20 +720,16 @@ static void do_async_get(void* arg)
|
||||||
adata->payload = value.data;
|
adata->payload = value.data;
|
||||||
adata->payload_sz = value.size; // Not ulen -- we want the actual data size
|
adata->payload_sz = value.size; // Not ulen -- we want the actual data size
|
||||||
adata->rc = rc;
|
adata->rc = rc;
|
||||||
}
|
|
||||||
|
|
||||||
static void do_async_get_free(void* arg)
|
signal_port(adata->port);
|
||||||
{
|
|
||||||
printf("do_async_get_free\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void do_async_txnop(void* arg)
|
static void do_async_txnop(void* arg)
|
||||||
{
|
{
|
||||||
printf("do_async_txnop\n");
|
AsyncData* adata = (AsyncData*)arg;
|
||||||
|
printf("%p: do_async_txnop\n", adata->port->port);
|
||||||
|
|
||||||
// Execute the actual commit/abort
|
// Execute the actual commit/abort
|
||||||
AsyncData* adata = (AsyncData*)arg;
|
|
||||||
if (adata->port->async_op == CMD_TXN_COMMIT)
|
if (adata->port->async_op == CMD_TXN_COMMIT)
|
||||||
{
|
{
|
||||||
adata->rc = adata->port->txn->commit(adata->port->txn, 0);
|
adata->rc = adata->port->txn->commit(adata->port->txn, 0);
|
||||||
|
@ -660,6 +738,8 @@ static void do_async_txnop(void* arg)
|
||||||
{
|
{
|
||||||
adata->rc = adata->port->txn->abort(adata->port->txn);
|
adata->rc = adata->port->txn->abort(adata->port->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
signal_port(adata->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -842,3 +922,9 @@ static int alloc_dbref()
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void signal_port(PortData* d)
|
||||||
|
{
|
||||||
|
int flag = 1;
|
||||||
|
write(d->pipe_fds[1], &flag, sizeof(flag));
|
||||||
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
#include "erl_driver.h"
|
#include "erl_driver.h"
|
||||||
#include "db.h"
|
#include "db.h"
|
||||||
|
#include "bdberl_tpool.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Driver functions
|
* Driver functions
|
||||||
|
@ -23,9 +24,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
char* inbuf, int inbuf_sz,
|
char* inbuf, int inbuf_sz,
|
||||||
char** outbuf, int outbuf_sz);
|
char** outbuf, int outbuf_sz);
|
||||||
|
|
||||||
static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data);
|
static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev);
|
||||||
|
|
||||||
static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Command codes
|
* Command codes
|
||||||
|
@ -71,7 +71,7 @@ ErlDrvEntry bdberl_drv_entry =
|
||||||
bdberl_drv_start, /* L_PTR start, called when port is opened */
|
bdberl_drv_start, /* L_PTR start, called when port is opened */
|
||||||
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
|
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
|
||||||
NULL, /* F_PTR output, called when erlang has sent */
|
NULL, /* F_PTR output, called when erlang has sent */
|
||||||
NULL, /* F_PTR ready_input, called when input descriptor ready */
|
bdberl_drv_ready_input, /* F_PTR ready_input, called when input descriptor ready */
|
||||||
NULL, /* F_PTR ready_output, called when output descriptor ready */
|
NULL, /* F_PTR ready_output, called when output descriptor ready */
|
||||||
"bdberl_drv", /* driver_name */
|
"bdberl_drv", /* driver_name */
|
||||||
bdberl_drv_finish, /* F_PTR finish, called when unloaded */
|
bdberl_drv_finish, /* F_PTR finish, called when unloaded */
|
||||||
|
@ -79,7 +79,7 @@ ErlDrvEntry bdberl_drv_entry =
|
||||||
bdberl_drv_control, /* F_PTR control, port_command callback */
|
bdberl_drv_control, /* F_PTR control, port_command callback */
|
||||||
NULL, /* F_PTR timeout, reserved */
|
NULL, /* F_PTR timeout, reserved */
|
||||||
NULL, /* F_PTR outputv, reserved */
|
NULL, /* F_PTR outputv, reserved */
|
||||||
bdberl_drv_ready_async, /* F_PTR ready_async */
|
NULL, /* F_PTR ready_async */
|
||||||
NULL, /* F_PTR flush */
|
NULL, /* F_PTR flush */
|
||||||
NULL, /* F_PTR call */
|
NULL, /* F_PTR call */
|
||||||
NULL, /* F_PTR event */
|
NULL, /* F_PTR event */
|
||||||
|
@ -88,7 +88,7 @@ ErlDrvEntry bdberl_drv_entry =
|
||||||
ERL_DRV_EXTENDED_MINOR_VERSION,
|
ERL_DRV_EXTENDED_MINOR_VERSION,
|
||||||
ERL_DRV_FLAG_USE_PORT_LOCKING,
|
ERL_DRV_FLAG_USE_PORT_LOCKING,
|
||||||
NULL, /* Reserved */
|
NULL, /* Reserved */
|
||||||
bdberl_drv_process_exit /* F_PTR process_exit */
|
NULL /* F_PTR process_exit */
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct _DbRefList
|
typedef struct _DbRefList
|
||||||
|
@ -120,19 +120,27 @@ typedef struct
|
||||||
{
|
{
|
||||||
ErlDrvPort port;
|
ErlDrvPort port;
|
||||||
|
|
||||||
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
|
DbRefList* dbrefs; /* List of databases that this port has opened */
|
||||||
* active */
|
|
||||||
|
|
||||||
int async_op; /* Value indicating what async op is pending */
|
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
|
||||||
|
* active */
|
||||||
|
|
||||||
DbRefList* dbrefs; /* List of databases that this port has opened */
|
int pipe_fds[2]; /* Array of pipe fds for signaling purposes */
|
||||||
|
|
||||||
|
int async_op; /* Value indicating what async op is pending */
|
||||||
|
|
||||||
|
void* async_data; /* Opaque point to data used during async op */
|
||||||
|
|
||||||
|
TPoolJob* async_job; /* Active job on the thread pool */
|
||||||
|
|
||||||
|
TPool* async_pool; /* Pool the async job is running on */
|
||||||
|
|
||||||
} PortData;
|
} PortData;
|
||||||
|
|
||||||
|
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
const PortData* port; /* Port that originated this request -- READ ONLY! */
|
PortData* port; /* Port that originated this request -- READ ONLY! */
|
||||||
int rc; /* Return code from operation */
|
int rc; /* Return code from operation */
|
||||||
DB* db; /* Database to use for data storage/retrieval */
|
DB* db; /* Database to use for data storage/retrieval */
|
||||||
void* payload; /* Packed key/value data */
|
void* payload; /* Packed key/value data */
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
* ------------------------------------------------------------------- */
|
* ------------------------------------------------------------------- */
|
||||||
#include "bdberl_tpool.h"
|
#include "bdberl_tpool.h"
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
@ -45,8 +46,9 @@ void bdberl_tpool_stop(TPool* tpool)
|
||||||
{
|
{
|
||||||
LOCK(tpool);
|
LOCK(tpool);
|
||||||
|
|
||||||
// Set the shutdown flag
|
// Set the shutdown flag and broadcast a notification
|
||||||
tpool->shutdown = 1;
|
tpool->shutdown = 1;
|
||||||
|
erl_drv_cond_broadcast(tpool->work_cv);
|
||||||
|
|
||||||
// Clean out the queue of pending jobs -- invoke their cleanup function
|
// Clean out the queue of pending jobs -- invoke their cleanup function
|
||||||
|
|
||||||
|
@ -72,7 +74,7 @@ void bdberl_tpool_stop(TPool* tpool)
|
||||||
driver_free(tpool);
|
driver_free(tpool);
|
||||||
}
|
}
|
||||||
|
|
||||||
TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc* main_fn, void* arg, TPoolJobFunc* cancel_fn)
|
TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn)
|
||||||
{
|
{
|
||||||
// Allocate and fill a new job structure
|
// Allocate and fill a new job structure
|
||||||
TPoolJob* job = driver_alloc(sizeof(TPoolJob));
|
TPoolJob* job = driver_alloc(sizeof(TPoolJob));
|
||||||
|
@ -116,7 +118,10 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
|
||||||
// Job was removed from pending -- unlock and notify the job that it got canceled
|
// Job was removed from pending -- unlock and notify the job that it got canceled
|
||||||
UNLOCK(tpool);
|
UNLOCK(tpool);
|
||||||
|
|
||||||
(*(job->cancel_fn))(job->arg);
|
if (job->cancel_fn)
|
||||||
|
{
|
||||||
|
(*(job->cancel_fn))(job->arg);
|
||||||
|
}
|
||||||
|
|
||||||
// Delete the job structure
|
// Delete the job structure
|
||||||
driver_free(job);
|
driver_free(job);
|
||||||
|
@ -171,9 +176,13 @@ static void* bdberl_tpool_main(void* arg)
|
||||||
// Unlock to avoid blocking others
|
// Unlock to avoid blocking others
|
||||||
UNLOCK(tpool);
|
UNLOCK(tpool);
|
||||||
|
|
||||||
|
printf("Invoking job\n");
|
||||||
|
|
||||||
// Invoke the function
|
// Invoke the function
|
||||||
(*(job->main_fn))(job->arg);
|
(*(job->main_fn))(job->arg);
|
||||||
|
|
||||||
|
printf("Finished invoking job.\n");
|
||||||
|
|
||||||
// Relock
|
// Relock
|
||||||
LOCK(tpool);
|
LOCK(tpool);
|
||||||
|
|
||||||
|
|
|
@ -13,11 +13,11 @@ typedef void (*TPoolJobFunc)(void* arg);
|
||||||
|
|
||||||
typedef struct _TPoolJob
|
typedef struct _TPoolJob
|
||||||
{
|
{
|
||||||
TPoolJobFunc* main_fn; /* Function to invoke for this job */
|
TPoolJobFunc main_fn; /* Function to invoke for this job */
|
||||||
|
|
||||||
TPoolJobFunc* cancel_fn; /* Function that gets invoked if job is canceled before it can run */
|
TPoolJobFunc cancel_fn; /* Function that gets invoked if job is canceled before it can run */
|
||||||
|
|
||||||
TPoolJobFunc* arg; /* Input data for the function */
|
void* arg; /* Input data for the function */
|
||||||
|
|
||||||
unsigned int running; /* Flag indicating if the job is currently running */
|
unsigned int running; /* Flag indicating if the job is currently running */
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ TPool* bdberl_tpool_start(unsigned int thread_count);
|
||||||
|
|
||||||
void bdberl_tpool_stop(TPool* tpool);
|
void bdberl_tpool_stop(TPool* tpool);
|
||||||
|
|
||||||
TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc* main_fn, void* arg, TPoolJobFunc* cancel_fn);
|
TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn);
|
||||||
|
|
||||||
void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job);
|
void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job);
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,8 @@
|
||||||
-define(ERROR_TXN_OPEN, -29003). % Transaction already active on this port
|
-define(ERROR_TXN_OPEN, -29003). % Transaction already active on this port
|
||||||
-define(ERROR_NO_TXN, -29004). % No transaction open on this port
|
-define(ERROR_NO_TXN, -29004). % No transaction open on this port
|
||||||
|
|
||||||
|
-define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted
|
||||||
|
|
||||||
new() ->
|
new() ->
|
||||||
case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of
|
case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
|
@ -70,34 +72,35 @@ close_database(Port, DbRef) ->
|
||||||
|
|
||||||
txn_begin(Port) ->
|
txn_begin(Port) ->
|
||||||
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_BEGIN, <<>>),
|
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_BEGIN, <<>>),
|
||||||
case Result of
|
io:format("TXN BEGIN erl: ~p\n", [Result]),
|
||||||
?ERROR_NONE -> ok;
|
case decode_rc(Result) of
|
||||||
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
ok -> ok;
|
||||||
?ERROR_TXN_OPEN -> {error, txn_open}
|
Error -> {error, {txn_begin, Error}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
txn_commit(Port) ->
|
txn_commit(Port) ->
|
||||||
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_COMMIT, <<>>),
|
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_COMMIT, <<>>),
|
||||||
case Result of
|
case decode_rc(Result) of
|
||||||
?ERROR_NONE ->
|
ok ->
|
||||||
receive
|
receive
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, {txn_commit, decode_rc(Reason)}}
|
||||||
end;
|
end;
|
||||||
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
Error ->
|
||||||
?ERROR_NO_TXN -> {error, no_txn}
|
{error, {txn_commit, Error}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
txn_abort(Port) ->
|
txn_abort(Port) ->
|
||||||
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_ABORT, <<>>),
|
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_ABORT, <<>>),
|
||||||
case Result of
|
case decode_rc(Result) of
|
||||||
?ERROR_NONE ->
|
ok ->
|
||||||
receive
|
receive
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, {txn_abort, decode_rc(Reason)}}
|
||||||
end;
|
end;
|
||||||
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
Error ->
|
||||||
?ERROR_NO_TXN -> {error, no_txn}
|
{error, {txn_abort, Error}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
@ -106,14 +109,14 @@ put(Port, DbRef, Key, Value) ->
|
||||||
{ValLen, ValBin} = to_binary(Value),
|
{ValLen, ValBin} = to_binary(Value),
|
||||||
Cmd = <<DbRef:32/native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>,
|
Cmd = <<DbRef:32/native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>,
|
||||||
<<Result:32/native>> = erlang:port_control(Port, ?CMD_PUT, Cmd),
|
<<Result:32/native>> = erlang:port_control(Port, ?CMD_PUT, Cmd),
|
||||||
case Result of
|
case decode_rc(Result) of
|
||||||
?ERROR_NONE ->
|
ok ->
|
||||||
receive
|
receive
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, {put, decode_rc(Reason)}}
|
||||||
end;
|
end;
|
||||||
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
Error ->
|
||||||
?ERROR_INVALID_DBREF -> {error, invalid_dbref}
|
{error, {put, decode_rc(Error)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
@ -121,22 +124,38 @@ get(Port, DbRef, Key) ->
|
||||||
{KeyLen, KeyBin} = to_binary(Key),
|
{KeyLen, KeyBin} = to_binary(Key),
|
||||||
Cmd = <<DbRef:32/native, KeyLen:32/native, KeyBin/bytes>>,
|
Cmd = <<DbRef:32/native, KeyLen:32/native, KeyBin/bytes>>,
|
||||||
<<Result:32/native>> = erlang:port_control(Port, ?CMD_GET, Cmd),
|
<<Result:32/native>> = erlang:port_control(Port, ?CMD_GET, Cmd),
|
||||||
case Result of
|
case decode_rc(Result) of
|
||||||
?ERROR_NONE ->
|
ok ->
|
||||||
receive
|
receive
|
||||||
{ok, Bin} -> {ok, binary_to_term(Bin)};
|
{ok, Bin} -> {ok, binary_to_term(Bin)};
|
||||||
not_found -> not_found;
|
not_found -> not_found;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, {get, decode_rc(Reason)}}
|
||||||
end;
|
end;
|
||||||
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
Error ->
|
||||||
?ERROR_INVALID_DBREF -> {error, invalid_dbref}
|
{error, {get, decode_rc(Error)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
%% ====================================================================
|
||||||
|
%% Internal functions
|
||||||
|
%% ====================================================================
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% Decode a integer return value into an atom representation
|
||||||
|
%%
|
||||||
|
decode_rc(?ERROR_NONE) -> ok;
|
||||||
|
decode_rc(?ERROR_ASYNC_PENDING) -> async_pending;
|
||||||
|
decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref;
|
||||||
|
decode_rc(?ERROR_NO_TXN) -> no_txn;
|
||||||
|
decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted;
|
||||||
|
decode_rc(Rc) -> {unknown, Rc}.
|
||||||
|
|
||||||
|
|
||||||
|
%%
|
||||||
|
%% Convert a term into a binary, returning a tuple with the binary and the length of the binary
|
||||||
|
%%
|
||||||
to_binary(Term) ->
|
to_binary(Term) ->
|
||||||
Bin = term_to_binary(Term),
|
Bin = term_to_binary(Term),
|
||||||
{size(Bin), Bin}.
|
{size(Bin), Bin}.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
% [test_db].
|
% [test_db].
|
||||||
[test_put, test_txn].
|
[test_put].% test_txn].
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
init_per_testcase(TestCase, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
Loading…
Reference in a new issue