Overhaul driver to avoid use of pipes and simplify memory management

This commit is contained in:
Dave Smith 2008-12-15 09:23:19 -07:00
parent f20e46d756
commit f56351e1dc
2 changed files with 212 additions and 215 deletions

View file

@ -39,10 +39,11 @@ static int alloc_dbref();
static void* zalloc(unsigned int size);
static void signal_port(PortData* d);
static void* deadlock_check(void* arg);
static void* trickle_write(void* arg);
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc);
/**
* Global instance of DB_ENV; only a single one exists per O/S process.
*/
@ -123,6 +124,18 @@ static TPool* G_TPOOL_TXNS;
bin_helper_push_int32(&bh, val); \
RETURN_BH(bh, outbuf); }
#define FAIL_IF_ASYNC_PENDING(d, outbuf) { \
erl_drv_mutex_lock(d->port_lock); \
if (d->async_op != CMD_NONE) { \
erl_drv_mutex_unlock(d->port_lock); \
RETURN_INT(ERROR_ASYNC_PENDING, outbuf); \
} else { \
erl_drv_mutex_unlock(d->port_lock); \
}}
DRIVER_INIT(bdberl_drv)
{
printf("DRIVER INIT\n");
@ -210,12 +223,15 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
// Save handle to the port
d->port = port;
// Setup a pair of pipes for notification purposes
assert(pipe(d->pipe_fds) == 0);
// Allocate a mutex for the port
d->port_lock = erl_drv_mutex_create("bdberl_port_lock");
// Make sure both pipes are configured non-blocking
assert(fcntl(d->pipe_fds[0], F_SETFL, O_NONBLOCK) == 0);
assert(fcntl(d->pipe_fds[1], F_SETFL, O_NONBLOCK) == 0);
// Save the caller/owner PID
d->port_owner = driver_connected(port);
// Allocate an initial buffer for work purposes
d->work_buffer = driver_alloc(4096);
d->work_buffer_sz = 4096;
// Make sure port is running in binary mode
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
@ -226,20 +242,31 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
static void bdberl_drv_stop(ErlDrvData handle)
{
PortData* d = (PortData*)handle;
// printf("%p: stop\n", d->port);
// Grab the port lock, in case we have an async job running
erl_drv_mutex_lock(d->port_lock);
// If there is an async job pending, we need to cancel it. The cancel operation will
// block until the job has either been removed or has run
if (d->async_job)
{
// Drop the lock prior to starting the wait for the async process
erl_drv_mutex_unlock(d->port_lock);
printf("Cancelling async job for port: %p\n", d->port);
bdberl_tpool_cancel(d->async_pool, d->async_job);
driver_select(d->port, (ErlDrvEvent)(size_t)d->pipe_fds[0], DO_READ, 0);
printf("Canceled async job for port: %p\n", d->port);
}
else
{
// If there was no async job, drop the lock -- not needed
erl_drv_mutex_unlock(d->port_lock);
}
// Cleanup the port lock
erl_drv_mutex_destroy(d->port_lock);
// If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as
// there doesn't seem to be a to do an async op while stopping the driver.
// If a txn is currently active, terminate it.
if (d->txn)
{
d->txn->abort(d->txn);
@ -251,12 +278,10 @@ static void bdberl_drv_stop(ErlDrvData handle)
close_database(d->dbrefs->dbref, 0, d);
}
close(d->pipe_fds[0]);
close(d->pipe_fds[1]);
printf("Stopped port: %p\n", d->port);
// Release the port instance data
driver_free(d->work_buffer);
driver_free(handle);
}
@ -322,7 +347,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
}
case CMD_CLOSE_DB:
{
// TODO: If data is inflight, fail. Abort any open txns.
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if a txn is open
if (d->txn != 0)
{
RETURN_INT(ERROR_TXN_OPEN, outbuf);
}
// Take the provided dbref and attempt to close it
// Inbuf is: <<DbRef:32, Flags:32/unsigned>>
@ -336,11 +367,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
}
case CMD_TXN_BEGIN:
{
// If an async operation is pending, fail
if (d->async_op != CMD_NONE)
{
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
}
FAIL_IF_ASYNC_PENDING(d, outbuf);
// If we already have a txn open, fail
if (d->txn != 0)
@ -358,11 +385,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
case CMD_TXN_COMMIT:
case CMD_TXN_ABORT:
{
// If an async operation is pending, fail
if (d->async_op != CMD_NONE)
{
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
}
FAIL_IF_ASYNC_PENDING(d, outbuf);
// If we don't already have a txn open, fail
if (d->txn == 0)
@ -370,26 +393,14 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
RETURN_INT(ERROR_NO_TXN, outbuf);
}
// Allocate operation structure
AsyncData* adata = zalloc(sizeof(AsyncData));
adata->port = d;
// Setup async command and schedule it on the txns threadpool
d->async_op = cmd;
if (cmd == CMD_TXN_COMMIT)
{
adata->payload = (void*)(size_t)UNPACK_INT(inbuf, 0);
d->async_flags = UNPACK_INT(inbuf, 0);
}
// Update port data to indicate we have an operation in progress
d->async_op = cmd;
// Schedule async operation to execute the commit/abort
d->async_data = adata;
d->async_pool = G_TPOOL_TXNS;
d->async_job = bdberl_tpool_run(G_TPOOL_TXNS, &do_async_txnop, adata, 0);
// Watch for events on the output pipe
// TODO: Can we do this just once ?!
driver_select(d->port, (ErlDrvEvent)(size_t)d->pipe_fds[0], DO_READ, 1);
d->async_job = bdberl_tpool_run(G_TPOOL_TXNS, &do_async_txnop, d, 0);
// Outbuf is <<Rc:32>>
RETURN_INT(0, outbuf);
@ -397,11 +408,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
case CMD_PUT:
case CMD_GET:
{
// If another async op is pending, fail
if (d->async_op != CMD_NONE)
{
RETURN_INT(ERROR_ASYNC_PENDING, outbuf);
}
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Inbuf is: << DbRef:32, Rest/binary>>
int dbref = UNPACK_INT(inbuf, 0);
@ -411,19 +418,20 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// the underlying handle disappearing since we have a reference.
if (has_dbref(d, dbref))
{
// Allocate operation structure
void* thread_data = zalloc(sizeof(AsyncData) + inbuf_sz);
AsyncData* adata = (AsyncData*)thread_data;
adata->port = d;
adata->db = G_DATABASES[dbref].db;
adata->payload = thread_data + sizeof(AsyncData);
// If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc
// until it is large enough
if (d->work_buffer_sz < inbuf_sz)
{
d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz);
d->work_buffer_sz = inbuf_sz;
}
// Copy the payload into place
memcpy(adata->payload, inbuf, inbuf_sz);
memcpy(d->work_buffer, inbuf, inbuf_sz);
d->work_buffer_offset = inbuf_sz;
// Mark the port as busy and then schedule the appropriate async operation
d->async_op = cmd;
TPoolJobFunc fn;
if (cmd == CMD_PUT)
{
@ -433,14 +441,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
{
fn = &do_async_get;
}
d->async_data = adata;
d->async_pool = G_TPOOL_GENERAL;
d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, fn, adata, 0);
// Watch for events on the output pipe
// TODO: Can we do this just once ?!
driver_select(d->port, (ErlDrvEvent)(size_t)d->pipe_fds[0], DO_READ, 1);
d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, fn, d, 0);
// Let caller know that the operation is in progress
// Outbuf is: <<0:32>>
@ -471,100 +473,6 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
return 0;
}
static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
{
PortData* d = (PortData*)handle;
// printf("%p: ready_input; cmd = %d; rc = %d\n", d->port, d->async_op,
// ((AsyncData*)d->async_data)->rc);
// Empty out the queue
int readbuf;
while (read((size_t)event, &readbuf, sizeof(readbuf)) > 0) { ; }
driver_select(d->port, event, DO_READ, 0);
// The async op has completed running on the thread pool -- process the results
switch (d->async_op)
{
case CMD_PUT:
case CMD_TXN_COMMIT:
case CMD_TXN_ABORT:
{
AsyncData* adata = (AsyncData*)d->async_data;
// Extract return code == if it's zero, send back "ok" to driver process; otherwise
// send a {error, Reason} tuple
if (adata->rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") };
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, adata->rc,
ERL_DRV_TUPLE, 2};
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
}
// If this was a commit/abort, or a deadlock occurred while in a transaction,
// clear out the handle -- it's already invalid
if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT ||
(d->txn && (adata->rc == DB_LOCK_NOTGRANTED || adata->rc == DB_LOCK_DEADLOCK)))
{
d->txn = 0;
}
// Cleanup async data and mark the port as not busy
driver_free(d->async_data);
d->async_data = 0;
d->async_op = CMD_NONE;
d->async_job = 0;
d->async_pool = 0;
break;
}
case CMD_GET:
{
// Extract return code == if it's zero, send back {ok, Payload} or not_found to driver
// process; otherwise send a {error, Reason} tuple
AsyncData* adata = (AsyncData*)d->async_data;
if (adata->rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_BUF2BINARY, (ErlDrvTermData)adata->payload, (ErlDrvUInt)adata->payload_sz,
ERL_DRV_TUPLE, 2};
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
}
else if (adata->rc == DB_NOTFOUND)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, adata->rc,
ERL_DRV_TUPLE, 2};
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
}
// If a deadlock occurred while in a transaction, clear out the handle -- it's
// already invalid
if (d->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED))
{
d->txn = 0;
}
// Cleanup async data and mark the port as not busy
driver_free(adata->payload);
driver_free(d->async_data);
d->async_data = 0;
d->async_op = CMD_NONE;
d->async_job = 0;
d->async_pool = 0;
break;
}
}
}
static int open_database(const char* name, DBTYPE type, unsigned int flags, PortData* data, int* dbref_res)
{
@ -773,8 +681,12 @@ static void tune_system(int target, void* values, BinHelper* bh)
static void do_async_put(void* arg)
{
// Payload is: <<DbRef:32, Flags:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>>
AsyncData* adata = (AsyncData*)arg;
unsigned flags = UNPACK_INT(adata->payload, 4);
PortData* d = (PortData*)arg;
// Get the database reference and flags from the payload
int dbref = UNPACK_INT(d->work_buffer, 0);
DB* db = G_DATABASES[dbref].db;
unsigned int flags = UNPACK_INT(d->work_buffer, 4);
// Setup DBTs
DBT key;
@ -783,31 +695,55 @@ static void do_async_put(void* arg)
memset(&value, '\0', sizeof(DBT));
// Parse payload into DBTs
key.size = UNPACK_INT(adata->payload, 8);
key.data = UNPACK_BLOB(adata->payload, 12);
value.size = UNPACK_INT(adata->payload, 12 + key.size);
value.data = UNPACK_BLOB(adata->payload, 12 + key.size + 4);
key.size = UNPACK_INT(d->work_buffer, 8);
key.data = UNPACK_BLOB(d->work_buffer, 12);
value.size = UNPACK_INT(d->work_buffer, 12 + key.size);
value.data = UNPACK_BLOB(d->work_buffer, 12 + key.size + 4);
// Execute the actual put -- we'll process the result back in the driver_async_ready function
// All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still
// be atomic
adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, flags);
// Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn
// is NULL, the put will still be atomic
int rc = db->put(db, d->txn, &key, &value, flags);
// If any error occurs while we have a txn action, abort it
if (adata->port->txn && adata->rc)
if (d->txn && rc)
{
adata->port->txn->abort(adata->port->txn);
d->txn->abort(d->txn);
d->txn = 0;
}
// Enqueue a signal for the port to know that the operation is complete
signal_port(adata->port);
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
send_ok_or_error(port, port_owner, rc);
}
static void do_async_get(void* arg)
{
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
AsyncData* adata = (AsyncData*)arg;
unsigned flags = UNPACK_INT(adata->payload, 4);
PortData* d = (PortData*)arg;
// Get the database object, using the provided ref
int dbref = UNPACK_INT(d->work_buffer, 0);
DB* db = G_DATABASES[dbref].db;
// Extract operation flags
unsigned flags = UNPACK_INT(d->work_buffer, 4);
// Setup DBTs
DBT key;
@ -816,54 +752,109 @@ static void do_async_get(void* arg)
memset(&value, '\0', sizeof(DBT));
// Parse payload into DBT
key.size = UNPACK_INT(adata->payload, 8);
key.data = UNPACK_BLOB(adata->payload, 12);
key.size = UNPACK_INT(d->work_buffer, 8);
key.data = UNPACK_BLOB(d->work_buffer, 12);
// Allocate memory to hold the value -- hard code initial size to 4k
// TODO: Make this smarter!
value.data = zalloc(4096);
// Allocate a buffer for the output value
value.data = driver_alloc(4096);
value.ulen = 4096;
value.flags = DB_DBT_USERMEM;
int rc = adata->db->get(adata->db, adata->port->txn, &key, &value, flags);
int rc = db->get(db, d->txn, &key, &value, flags);
while (rc == DB_BUFFER_SMALL)
{
// Grow our value buffer and try again
value.data = driver_realloc(value.data, value.size);
value.ulen = value.size;
rc = adata->db->get(adata->db, adata->port->txn, &key, &value, flags);
rc = db->get(db, d->txn, &key, &value, flags);
}
adata->payload = value.data;
adata->payload_sz = value.size; // Not ulen -- we want the actual data size
adata->rc = rc;
// If any error occurs while we have a txn action, abort it
if (adata->rc != DB_NOTFOUND && adata->port->txn && adata->rc)
// Cleanup transaction as necessary
if (rc && rc != DB_NOTFOUND && d->txn)
{
adata->port->txn->abort(adata->port->txn);
d->txn->abort(d->txn);
d->txn = 0;
}
signal_port(adata->port);
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// Notify port of result
if (rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size,
ERL_DRV_TUPLE, 2};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else if (rc == DB_NOTFOUND)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
// Finally, clean up value buffer (driver_send_term made a copy)
driver_free(value.data);
}
static void do_async_txnop(void* arg)
{
AsyncData* adata = (AsyncData*)arg;
// printf("%p: do_async_txnop\n", adata->port->port);
PortData* d = (PortData*)arg;
// Execute the actual commit/abort
if (adata->port->async_op == CMD_TXN_COMMIT)
int rc = 0;
if (d->async_op == CMD_TXN_COMMIT)
{
unsigned flags = (unsigned)(size_t)adata->payload;
adata->rc = adata->port->txn->commit(adata->port->txn, flags);
rc = d->txn->commit(d->txn, d->async_flags);
}
else
{
adata->rc = adata->port->txn->abort(adata->port->txn);
rc = d->txn->abort(d->txn);
}
signal_port(adata->port);
// The transaction is now invalid, regardless of the outcome.
d->txn = 0;
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
d->work_buffer_offset = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
send_ok_or_error(port, port_owner, rc);
}
@ -1047,11 +1038,6 @@ static int alloc_dbref()
return -1;
}
static void signal_port(PortData* d)
{
int flag = 1;
write(d->pipe_fds[1], &flag, sizeof(flag));
}
/**
* Thread function that runs the deadlock checker periodically
@ -1105,3 +1091,19 @@ static void* trickle_write(void* arg)
printf("Trickle writer exiting.\n");
return 0;
}
static void send_ok_or_error(ErlDrvPort port, ErlDrvTermData pid, int rc)
{
if (rc == 0)
{
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
}

View file

@ -24,8 +24,6 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
char* inbuf, int inbuf_sz,
char** outbuf, int outbuf_sz);
static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev);
/**
* Command codes
@ -85,7 +83,7 @@ ErlDrvEntry bdberl_drv_entry =
bdberl_drv_start, /* L_PTR start, called when port is opened */
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
NULL, /* F_PTR output, called when erlang has sent */
bdberl_drv_ready_input, /* F_PTR ready_input, called when input descriptor ready */
NULL, /* F_PTR ready_input, called when input descriptor ready */
NULL, /* F_PTR ready_output, called when output descriptor ready */
"bdberl_drv", /* driver_name */
bdberl_drv_finish, /* F_PTR finish, called when unloaded */
@ -134,33 +132,30 @@ typedef struct
{
ErlDrvPort port;
ErlDrvMutex* port_lock; /* Mutex for this port (to permit async jobs to safely update this
* structure) */
ErlDrvTermData port_owner; /* Pid of the port owner */
DbRefList* dbrefs; /* List of databases that this port has opened */
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
* active */
int pipe_fds[2]; /* Array of pipe fds for signaling purposes */
int async_op; /* Value indicating what async op is pending */
void* async_data; /* Opaque point to data used during async op */
int async_flags; /* Flags for the async op command */
TPoolJob* async_job; /* Active job on the thread pool */
TPool* async_pool; /* Pool the async job is running on */
void* work_buffer;
unsigned int work_buffer_sz;
unsigned int work_buffer_offset;
} PortData;
typedef struct
{
PortData* port; /* Port that originated this request -- READ ONLY! */
int rc; /* Return code from operation */
DB* db; /* Database to use for data storage/retrieval */
void* payload; /* Packed key/value data */
int payload_sz; /* Size of payload */
} AsyncData;
#endif