Adding support for get/put and basic transactions
This commit is contained in:
parent
c92a48356e
commit
2f83430708
2 changed files with 317 additions and 26 deletions
|
@ -20,8 +20,15 @@
|
|||
static int open_database(const char* name, DBTYPE type, PortData* data, int* errno);
|
||||
static int close_database(int dbref, PortData* data);
|
||||
|
||||
static void do_async_put(void* arg);
|
||||
static void do_async_put_free(void* arg);
|
||||
static void do_async_get(void* arg);
|
||||
static void do_async_get_free(void* arg);
|
||||
static void do_async_commit(void* arg);
|
||||
|
||||
static int add_dbref(PortData* data, int dbref);
|
||||
static int del_dbref(PortData* data, int dbref);
|
||||
static int has_dbref(PortData* data, int dbref);
|
||||
|
||||
static int add_portref(int dbref, ErlDrvPort port);
|
||||
static int del_portref(int dbref, ErlDrvPort port);
|
||||
|
@ -72,6 +79,13 @@ static hive_hash* G_DATABASES_NAMES;
|
|||
#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L)
|
||||
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
|
||||
|
||||
#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.bin->orig_size;
|
||||
|
||||
#define RETURN_INT(val, outbuf) { \
|
||||
BinHelper bh; \
|
||||
bin_helper_init(&bh, 4); \
|
||||
bin_helper_push_int32(&bh, val); \
|
||||
RETURN_BH(bh, outbuf); }
|
||||
|
||||
DRIVER_INIT(bdberl_drv)
|
||||
{
|
||||
|
@ -95,7 +109,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
// Use the BDBERL_MAX_DBS environment value to determine the max # of
|
||||
// databases to permit the VM to open at once. Defaults to 1024.
|
||||
G_DATABASES_SIZE = 1024;
|
||||
char* max_dbs_str = getenv("BDBERL_MAX_DBS");
|
||||
char* max_dbs_str = getenv("BDBERL_MAX_DBS"); /* TODO: Use erl_drv_getenv */
|
||||
if (max_dbs_str != 0)
|
||||
{
|
||||
G_DATABASES_SIZE = atoi(max_dbs_str);
|
||||
|
@ -105,6 +119,9 @@ DRIVER_INIT(bdberl_drv)
|
|||
}
|
||||
}
|
||||
|
||||
// Make sure we can distiguish between lock timeouts and deadlocks
|
||||
G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1);
|
||||
|
||||
// BDB is setup -- allocate structures for tracking databases
|
||||
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
||||
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
||||
|
@ -148,7 +165,15 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
{
|
||||
PortData* d = (PortData*)handle;
|
||||
|
||||
// TODO: Terminate any txns
|
||||
// TODO: Investigate if we need to worry about queued up async operations. It seems like the VM
|
||||
// SHOULD ensure the port doesn't disappear while something is in the queue...
|
||||
|
||||
// If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as
|
||||
// there doesn't seem to be a to do an async op while stopping the driver.
|
||||
if (d->txn)
|
||||
{
|
||||
d->txn->abort(d->txn);
|
||||
}
|
||||
|
||||
// Close all the databases we previously opened
|
||||
while (d->dbrefs)
|
||||
|
@ -181,7 +206,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
case CMD_OPEN_DB:
|
||||
{
|
||||
// Extract the type code and filename from the inbuf
|
||||
// Expect: Type:8, Name/bytes, NULL:8
|
||||
// Inbuf is: <<Type:8, Name/bytes, 0:8>>
|
||||
DBTYPE type = (DBTYPE)((char)*inbuf);
|
||||
char* name = (char*)(inbuf+1);
|
||||
int dbref;
|
||||
|
@ -198,14 +223,12 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
}
|
||||
|
||||
// Pack the status and dbref (or errno) into a binary and return it
|
||||
// Byte 0 : Status
|
||||
// Byte 1..4: dbref/errno
|
||||
// Outbuf is: <<Status:8, DbRef:32>>
|
||||
BinHelper bh;
|
||||
bin_helper_init(&bh, 5);
|
||||
bin_helper_push_byte(&bh, status);
|
||||
bin_helper_push_int32(&bh, dbref);
|
||||
*outbuf = (char*)bh.bin;
|
||||
return bh.bin->orig_size;
|
||||
RETURN_BH(bh, outbuf);
|
||||
}
|
||||
case CMD_CLOSE_DB:
|
||||
{
|
||||
|
@ -215,13 +238,104 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
int dbref = *((int*)inbuf);
|
||||
int rc = close_database(dbref, d);
|
||||
|
||||
// Setup to return the rc
|
||||
BinHelper bh;
|
||||
bin_helper_init(&bh, 4);
|
||||
bin_helper_push_int32(&bh, rc);
|
||||
*outbuf = (char*)bh.bin;
|
||||
return bh.bin->orig_size;
|
||||
// Outbuf is: <<Rc:32>>
|
||||
RETURN_INT(rc, outbuf);
|
||||
}
|
||||
case CMD_TXN_BEGIN:
|
||||
{
|
||||
// If an async operaiton is pending, fail
|
||||
if (d->async_op != CMD_NONE)
|
||||
{
|
||||
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
|
||||
}
|
||||
|
||||
// If we already have a txn open, fail
|
||||
if (d->txn != 0)
|
||||
{
|
||||
RETURN_INT(ERROR_TXN_OPEN, outbuf);
|
||||
}
|
||||
|
||||
// Outbuf is <<Rc:32>>
|
||||
int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0);
|
||||
RETURN_INT(rc, outbuf);
|
||||
}
|
||||
case CMD_TXN_COMMIT:
|
||||
{
|
||||
// If an async operation is pending, fail
|
||||
if (d->async_op != CMD_NONE)
|
||||
{
|
||||
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
|
||||
}
|
||||
|
||||
// If we don't already have a txn open, fail
|
||||
if (d->txn == 0)
|
||||
{
|
||||
RETURN_INT(ERROR_NO_TXN, outbuf);
|
||||
}
|
||||
|
||||
// Allocate operation structure
|
||||
AsyncData* adata = zalloc(sizeof(AsyncData));
|
||||
adata->port = d;
|
||||
|
||||
// Update port data to indicate we have an operation in progress
|
||||
d->async_op = CMD_TXN_COMMIT;
|
||||
|
||||
// Schedule async operation to execute the commit
|
||||
unsigned int key = (unsigned int)d->port;
|
||||
driver_async(d->port, &key, &do_async_commit, adata, 0);
|
||||
|
||||
// Outbuf is <<Rc:32>>
|
||||
RETURN_INT(0, outbuf);
|
||||
}
|
||||
case CMD_PUT:
|
||||
case CMD_GET:
|
||||
{
|
||||
// If another async op is pending, fail
|
||||
if (d->async_op != CMD_NONE)
|
||||
{
|
||||
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
|
||||
}
|
||||
|
||||
// Inbuf is: << DbRef:32, Rest/binary>>
|
||||
int dbref = *((int*)inbuf);
|
||||
|
||||
// Make sure this port currently has dbref open -- if it doesn't, error out. Of note,
|
||||
// if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about
|
||||
// the underlying handle disappearing since we have a reference.
|
||||
if (has_dbref(d, dbref))
|
||||
{
|
||||
// Allocate operation structure
|
||||
void* thread_data = zalloc(sizeof(AsyncData) + inbuf_sz);
|
||||
AsyncData* adata = (AsyncData*)thread_data;
|
||||
adata->port = d;
|
||||
adata->db = G_DATABASES[dbref].db;
|
||||
adata->payload = thread_data + sizeof(AsyncData);
|
||||
|
||||
// Copy the payload into place
|
||||
memcpy(adata->payload, inbuf, inbuf_sz);
|
||||
|
||||
// Mark the port as busy and then schedule the appropriate async operation
|
||||
d->async_op = cmd;
|
||||
unsigned int key = (unsigned int)d->port;
|
||||
if (cmd == CMD_PUT)
|
||||
{
|
||||
driver_async(d->port, &key, &do_async_put, thread_data, &do_async_put_free);
|
||||
}
|
||||
else if (cmd == CMD_GET)
|
||||
{
|
||||
driver_async(d->port, &key, &do_async_get, thread_data, &do_async_get_free);
|
||||
}
|
||||
|
||||
// Let caller know that the operation is in progress
|
||||
// Outbuf is: <<0:32>>
|
||||
RETURN_INT(0, outbuf);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Invalid dbref
|
||||
RETURN_INT(ERROR_INVALID_DBREF, outbuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
*outbuf = 0;
|
||||
return 0;
|
||||
|
@ -229,6 +343,70 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
|
||||
static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data)
|
||||
{
|
||||
// TODO: If GET/PUT fails due to DEADLOCK, detect and cleanup txn pointer accordingly
|
||||
printf("ready_async\n");
|
||||
|
||||
PortData* d = (PortData*)handle;
|
||||
switch (d->async_op)
|
||||
{
|
||||
case CMD_PUT:
|
||||
case CMD_TXN_COMMIT:
|
||||
case CMD_TXN_ABORT:
|
||||
{
|
||||
// Extract return code == if it's zero, send back "ok" to driver process; otherwise
|
||||
// send a {error, Reason} tuple
|
||||
AsyncData* adata = (AsyncData*)thread_data;
|
||||
if (adata->rc == 0)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") };
|
||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_INT, adata->rc,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
|
||||
// If this was a commit/abort, clear out the txn handle
|
||||
if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT)
|
||||
{
|
||||
d->txn = 0;
|
||||
}
|
||||
|
||||
// Cleanup async data and mark the port as not busy
|
||||
driver_free(adata);
|
||||
d->async_op = CMD_NONE;
|
||||
break;
|
||||
}
|
||||
case CMD_GET:
|
||||
{
|
||||
// Extract return code == if it's zero, send back {ok, Payload} to driver process; otherwise
|
||||
// send a {error, Reason} tuple
|
||||
AsyncData* adata = (AsyncData*)thread_data;
|
||||
if (adata->rc == 0)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
|
||||
ERL_DRV_BUF2BINARY, (ErlDrvTermData)adata->payload, (ErlDrvUInt)adata->payload_sz,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_INT, adata->rc,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
|
||||
// Cleanup async data and mark the port as not busy
|
||||
driver_free(adata->payload);
|
||||
driver_free(adata);
|
||||
d->async_op = CMD_NONE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor)
|
||||
|
@ -389,6 +567,88 @@ static int close_database(int dbref, PortData* data)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void do_async_put(void* arg)
|
||||
{
|
||||
printf("do_async_put\n");
|
||||
AsyncData* adata = (AsyncData*)arg;
|
||||
|
||||
// Setup DBTs
|
||||
DBT key;
|
||||
DBT value;
|
||||
memset(&key, '\0', sizeof(DBT));
|
||||
memset(&value, '\0', sizeof(DBT));
|
||||
|
||||
// Parse payload into DBTs
|
||||
// Payload is: << DbRef:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>>
|
||||
key.size = *((int*)(adata->payload + 4));
|
||||
key.data = (void*)(adata->payload + 8);
|
||||
value.size = *((int*)(adata->payload + 8 + key.size));
|
||||
value.data = (void*)(adata->payload + 8 + key.size + 4);
|
||||
|
||||
// Execute the actual put -- we'll process the result back in the driver_async_ready function
|
||||
// All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still
|
||||
// be atomic
|
||||
adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0);
|
||||
}
|
||||
|
||||
static void do_async_put_free(void* arg)
|
||||
{
|
||||
printf("do_async_put_free\n");
|
||||
}
|
||||
|
||||
|
||||
static void do_async_get(void* arg)
|
||||
{
|
||||
printf("do_async_get\n");
|
||||
AsyncData* adata = (AsyncData*)arg;
|
||||
|
||||
// Setup DBTs
|
||||
DBT key;
|
||||
DBT value;
|
||||
memset(&key, '\0', sizeof(DBT));
|
||||
memset(&value, '\0', sizeof(DBT));
|
||||
|
||||
// Parse payload into DBT
|
||||
// Payload is: << DbRef:32, KeyLen:32, Key:KeyLen >>
|
||||
key.size = *((int*)(adata->payload + 4));
|
||||
key.data = (void*)(adata->payload + 8);
|
||||
|
||||
// Allocate memory to hold the value -- hard code initial size to 4k
|
||||
// TODO: Make this smarter!
|
||||
value.data = zalloc(4096);
|
||||
value.ulen = 4096;
|
||||
value.flags = DB_DBT_USERMEM;
|
||||
|
||||
int rc = adata->db->get(adata->db, adata->port->txn, &key, &value, 0);
|
||||
while (rc == DB_BUFFER_SMALL)
|
||||
{
|
||||
// Grow our value buffer and try again
|
||||
value.data = driver_realloc(value.data, value.size);
|
||||
value.ulen = value.size;
|
||||
rc = adata->db->get(adata->db, adata->port->txn, &key, &value, 0);
|
||||
}
|
||||
|
||||
adata->payload = value.data;
|
||||
adata->payload_sz = value.size; // Not ulen -- we want the actual data size
|
||||
adata->rc = rc;
|
||||
}
|
||||
|
||||
static void do_async_get_free(void* arg)
|
||||
{
|
||||
printf("do_async_get_free\n");
|
||||
}
|
||||
|
||||
|
||||
static void do_async_commit(void* arg)
|
||||
{
|
||||
printf("do_async_commit\n");
|
||||
|
||||
// Execute the actual commit
|
||||
AsyncData* adata = (AsyncData*)arg;
|
||||
adata->rc = adata->port->txn->commit(adata->port->txn, 0);
|
||||
}
|
||||
|
||||
|
||||
static void* zalloc(unsigned int size)
|
||||
{
|
||||
void* res = driver_alloc(size);
|
||||
|
@ -533,6 +793,24 @@ static int del_dbref(PortData* data, int dbref)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate that a provided dbref is currently opened by a port. Return 1 if true; 0 if false.
|
||||
*/
|
||||
static int has_dbref(PortData* data, int dbref)
|
||||
{
|
||||
DbRefList* current = data->dbrefs;
|
||||
while (current)
|
||||
{
|
||||
if (current->dbref == dbref)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
current = current->next;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a Database structure; find first available slot in G_DATABASES and return the
|
||||
* index of it. If no free slots are available, return -1
|
||||
|
|
|
@ -30,14 +30,15 @@ static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor);
|
|||
/**
|
||||
* Command codes
|
||||
*/
|
||||
#define CMD_OPEN_DB 0
|
||||
#define CMD_CLOSE_DB 1
|
||||
#define CMD_TXN_BEGIN 2
|
||||
#define CMD_TXN_COMMIT 3
|
||||
#define CMD_TXN_ABORT 4
|
||||
#define CMD_GET 5
|
||||
#define CMD_PUT 6
|
||||
#define CMD_PUT_ATOMIC 7
|
||||
#define CMD_NONE 0
|
||||
#define CMD_OPEN_DB 1
|
||||
#define CMD_CLOSE_DB 2
|
||||
#define CMD_TXN_BEGIN 3
|
||||
#define CMD_TXN_COMMIT 4
|
||||
#define CMD_TXN_ABORT 5
|
||||
#define CMD_GET 6
|
||||
#define CMD_PUT 7
|
||||
#define CMD_PUT_ATOMIC 8
|
||||
|
||||
/**
|
||||
* Command status values
|
||||
|
@ -55,7 +56,11 @@ static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor);
|
|||
* Error codes -- chosen so that we do not conflict with other packages, particularly
|
||||
* db.h. We use error namespace from -29000 to -29500.
|
||||
*/
|
||||
#define ERROR_MAX_DBS (-29000) /* System can not open any further databases */
|
||||
#define ERROR_MAX_DBS (-29000) /* System can not open any further databases */
|
||||
#define ERROR_ASYNC_PENDING (-29001) /* Async operation already pending on this port */
|
||||
#define ERROR_INVALID_DBREF (-29002) /* DbRef not currently opened by this port */
|
||||
#define ERROR_TXN_OPEN (-29003) /* Transaction already active on this port */
|
||||
#define ERROR_NO_TXN (-29004) /* No transaction open on this port */
|
||||
|
||||
/**
|
||||
* Driver Entry
|
||||
|
@ -115,17 +120,25 @@ typedef struct
|
|||
{
|
||||
ErlDrvPort port;
|
||||
|
||||
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
|
||||
* active */
|
||||
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
|
||||
* active */
|
||||
|
||||
int in_flight; /* Flag indicating if this port has an operation pending on the async
|
||||
* pool. */
|
||||
int async_op; /* Value indicating what async op is pending */
|
||||
|
||||
DbRefList* dbrefs; /* List of databases that this port has opened */
|
||||
|
||||
} PortData;
|
||||
|
||||
|
||||
typedef struct
|
||||
{
|
||||
const PortData* port; /* Port that originated this request -- READ ONLY! */
|
||||
int rc; /* Return code from operation */
|
||||
DB* db; /* Database to use for data storage/retrieval */
|
||||
void* payload; /* Packed key/value data */
|
||||
int payload_sz; /* Size of payload */
|
||||
} AsyncData;
|
||||
|
||||
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue