From 2f8343070818e6135f0a514e298933d110bd3f5f Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Tue, 9 Dec 2008 13:18:55 -0700 Subject: [PATCH] Adding support for get/put and basic transactions --- c_src/bdberl_drv.c | 304 +++++++++++++++++++++++++++++++++++++++++++-- c_src/bdberl_drv.h | 39 ++++-- 2 files changed, 317 insertions(+), 26 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index aa1a6b8..4019281 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -20,8 +20,15 @@ static int open_database(const char* name, DBTYPE type, PortData* data, int* errno); static int close_database(int dbref, PortData* data); +static void do_async_put(void* arg); +static void do_async_put_free(void* arg); +static void do_async_get(void* arg); +static void do_async_get_free(void* arg); +static void do_async_commit(void* arg); + static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); +static int has_dbref(PortData* data, int dbref); static int add_portref(int dbref, ErlDrvPort port); static int del_portref(int dbref, ErlDrvPort port); @@ -72,6 +79,13 @@ static hive_hash* G_DATABASES_NAMES; #define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) #define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) +#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.bin->orig_size; + +#define RETURN_INT(val, outbuf) { \ + BinHelper bh; \ + bin_helper_init(&bh, 4); \ + bin_helper_push_int32(&bh, val); \ + RETURN_BH(bh, outbuf); } DRIVER_INIT(bdberl_drv) { @@ -95,7 +109,7 @@ DRIVER_INIT(bdberl_drv) // Use the BDBERL_MAX_DBS environment value to determine the max # of // databases to permit the VM to open at once. Defaults to 1024. 
G_DATABASES_SIZE = 1024; - char* max_dbs_str = getenv("BDBERL_MAX_DBS"); + char* max_dbs_str = getenv("BDBERL_MAX_DBS"); /* TODO: Use erl_drv_getenv */ if (max_dbs_str != 0) { G_DATABASES_SIZE = atoi(max_dbs_str); @@ -105,6 +119,9 @@ DRIVER_INIT(bdberl_drv) } } + // Make sure we can distinguish between lock timeouts and deadlocks + G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); + // BDB is setup -- allocate structures for tracking databases G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE); @@ -148,7 +165,15 @@ static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; - // TODO: Terminate any txns + // TODO: Investigate if we need to worry about queued up async operations. It seems like the VM + // SHOULD ensure the port doesn't disappear while something is in the queue... + + // If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as + // there doesn't seem to be a way to do an async op while stopping the driver. 
+ if (d->txn) + { + d->txn->abort(d->txn); + } // Close all the databases we previously opened while (d->dbrefs) @@ -181,7 +206,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_OPEN_DB: { // Extract the type code and filename from the inbuf - // Expect: Type:8, Name/bytes, NULL:8 + // Inbuf is: <<Type:8, Name/bytes, 0:8>> DBTYPE type = (DBTYPE)((char)*inbuf); char* name = (char*)(inbuf+1); int dbref; @@ -198,14 +223,12 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, } // Pack the status and dbref (or errno) into a binary and return it - // Byte 0 : Status - // Byte 1..4: dbref/errno + // Outbuf is: <<Status:8, DbRef:32>> BinHelper bh; bin_helper_init(&bh, 5); bin_helper_push_byte(&bh, status); bin_helper_push_int32(&bh, dbref); - *outbuf = (char*)bh.bin; - return bh.bin->orig_size; + RETURN_BH(bh, outbuf); } case CMD_CLOSE_DB: { @@ -215,13 +238,104 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int dbref = *((int*)inbuf); int rc = close_database(dbref, d); - // Setup to return the rc - BinHelper bh; - bin_helper_init(&bh, 4); - bin_helper_push_int32(&bh, rc); - *outbuf = (char*)bh.bin; - return bh.bin->orig_size; + // Outbuf is: <<Rc:32>> + RETURN_INT(rc, outbuf); } + case CMD_TXN_BEGIN: + { + // If an async operation is pending, fail + if (d->async_op != CMD_NONE) + { + RETURN_INT(ERROR_ASYNC_PENDING, outbuf); + } + + // If we already have a txn open, fail + if (d->txn != 0) + { + RETURN_INT(ERROR_TXN_OPEN, outbuf); + } + + // Outbuf is <<Rc:32>> + int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0); + RETURN_INT(rc, outbuf); + } + case CMD_TXN_COMMIT: + { + // If an async operation is pending, fail + if (d->async_op != CMD_NONE) + { + RETURN_INT(ERROR_ASYNC_PENDING, outbuf); + } + + // If we don't already have a txn open, fail + if (d->txn == 0) + { + RETURN_INT(ERROR_NO_TXN, outbuf); + } + + // Allocate operation structure + AsyncData* adata = zalloc(sizeof(AsyncData)); + adata->port = d; + + // 
Update port data to indicate we have an 
operation in progress + d->async_op = CMD_TXN_COMMIT; + + // Schedule async operation to execute the commit + unsigned int key = (unsigned int)d->port; /* NOTE(review): assumes the port handle fits in an unsigned int -- confirm on 64-bit builds */ + driver_async(d->port, &key, &do_async_commit, adata, 0); + + // Outbuf is <<Rc:32>> + RETURN_INT(0, outbuf); + } + case CMD_PUT: + case CMD_GET: + { + // If another async op is pending, fail + if (d->async_op != CMD_NONE) + { + RETURN_INT(ERROR_ASYNC_PENDING, outbuf); + } + + // Inbuf is: << DbRef:32, Rest/binary>> + int dbref = *((int*)inbuf); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + // Allocate operation structure + void* thread_data = zalloc(sizeof(AsyncData) + inbuf_sz); + AsyncData* adata = (AsyncData*)thread_data; + adata->port = d; + adata->db = G_DATABASES[dbref].db; + adata->payload = thread_data + sizeof(AsyncData); /* NOTE(review): arithmetic on void* is a GNU extension; a char* cast would be portable */ + + // Copy the payload into place + memcpy(adata->payload, inbuf, inbuf_sz); + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + unsigned int key = (unsigned int)d->port; + if (cmd == CMD_PUT) + { + driver_async(d->port, &key, &do_async_put, thread_data, &do_async_put_free); + } + else if (cmd == CMD_GET) + { + driver_async(d->port, &key, &do_async_get, thread_data, &do_async_get_free); + } + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } } *outbuf = 0; return 0; @@ -229,6 +343,70 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, static void 
bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data) { + // TODO: If GET/PUT fails due to DEADLOCK, detect and cleanup txn pointer accordingly + printf("ready_async\n"); + + PortData* d = 
(PortData*)handle; + switch (d->async_op) + { + case CMD_PUT: + case CMD_TXN_COMMIT: + case CMD_TXN_ABORT: + { + // Extract return code -- if it's zero, send back "ok" to driver process; otherwise + // send a {error, Reason} tuple + AsyncData* adata = (AsyncData*)thread_data; + if (adata->rc == 0) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") }; + driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, adata->rc, + ERL_DRV_TUPLE, 2}; + driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); + } + + // If this was a commit/abort, clear out the txn handle + if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT) + { + d->txn = 0; + } + + // Cleanup async data and mark the port as not busy + driver_free(adata); + d->async_op = CMD_NONE; + break; + } + case CMD_GET: + { + // Extract return code -- if it's zero, send back {ok, Payload} to driver process; otherwise + // send a {error, Reason} tuple + AsyncData* adata = (AsyncData*)thread_data; + if (adata->rc == 0) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_BUF2BINARY, (ErlDrvTermData)adata->payload, (ErlDrvUInt)adata->payload_sz, + ERL_DRV_TUPLE, 2}; + driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, adata->rc, + ERL_DRV_TUPLE, 2}; + driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); + } + + // Cleanup async data and mark the port as not busy; payload is the value buffer that do_async_get allocated + driver_free(adata->payload); + driver_free(adata); + d->async_op = CMD_NONE; + break; + } + } } static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor) @@ -389,6 +567,88 @@ static int close_database(int dbref, 
PortData* data) return 0; } +static void do_async_put(void* arg) +{ + 
printf("do_async_put\n"); + AsyncData* adata = (AsyncData*)arg; + + // Setup DBTs + DBT key; + DBT value; + memset(&key, '\0', sizeof(DBT)); + memset(&value, '\0', sizeof(DBT)); + + // Parse payload into DBTs (NOTE(review): KeyLen/ValLen are used as sent; no bounds check against the payload size is visible here -- confirm the caller validates them) + // Payload is: << DbRef:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>> + key.size = *((int*)(adata->payload + 4)); + key.data = (void*)(adata->payload + 8); + value.size = *((int*)(adata->payload + 8 + key.size)); + value.data = (void*)(adata->payload + 8 + key.size + 4); + + // Execute the actual put -- we'll process the result back in the bdberl_drv_ready_async function + // All databases are opened with AUTO_COMMIT, so if adata->port->txn is NULL, the put will still + // be atomic + adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0); +} + +static void do_async_put_free(void* arg) +{ + printf("do_async_put_free\n"); +} + + +static void do_async_get(void* arg) +{ + printf("do_async_get\n"); + AsyncData* adata = (AsyncData*)arg; + + // Setup DBTs + DBT key; + DBT value; + memset(&key, '\0', sizeof(DBT)); + memset(&value, '\0', sizeof(DBT)); + + // Parse payload into DBT + // Payload is: << DbRef:32, KeyLen:32, Key:KeyLen >> + key.size = *((int*)(adata->payload + 4)); + key.data = (void*)(adata->payload + 8); + + // Allocate memory to hold the value -- hard code initial size to 4k + // TODO: Make this smarter! 
+ value.data = zalloc(4096); + value.ulen = 4096; + value.flags = DB_DBT_USERMEM; + + int rc = adata->db->get(adata->db, adata->port->txn, &key, &value, 0); + while (rc == DB_BUFFER_SMALL) + { + // Grow our value buffer to value.size and try again + value.data = driver_realloc(value.data, value.size); + value.ulen = value.size; + rc = adata->db->get(adata->db, adata->port->txn, &key, &value, 0); + } + + adata->payload = value.data; /* payload now refers to the value buffer rather than the copied request */ + adata->payload_sz = value.size; // Not ulen -- we want the actual data size + adata->rc = rc; +} + +static void do_async_get_free(void* arg) +{ + printf("do_async_get_free\n"); +} + + +static void do_async_commit(void* arg) +{ + printf("do_async_commit\n"); + + // Execute the actual commit + AsyncData* adata = (AsyncData*)arg; + adata->rc = adata->port->txn->commit(adata->port->txn, 0); +} + + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); @@ -533,6 +793,24 @@ static int del_dbref(PortData* data, int dbref) return 0; } +/** + * Validate that a provided dbref is currently opened by a port. Return 1 if true; 0 if false. + */ +static int has_dbref(PortData* data, int dbref) +{ + DbRefList* current = data->dbrefs; + while (current) + { + if (current->dbref == dbref) + { + return 1; + } + + current = current->next; + } + return 0; +} + /** * Allocate a Database structure; find first available slot in G_DATABASES and return the * index of it. 
If no free slots are available, return -1 diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index caec772..123bb54 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -30,14 +30,15 @@ static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor); /** * Command codes */ -#define CMD_OPEN_DB 0 -#define CMD_CLOSE_DB 1 -#define CMD_TXN_BEGIN 2 -#define CMD_TXN_COMMIT 3 -#define CMD_TXN_ABORT 4 -#define CMD_GET 5 -#define CMD_PUT 6 -#define CMD_PUT_ATOMIC 7 +#define CMD_NONE 0 +#define CMD_OPEN_DB 1 +#define CMD_CLOSE_DB 2 +#define CMD_TXN_BEGIN 3 +#define CMD_TXN_COMMIT 4 +#define CMD_TXN_ABORT 5 +#define CMD_GET 6 +#define CMD_PUT 7 +#define CMD_PUT_ATOMIC 8 /** * Command status values @@ -55,7 +56,11 @@ static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor); * Error codes -- chosen so that we do not conflict with other packages, particularly * db.h. We use error namespace from -29000 to -29500. */ -#define ERROR_MAX_DBS (-29000) /* System can not open any further databases */ +#define ERROR_MAX_DBS (-29000) /* System can not open any further databases */ +#define ERROR_ASYNC_PENDING (-29001) /* Async operation already pending on this port */ +#define ERROR_INVALID_DBREF (-29002) /* DbRef not currently opened by this port */ +#define ERROR_TXN_OPEN (-29003) /* Transaction already active on this port */ +#define ERROR_NO_TXN (-29004) /* No transaction open on this port */ /** * Driver Entry @@ -115,17 +120,25 @@ typedef struct { ErlDrvPort port; - DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn - * active */ + DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn + * active */ - int in_flight; /* Flag indicating if this port has an operation pending on the async - * pool. 
*/ + int async_op; /* Command code (CMD_*) of the async op currently pending; CMD_NONE when idle */ DbRefList* dbrefs; /* List of databases that this port has opened */ } PortData; +typedef struct +{ + const PortData* port; /* Port that originated this request -- READ ONLY! */ + int rc; /* Return code from operation */ + DB* db; /* Database to use for data storage/retrieval */ + void* payload; /* Packed request data; do_async_get replaces it with the fetched value buffer */ + int payload_sz; /* Size in bytes of the fetched value; assigned by do_async_get */ +} AsyncData; + #endif