From f56351e1dc750188cd5362f0a4267d7253094b1e Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Mon, 15 Dec 2008 09:23:19 -0700 Subject: [PATCH] Overhaul driver to avoid use of pipes and simplify memory management --- c_src/bdberl_drv.c | 396 +++++++++++++++++++++++---------------------- c_src/bdberl_drv.h | 31 ++-- 2 files changed, 212 insertions(+), 215 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 99955a8..a32dfba 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -39,10 +39,11 @@ static int alloc_dbref(); static void* zalloc(unsigned int size); -static void signal_port(PortData* d); static void* deadlock_check(void* arg); static void* trickle_write(void* arg); +static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc); + /** * Global instance of DB_ENV; only a single one exists per O/S process. */ @@ -123,6 +124,18 @@ static TPool* G_TPOOL_TXNS; bin_helper_push_int32(&bh, val); \ RETURN_BH(bh, outbuf); } +#define FAIL_IF_ASYNC_PENDING(d, outbuf) { \ + erl_drv_mutex_lock(d->port_lock); \ + if (d->async_op != CMD_NONE) { \ + erl_drv_mutex_unlock(d->port_lock); \ + RETURN_INT(ERROR_ASYNC_PENDING, outbuf); \ + } else { \ + erl_drv_mutex_unlock(d->port_lock); \ + }} + + + + DRIVER_INIT(bdberl_drv) { printf("DRIVER INIT\n"); @@ -210,12 +223,15 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) // Save handle to the port d->port = port; - // Setup a pair of pipes for notification purposes - assert(pipe(d->pipe_fds) == 0); + // Allocate a mutex for the port + d->port_lock = erl_drv_mutex_create("bdberl_port_lock"); - // Make sure both pipes are configured non-blocking - assert(fcntl(d->pipe_fds[0], F_SETFL, O_NONBLOCK) == 0); - assert(fcntl(d->pipe_fds[1], F_SETFL, O_NONBLOCK) == 0); + // Save the caller/owner PID + d->port_owner = driver_connected(port); + + // Allocate an initial buffer for work purposes + d->work_buffer = driver_alloc(4096); + d->work_buffer_sz = 4096; // Make sure port is running in 
binary mode set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY); @@ -226,20 +242,31 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; -// printf("%p: stop\n", d->port); + + // Grab the port lock, in case we have an async job running + erl_drv_mutex_lock(d->port_lock); // If there is an async job pending, we need to cancel it. The cancel operation will // block until the job has either been removed or has run if (d->async_job) { + // Drop the lock prior to starting the wait for the async process + erl_drv_mutex_unlock(d->port_lock); + printf("Cancelling async job for port: %p\n", d->port); bdberl_tpool_cancel(d->async_pool, d->async_job); - driver_select(d->port, (ErlDrvEvent)(size_t)d->pipe_fds[0], DO_READ, 0); printf("Canceled async job for port: %p\n", d->port); } + else + { + // If there was no async job, drop the lock -- not needed + erl_drv_mutex_unlock(d->port_lock); + } + + // Cleanup the port lock + erl_drv_mutex_destroy(d->port_lock); - // If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as - // there doesn't seem to be a to do an async op while stopping the driver. + // If a txn is currently active, terminate it. if (d->txn) { d->txn->abort(d->txn); @@ -251,12 +278,10 @@ static void bdberl_drv_stop(ErlDrvData handle) close_database(d->dbrefs->dbref, 0, d); } - - close(d->pipe_fds[0]); - close(d->pipe_fds[1]); printf("Stopped port: %p\n", d->port); // Release the port instance data + driver_free(d->work_buffer); driver_free(handle); } @@ -322,7 +347,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, } case CMD_CLOSE_DB: { - // TODO: If data is inflight, fail. Abort any open txns. 
+ FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Fail if a txn is open + if (d->txn != 0) + { + RETURN_INT(ERROR_TXN_OPEN, outbuf); + } // Take the provided dbref and attempt to close it // Inbuf is: <> @@ -336,11 +367,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, } case CMD_TXN_BEGIN: { - // If an async operation is pending, fail - if (d->async_op != CMD_NONE) - { - RETURN_INT(ERROR_ASYNC_PENDING, outbuf); - } + FAIL_IF_ASYNC_PENDING(d, outbuf); // If we already have a txn open, fail if (d->txn != 0) @@ -358,11 +385,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_TXN_COMMIT: case CMD_TXN_ABORT: { - // If an async operation is pending, fail - if (d->async_op != CMD_NONE) - { - RETURN_INT(ERROR_ASYNC_PENDING, outbuf); - } + FAIL_IF_ASYNC_PENDING(d, outbuf); // If we don't already have a txn open, fail if (d->txn == 0) @@ -370,26 +393,14 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, RETURN_INT(ERROR_NO_TXN, outbuf); } - // Allocate operation structure - AsyncData* adata = zalloc(sizeof(AsyncData)); - adata->port = d; - + // Setup async command and schedule it on the txns threadpool + d->async_op = cmd; if (cmd == CMD_TXN_COMMIT) { - adata->payload = (void*)(size_t)UNPACK_INT(inbuf, 0); + d->async_flags = UNPACK_INT(inbuf, 0); } - - // Update port data to indicate we have an operation in progress - d->async_op = cmd; - - // Schedule async operation to execute the commit/abort - d->async_data = adata; d->async_pool = G_TPOOL_TXNS; - d->async_job = bdberl_tpool_run(G_TPOOL_TXNS, &do_async_txnop, adata, 0); - - // Watch for events on the output pipe - // TODO: Can we do this just once ?! 
- driver_select(d->port, (ErlDrvEvent)(size_t)d->pipe_fds[0], DO_READ, 1); + d->async_job = bdberl_tpool_run(G_TPOOL_TXNS, &do_async_txnop, d, 0); // Outbuf is <<Rc:32>> RETURN_INT(0, outbuf); @@ -397,11 +408,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_PUT: case CMD_GET: { - // If another async op is pending, fail - if (d->async_op != CMD_NONE) - { - RETURN_INT(ERROR_ASYNC_PENDING, outbuf); - } + FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is: << DbRef:32, Rest/binary>> int dbref = UNPACK_INT(inbuf, 0); @@ -411,19 +418,20 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // the underlying handle disappearing since we have a reference. if (has_dbref(d, dbref)) { - // Allocate operation structure - void* thread_data = zalloc(sizeof(AsyncData) + inbuf_sz); - AsyncData* adata = (AsyncData*)thread_data; - adata->port = d; - adata->db = G_DATABASES[dbref].db; - adata->payload = thread_data + sizeof(AsyncData); + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } // Copy the payload into place - memcpy(adata->payload, inbuf, inbuf_sz); + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; - TPoolJobFunc fn; if (cmd == CMD_PUT) { @@ -433,14 +441,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, { fn = &do_async_get; } - - d->async_data = adata; d->async_pool = G_TPOOL_GENERAL; - d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, fn, adata, 0); - - // Watch for events on the output pipe - // TODO: Can we do this just once ?!
- driver_select(d->port, (ErlDrvEvent)(size_t)d->pipe_fds[0], DO_READ, 1); + d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, fn, d, 0); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> @@ -471,100 +473,6 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, return 0; } -static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) -{ - PortData* d = (PortData*)handle; -// printf("%p: ready_input; cmd = %d; rc = %d\n", d->port, d->async_op, -// ((AsyncData*)d->async_data)->rc); - - // Empty out the queue - int readbuf; - while (read((size_t)event, &readbuf, sizeof(readbuf)) > 0) { ; } - driver_select(d->port, event, DO_READ, 0); - - // The async op has completed running on the thread pool -- process the results - switch (d->async_op) - { - case CMD_PUT: - case CMD_TXN_COMMIT: - case CMD_TXN_ABORT: - { - AsyncData* adata = (AsyncData*)d->async_data; - - // Extract return code == if it's zero, send back "ok" to driver process; otherwise - // send a {error, Reason} tuple - if (adata->rc == 0) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") }; - driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, adata->rc, - ERL_DRV_TUPLE, 2}; - driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); - } - - // If this was a commit/abort, or a deadlock occurred while in a transaction, - // clear out the handle -- it's already invalid - if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT || - (d->txn && (adata->rc == DB_LOCK_NOTGRANTED || adata->rc == DB_LOCK_DEADLOCK))) - { - d->txn = 0; - } - - // Cleanup async data and mark the port as not busy - driver_free(d->async_data); - d->async_data = 0; - d->async_op = CMD_NONE; - d->async_job = 0; - d->async_pool = 0; - break; - } - case CMD_GET: - { - // Extract return code == if it's zero, send back 
{ok, Payload} or not_found to driver - // process; otherwise send a {error, Reason} tuple - AsyncData* adata = (AsyncData*)d->async_data; - if (adata->rc == 0) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), - ERL_DRV_BUF2BINARY, (ErlDrvTermData)adata->payload, (ErlDrvUInt)adata->payload_sz, - ERL_DRV_TUPLE, 2}; - driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); - } - else if (adata->rc == DB_NOTFOUND) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; - driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, adata->rc, - ERL_DRV_TUPLE, 2}; - driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); - } - - // If a deadlock occurred while in a transaction, clear out the handle -- it's - // already invalid - if (d->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED)) - { - d->txn = 0; - } - - // Cleanup async data and mark the port as not busy - driver_free(adata->payload); - driver_free(d->async_data); - d->async_data = 0; - d->async_op = CMD_NONE; - d->async_job = 0; - d->async_pool = 0; - break; - } - } -} static int open_database(const char* name, DBTYPE type, unsigned int flags, PortData* data, int* dbref_res) { @@ -773,8 +681,12 @@ static void tune_system(int target, void* values, BinHelper* bh) static void do_async_put(void* arg) { // Payload is: <<DbRef:32, Flags:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>> - AsyncData* adata = (AsyncData*)arg; - unsigned flags = UNPACK_INT(adata->payload, 4); + PortData* d = (PortData*)arg; + + // Get the database reference and flags from the payload + int dbref = UNPACK_INT(d->work_buffer, 0); + DB* db = G_DATABASES[dbref].db; + unsigned int flags = UNPACK_INT(d->work_buffer, 4); // Setup DBTs DBT key; @@ -783,31 +695,55 @@ static void do_async_put(void* arg) memset(&value, '\0', sizeof(DBT)); // Parse payload into DBTs - key.size =
UNPACK_INT(adata->payload, 8); - key.data = UNPACK_BLOB(adata->payload, 12); - value.size = UNPACK_INT(adata->payload, 12 + key.size); - value.data = UNPACK_BLOB(adata->payload, 12 + key.size + 4); + key.size = UNPACK_INT(d->work_buffer, 8); + key.data = UNPACK_BLOB(d->work_buffer, 12); + value.size = UNPACK_INT(d->work_buffer, 12 + key.size); + value.data = UNPACK_BLOB(d->work_buffer, 12 + key.size + 4); - // Execute the actual put -- we'll process the result back in the driver_async_ready function - // All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still - // be atomic - adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, flags); + // Execute the actual put. All databases are opened with AUTO_COMMIT, so if d->txn + // is NULL, the put will still be atomic + int rc = db->put(db, d->txn, &key, &value, flags); // If any error occurs while we have a txn action, abort it - if (adata->port->txn && adata->rc) + if (d->txn && rc) { - adata->port->txn->abort(adata->port->txn); + d->txn->abort(d->txn); + d->txn = 0; } - // Enqueue a signal for the port to know that the operation is complete - signal_port(adata->port); + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired.
+ ErlDrvPort port = d->port; + ErlDrvTermData port_owner = d->port_owner; + + // Release the port for another operation + d->async_pool = 0; + d->async_job = 0; + d->work_buffer_offset = 0; + erl_drv_mutex_lock(d->port_lock); + d->async_op = CMD_NONE; + erl_drv_mutex_unlock(d->port_lock); + + // TODO: May need to tag the messages a bit more explicitly so that if another async + // job runs to completion before the message gets delivered we don't mis-interpret this + // response code. + send_ok_or_error(port, port_owner, rc); } static void do_async_get(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> - AsyncData* adata = (AsyncData*)arg; - unsigned flags = UNPACK_INT(adata->payload, 4); + PortData* d = (PortData*)arg; + + // Get the database object, using the provided ref + int dbref = UNPACK_INT(d->work_buffer, 0); + DB* db = G_DATABASES[dbref].db; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 4); // Setup DBTs DBT key; @@ -816,54 +752,109 @@ static void do_async_get(void* arg) memset(&value, '\0', sizeof(DBT)); // Parse payload into DBT - key.size = UNPACK_INT(adata->payload, 8); - key.data = UNPACK_BLOB(adata->payload, 12); + key.size = UNPACK_INT(d->work_buffer, 8); + key.data = UNPACK_BLOB(d->work_buffer, 12); - // Allocate memory to hold the value -- hard code initial size to 4k - // TODO: Make this smarter! 
- value.data = zalloc(4096); + // Allocate a buffer for the output value + value.data = driver_alloc(4096); value.ulen = 4096; value.flags = DB_DBT_USERMEM; - int rc = adata->db->get(adata->db, adata->port->txn, &key, &value, flags); + int rc = db->get(db, d->txn, &key, &value, flags); while (rc == DB_BUFFER_SMALL) { // Grow our value buffer and try again value.data = driver_realloc(value.data, value.size); value.ulen = value.size; - rc = adata->db->get(adata->db, adata->port->txn, &key, &value, flags); + rc = db->get(db, d->txn, &key, &value, flags); } - adata->payload = value.data; - adata->payload_sz = value.size; // Not ulen -- we want the actual data size - adata->rc = rc; - - // If any error occurs while we have a txn action, abort it - if (adata->rc != DB_NOTFOUND && adata->port->txn && adata->rc) + // Cleanup transaction as necessary + if (rc && rc != DB_NOTFOUND && d->txn) { - adata->port->txn->abort(adata->port->txn); + d->txn->abort(d->txn); + d->txn = 0; } - signal_port(adata->port); + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. 
+ ErlDrvPort port = d->port; + ErlDrvTermData port_owner = d->port_owner; + + // Release the port for another operation + d->async_pool = 0; + d->async_job = 0; + d->work_buffer_offset = 0; + erl_drv_mutex_lock(d->port_lock); + d->async_op = CMD_NONE; + erl_drv_mutex_unlock(d->port_lock); + + // Notify port of result + if (rc == 0) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); + } + else if (rc == DB_NOTFOUND) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; + driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); + } + + // Finally, clean up value buffer (driver_send_term made a copy) + driver_free(value.data); } static void do_async_txnop(void* arg) { - AsyncData* adata = (AsyncData*)arg; -// printf("%p: do_async_txnop\n", adata->port->port); + PortData* d = (PortData*)arg; // Execute the actual commit/abort - if (adata->port->async_op == CMD_TXN_COMMIT) + int rc = 0; + if (d->async_op == CMD_TXN_COMMIT) { - unsigned flags = (unsigned)(size_t)adata->payload; - adata->rc = adata->port->txn->commit(adata->port->txn, flags); + rc = d->txn->commit(d->txn, d->async_flags); } else { - adata->rc = adata->port->txn->abort(adata->port->txn); + rc = d->txn->abort(d->txn); } - signal_port(adata->port); + // The transaction is now invalid, regardless of the outcome. + d->txn = 0; + + // Save the port and pid references -- we need copies independent from the PortData + // structure. 
Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData port_owner = d->port_owner; + + // Release the port for another operation + d->async_pool = 0; + d->async_job = 0; + d->work_buffer_offset = 0; + erl_drv_mutex_lock(d->port_lock); + d->async_op = CMD_NONE; + erl_drv_mutex_unlock(d->port_lock); + + // TODO: May need to tag the messages a bit more explicitly so that if another async + // job runs to completion before the message gets delivered we don't mis-interpret this + // response code. + send_ok_or_error(port, port_owner, rc); } @@ -1047,11 +1038,6 @@ static int alloc_dbref() return -1; } -static void signal_port(PortData* d) -{ - int flag = 1; - write(d->pipe_fds[1], &flag, sizeof(flag)); -} /** * Thread function that runs the deadlock checker periodically @@ -1105,3 +1091,19 @@ static void* trickle_write(void* arg) printf("Trickle writer exiting.\n"); return 0; } + +static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc) +{ + if (rc == 0) + { + ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } +} diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index 66ce577..952fe23 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -24,8 +24,6 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz); -static void 
bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev); - /** * Command codes @@ -85,7 +83,7 @@ ErlDrvEntry bdberl_drv_entry = bdberl_drv_start, /* L_PTR start, called when port is opened */ bdberl_drv_stop, /* F_PTR stop, called when port is closed */ NULL, /* F_PTR output, called when erlang has sent */ - bdberl_drv_ready_input, /* F_PTR ready_input, called when input descriptor ready */ + NULL, /* F_PTR ready_input, called when input descriptor ready */ NULL, /* F_PTR ready_output, called when output descriptor ready */ "bdberl_drv", /* driver_name */ bdberl_drv_finish, /* F_PTR finish, called when unloaded */ @@ -134,33 +132,30 @@ typedef struct { ErlDrvPort port; + ErlDrvMutex* port_lock; /* Mutex for this port (to permit async jobs to safely update this + * structure) */ + + ErlDrvTermData port_owner; /* Pid of the port owner */ + DbRefList* dbrefs; /* List of databases that this port has opened */ DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn * active */ - int pipe_fds[2]; /* Array of pipe fds for signaling purposes */ - int async_op; /* Value indicating what async op is pending */ - void* async_data; /* Opaque point to data used during async op */ + int async_flags; /* Flags for the async op command */ TPoolJob* async_job; /* Active job on the thread pool */ TPool* async_pool; /* Pool the async job is running on */ + void* work_buffer; + + unsigned int work_buffer_sz; + + unsigned int work_buffer_offset; + } PortData; - -typedef struct -{ - PortData* port; /* Port that originated this request -- READ ONLY! */ - int rc; /* Return code from operation */ - DB* db; /* Database to use for data storage/retrieval */ - void* payload; /* Packed key/value data */ - int payload_sz; /* Size of payload */ -} AsyncData; - - - #endif