From 8fc3cc370137954c581cd2a628bfc3589ae0d636 Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Wed, 10 Dec 2008 18:17:44 -0700 Subject: [PATCH] Interstitial commit --- c_src/bdberl_drv.c | 184 +++++++++++++++++++++++++++++++------------ c_src/bdberl_drv.h | 28 ++++--- c_src/bdberl_tpool.c | 15 +++- c_src/bdberl_tpool.h | 8 +- src/bdberl_port.erl | 69 ++++++++++------ test/port_SUITE.erl | 2 +- 6 files changed, 214 insertions(+), 92 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index cc18983..18e89b6 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -9,6 +9,8 @@ #include #include #include +#include +#include #include "hive_hash.h" #include "bdberl_drv.h" @@ -21,9 +23,7 @@ static int open_database(const char* name, DBTYPE type, PortData* data, int* err static int close_database(int dbref, PortData* data); static void do_async_put(void* arg); -static void do_async_put_free(void* arg); static void do_async_get(void* arg); -static void do_async_get_free(void* arg); static void do_async_txnop(void* arg); static int add_dbref(PortData* data, int dbref); @@ -37,6 +37,8 @@ static int alloc_dbref(); static void* zalloc(unsigned int size); +static void signal_port(PortData* d); + /** * Global instance of DB_ENV; only a single one exists per O/S process. */ @@ -69,13 +71,19 @@ static ErlDrvRWLock* G_DATABASES_RWLOCK; static hive_hash* G_DATABASES_NAMES; +/** + * + */ +static TPool* G_TPOOL_GENERAL; +static TPool* G_TPOOL_TXNS; + /** * Helpful macros */ #define READ_LOCK(L) erl_drv_rwlock_rlock(L) #define READ_UNLOCK(L) erl_drv_rwlock_runlock(L) -#define PROMOTE_READ_LOCK(L) { erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L); } +#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L) #define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) #define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) @@ -120,13 +128,25 @@ DRIVER_INIT(bdberl_drv) } // Make sure we can distiguish between lock timeouts and deadlocks - G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); + int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); + printf("TIME_NOT_GRANTED rc: %d\n", rc); + rc = G_DB_ENV->set_lk_detect(G_DB_ENV, 1); + printf("LOCK DETECT rc: %d\n", rc); + rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_TXN_TIMEOUT); + printf("DB_SET_TXN_TIMEOUT rc: %d\n", rc); + rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_LOCK_TIMEOUT); + printf("DB_SET_LOCK_TIMEOUT rc: %d\n", rc); // BDB is setup -- allocate structures for tracking databases G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE); G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK"); G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); + + // Startup our thread pools + // TODO: Make configurable/adjustable + G_TPOOL_GENERAL = bdberl_tpool_start(5); + G_TPOOL_TXNS = bdberl_tpool_start(5); } else { @@ -155,6 +175,13 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) // Save handle to the port d->port = port; + // Setup a pair of pipes for notification purposes + assert(pipe(d->pipe_fds) == 0); + + // Make sure both pipes are configured non-blocking + assert(fcntl(d->pipe_fds[0], F_SETFL, O_NONBLOCK) == 0); + assert(fcntl(d->pipe_fds[1], F_SETFL, O_NONBLOCK) == 0); + // Make sure port is running in binary mode set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY); @@ -164,9 +191,17 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; + printf("%p: stop\n", d->port); - // TODO: Investigate if we need to worry about queued up async operations. It seems like the VM - // SHOULD ensure the port doesn't disappear while something is in the queue... + // If there is an async job pending, we need to cancel it. The cancel operation will + // block until the job has either been removed or has run + if (d->async_job) + { + printf("Cancelling async job for port: %p\n", d->port); + bdberl_tpool_cancel(d->async_pool, d->async_job); + driver_select(d->port, (ErlDrvEvent)d->pipe_fds[0], DO_READ, 0); + printf("Canceled async job for port: %p\n", d->port); + } // If a txn is currently active, terminate it. We _must_ do it synchronously (unfortunately) as // there doesn't seem to be a to do an async op while stopping the driver. @@ -180,6 +215,11 @@ static void bdberl_drv_stop(ErlDrvData handle) { close_database(d->dbrefs->dbref, d); } + + + close(d->pipe_fds[0]); + close(d->pipe_fds[1]); + printf("Stopped port: %p\n", d->port); // Release the port instance data driver_free(handle); @@ -187,12 +227,19 @@ static void bdberl_drv_stop(ErlDrvData handle) static void bdberl_drv_finish() { - // Driver is unloading -- cleanup and shut down the BDB environment. Note that we assume + printf("finish\n"); + // Stop the thread pools + bdberl_tpool_stop(G_TPOOL_GENERAL); + bdberl_tpool_stop(G_TPOOL_TXNS); + + // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. G_DB_ENV->close(G_DB_ENV, 0); driver_free(G_DATABASES); erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); hive_hash_destroy(G_DATABASES_NAMES); + + printf("finished\n"); } static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, @@ -243,7 +290,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, } case CMD_TXN_BEGIN: { - // If an async operaiton is pending, fail + // If an async operation is pending, fail if (d->async_op != CMD_NONE) { RETURN_INT(ERROR_ASYNC_PENDING, outbuf); @@ -252,11 +299,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // If we already have a txn open, fail if (d->txn != 0) { + printf("ERROR_TXN_OPEN\n"); RETURN_INT(ERROR_TXN_OPEN, outbuf); } // Outbuf is <> int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0); + printf("TXN BEGIN: %d\n", rc); RETURN_INT(rc, outbuf); } case CMD_TXN_COMMIT: @@ -282,8 +331,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, d->async_op = cmd; // Schedule async operation to execute the commit/abort - unsigned int key = (unsigned int)d->port; - driver_async(d->port, &key, &do_async_txnop, adata, 0); + d->async_data = adata; + d->async_pool = G_TPOOL_TXNS; + d->async_job = bdberl_tpool_run(G_TPOOL_TXNS, &do_async_txnop, adata, 0); + + // Watch for events on the output pipe + // TODO: Can we do this just once ?! + driver_select(d->port, (ErlDrvEvent)d->pipe_fds[0], DO_READ, 1); // Outbuf is <> RETURN_INT(0, outbuf); @@ -317,16 +371,25 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; - unsigned int key = (unsigned int)d->port; + + TPoolJobFunc fn; if (cmd == CMD_PUT) { - driver_async(d->port, &key, &do_async_put, thread_data, &do_async_put_free); + fn = &do_async_put; } else if (cmd == CMD_GET) { - driver_async(d->port, &key, &do_async_get, thread_data, &do_async_get_free); + fn = &do_async_get; } + d->async_data = adata; + d->async_pool = G_TPOOL_GENERAL; + d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, fn, adata, 0); + + // Watch for events on the output pipe + // TODO: Can we do this just once ?! + driver_select(d->port, (ErlDrvEvent)d->pipe_fds[0], DO_READ, 1); + // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); @@ -342,21 +405,34 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, return 0; } -static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data) +static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) { - // TODO: If GET/PUT fails due to DEADLOCK, detect and cleanup txn pointer accordingly - printf("ready_async\n"); - PortData* d = (PortData*)handle; + printf("%p: ready_input\n", d->port); + + // Empty out the queue + int readbuf; + while (read((int)event, &readbuf, sizeof(readbuf)) > 0) { ; } + driver_select(d->port, event, DO_READ, 0); + + // The async op has completed running on the thread pool -- process the results switch (d->async_op) { case CMD_PUT: case CMD_TXN_COMMIT: case CMD_TXN_ABORT: { - // Extract return code == if it's zero, send back "ok" to driver process; otherwise + + // If this was a commit/abort, clear out the txn handle + if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT) + { + d->txn = 0; + } + + // Extract return code == if it's zero, send back "ok" to driver process; otherwise // send a {error, Reason} tuple - AsyncData* adata = (AsyncData*)thread_data; + AsyncData* adata = (AsyncData*)d->async_data; + printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc); if (adata->rc == 0) { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") }; @@ -364,28 +440,33 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da } else { + // If we have a transactional PUT and a deadlock occurred, clear out txn ptr + if (d->async_op == CMD_GET && adata->port->txn && + (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED)) + { + d->txn = 0; + } + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ERL_DRV_INT, adata->rc, ERL_DRV_TUPLE, 2}; driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); } - // If this was a commit/abort, clear out the txn handle - if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT) - { - d->txn = 0; - } - // Cleanup async data and mark the port as not busy - driver_free(adata); + driver_free(d->async_data); + d->async_data = 0; d->async_op = CMD_NONE; + d->async_job = 0; + d->async_pool = 0; break; } case CMD_GET: { // Extract return code == if it's zero, send back {ok, Payload} or not_found to driver // process; otherwise send a {error, Reason} tuple - AsyncData* adata = (AsyncData*)thread_data; + AsyncData* adata = (AsyncData*)d->async_data; + printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc); if (adata->rc == 0) { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), @@ -395,12 +476,18 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da } else if (adata->rc == DB_NOTFOUND) { - printf("not foudn\n"); ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); } else { + printf("GET ERROR: %d\n", adata->rc); + if (adata->port->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED)) + { + // Transaction has been invalidated -- null it out to avoid multiple txn closure + adata->port->txn = 0; + } + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ERL_DRV_INT, adata->rc, ERL_DRV_TUPLE, 2}; @@ -408,18 +495,16 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da } // Cleanup async data and mark the port as not busy - driver_free(adata->payload); - driver_free(adata); + driver_free(d->async_data); + d->async_data = 0; d->async_op = CMD_NONE; + d->async_job = 0; + d->async_pool = 0; break; } } } -static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor) -{ -} - static int open_database(const char* name, DBTYPE type, PortData* data, int* dbref_res) { *dbref_res = -1; @@ -576,8 +661,8 @@ static int close_database(int dbref, PortData* data) static void do_async_put(void* arg) { - printf("do_async_put\n"); AsyncData* adata = (AsyncData*)arg; + printf("%p: do_async_put\n", adata->port->port); // Setup DBTs DBT key; @@ -596,19 +681,16 @@ static void do_async_put(void* arg) // All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still // be atomic adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0); -} -static void do_async_put_free(void* arg) -{ - printf("do_async_put_free\n"); + // Enqueue a signal for the port to know that the operation is complete + signal_port(adata->port); } - static void do_async_get(void* arg) { - printf("do_async_get\n"); AsyncData* adata = (AsyncData*)arg; - + printf("%p: do_async_get\n", adata->port->port); + // Setup DBTs DBT key; DBT value; @@ -638,20 +720,16 @@ static void do_async_get(void* arg) adata->payload = value.data; adata->payload_sz = value.size; // Not ulen -- we want the actual data size adata->rc = rc; -} -static void do_async_get_free(void* arg) -{ - printf("do_async_get_free\n"); + signal_port(adata->port); } - static void do_async_txnop(void* arg) { - printf("do_async_txnop\n"); + AsyncData* adata = (AsyncData*)arg; + printf("%p: do_async_txnop\n", adata->port->port); // Execute the actual commit/abort - AsyncData* adata = (AsyncData*)arg; if (adata->port->async_op == CMD_TXN_COMMIT) { adata->rc = adata->port->txn->commit(adata->port->txn, 0); @@ -660,6 +738,8 @@ static void do_async_txnop(void* arg) { adata->rc = adata->port->txn->abort(adata->port->txn); } + + signal_port(adata->port); } @@ -842,3 +922,9 @@ static int alloc_dbref() return -1; } + +static void signal_port(PortData* d) +{ + int flag = 1; + write(d->pipe_fds[1], &flag, sizeof(flag)); +} diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index 123bb54..582b702 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -9,6 +9,7 @@ #include "erl_driver.h" #include "db.h" +#include "bdberl_tpool.h" /** * Driver functions @@ -23,9 +24,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz); -static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_data); +static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev); -static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor); /** * Command codes @@ -71,7 +71,7 @@ ErlDrvEntry bdberl_drv_entry = bdberl_drv_start, /* L_PTR start, called when port is opened */ bdberl_drv_stop, /* F_PTR stop, called when port is closed */ NULL, /* F_PTR output, called when erlang has sent */ - NULL, /* F_PTR ready_input, called when input descriptor ready */ + bdberl_drv_ready_input, /* F_PTR ready_input, called when input descriptor ready */ NULL, /* F_PTR ready_output, called when output descriptor ready */ "bdberl_drv", /* driver_name */ bdberl_drv_finish, /* F_PTR finish, called when unloaded */ @@ -79,7 +79,7 @@ ErlDrvEntry bdberl_drv_entry = bdberl_drv_control, /* F_PTR control, port_command callback */ NULL, /* F_PTR timeout, reserved */ NULL, /* F_PTR outputv, reserved */ - bdberl_drv_ready_async, /* F_PTR ready_async */ + NULL, /* F_PTR ready_async */ NULL, /* F_PTR flush */ NULL, /* F_PTR call */ NULL, /* F_PTR event */ @@ -88,7 +88,7 @@ ErlDrvEntry bdberl_drv_entry = ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, /* Reserved */ - bdberl_drv_process_exit /* F_PTR process_exit */ + NULL /* F_PTR process_exit */ }; typedef struct _DbRefList @@ -120,19 +120,27 @@ typedef struct { ErlDrvPort port; - DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn - * active */ + DbRefList* dbrefs; /* List of databases that this port has opened */ - int async_op; /* Value indicating what async op is pending */ + DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn + * active */ - DbRefList* dbrefs; /* List of databases that this port has opened */ + int pipe_fds[2]; /* Array of pipe fds for signaling purposes */ + + int async_op; /* Value indicating what async op is pending */ + + void* async_data; /* Opaque point to data used during async op */ + + TPoolJob* async_job; /* Active job on the thread pool */ + + TPool* async_pool; /* Pool the async job is running on */ } PortData; typedef struct { - const PortData* port; /* Port that originated this request -- READ ONLY! */ + PortData* port; /* Port that originated this request -- READ ONLY! */ int rc; /* Return code from operation */ DB* db; /* Database to use for data storage/retrieval */ void* payload; /* Packed key/value data */ diff --git a/c_src/bdberl_tpool.c b/c_src/bdberl_tpool.c index b73a92a..9d12276 100644 --- a/c_src/bdberl_tpool.c +++ b/c_src/bdberl_tpool.c @@ -6,6 +6,7 @@ * ------------------------------------------------------------------- */ #include "bdberl_tpool.h" +#include #include #include @@ -45,8 +46,9 @@ void bdberl_tpool_stop(TPool* tpool) { LOCK(tpool); - // Set the shutdown flag + // Set the shutdown flag and broadcast a notification tpool->shutdown = 1; + erl_drv_cond_broadcast(tpool->work_cv); // Clean out the queue of pending jobs -- invoke their cleanup function @@ -72,7 +74,7 @@ void bdberl_tpool_stop(TPool* tpool) driver_free(tpool); } -TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc* main_fn, void* arg, TPoolJobFunc* cancel_fn) +TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn) { // Allocate and fill a new job structure TPoolJob* job = driver_alloc(sizeof(TPoolJob)); @@ -116,7 +118,10 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job) // Job was removed from pending -- unlock and notify the job that it got canceled UNLOCK(tpool); - (*(job->cancel_fn))(job->arg); + if (job->cancel_fn) + { + (*(job->cancel_fn))(job->arg); + } // Delete the job structure driver_free(job); @@ -171,9 +176,13 @@ static void* bdberl_tpool_main(void* arg) // Unlock to avoid blocking others UNLOCK(tpool); + printf("Invoking job\n"); + // Invoke the function (*(job->main_fn))(job->arg); + printf("Finished invoking job.\n"); + // Relock LOCK(tpool); diff --git a/c_src/bdberl_tpool.h b/c_src/bdberl_tpool.h index 0b01185..6f6e23a 100644 --- a/c_src/bdberl_tpool.h +++ b/c_src/bdberl_tpool.h @@ -13,11 +13,11 @@ typedef void (*TPoolJobFunc)(void* arg); typedef struct _TPoolJob { - TPoolJobFunc* main_fn; /* Function to invoke for this job */ + TPoolJobFunc main_fn; /* Function to invoke for this job */ - TPoolJobFunc* cancel_fn; /* Function that gets invoked if job is canceled before it can run */ + TPoolJobFunc cancel_fn; /* Function that gets invoked if job is canceled before it can run */ - TPoolJobFunc* arg; /* Input data for the function */ + void* arg; /* Input data for the function */ unsigned int running; /* Flag indicating if the job is currently running */ @@ -60,7 +60,7 @@ TPool* bdberl_tpool_start(unsigned int thread_count); void bdberl_tpool_stop(TPool* tpool); -TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc* main_fn, void* arg, TPoolJobFunc* cancel_fn); +TPoolJob* bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn); void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job); diff --git a/src/bdberl_port.erl b/src/bdberl_port.erl index 5810059..4413f86 100644 --- a/src/bdberl_port.erl +++ b/src/bdberl_port.erl @@ -36,6 +36,8 @@ -define(ERROR_TXN_OPEN, -29003). % Transaction already active on this port -define(ERROR_NO_TXN, -29004). % No transaction open on this port +-define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted + new() -> case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of ok -> ok; @@ -70,34 +72,35 @@ close_database(Port, DbRef) -> txn_begin(Port) -> <> = erlang:port_control(Port, ?CMD_TXN_BEGIN, <<>>), - case Result of - ?ERROR_NONE -> ok; - ?ERROR_ASYNC_PENDING -> {error, async_pending}; - ?ERROR_TXN_OPEN -> {error, txn_open} + io:format("TXN BEGIN erl: ~p\n", [Result]), + case decode_rc(Result) of + ok -> ok; + Error -> {error, {txn_begin, Error}} end. + txn_commit(Port) -> <> = erlang:port_control(Port, ?CMD_TXN_COMMIT, <<>>), - case Result of - ?ERROR_NONE -> + case decode_rc(Result) of + ok -> receive ok -> ok; - {error, Reason} -> {error, Reason} + {error, Reason} -> {error, {txn_commit, decode_rc(Reason)}} end; - ?ERROR_ASYNC_PENDING -> {error, async_pending}; - ?ERROR_NO_TXN -> {error, no_txn} + Error -> + {error, {txn_commit, Error}} end. txn_abort(Port) -> <> = erlang:port_control(Port, ?CMD_TXN_ABORT, <<>>), - case Result of - ?ERROR_NONE -> + case decode_rc(Result) of + ok -> receive ok -> ok; - {error, Reason} -> {error, Reason} + {error, Reason} -> {error, {txn_abort, decode_rc(Reason)}} end; - ?ERROR_ASYNC_PENDING -> {error, async_pending}; - ?ERROR_NO_TXN -> {error, no_txn} + Error -> + {error, {txn_abort, Error}} end. @@ -106,14 +109,14 @@ put(Port, DbRef, Key, Value) -> {ValLen, ValBin} = to_binary(Value), Cmd = <>, <> = erlang:port_control(Port, ?CMD_PUT, Cmd), - case Result of - ?ERROR_NONE -> + case decode_rc(Result) of + ok -> receive ok -> ok; - {error, Reason} -> {error, Reason} + {error, Reason} -> {error, {put, decode_rc(Reason)}} end; - ?ERROR_ASYNC_PENDING -> {error, async_pending}; - ?ERROR_INVALID_DBREF -> {error, invalid_dbref} + Error -> + {error, {put, decode_rc(Error)}} end. @@ -121,22 +124,38 @@ get(Port, DbRef, Key) -> {KeyLen, KeyBin} = to_binary(Key), Cmd = <>, <> = erlang:port_control(Port, ?CMD_GET, Cmd), - case Result of - ?ERROR_NONE -> + case decode_rc(Result) of + ok -> receive {ok, Bin} -> {ok, binary_to_term(Bin)}; not_found -> not_found; - {error, Reason} -> {error, Reason} + {error, Reason} -> {error, {get, decode_rc(Reason)}} end; - ?ERROR_ASYNC_PENDING -> {error, async_pending}; - ?ERROR_INVALID_DBREF -> {error, invalid_dbref} + Error -> + {error, {get, decode_rc(Error)}} end. +%% ==================================================================== +%% Internal functions +%% ==================================================================== +%% +%% Decode a integer return value into an atom representation +%% +decode_rc(?ERROR_NONE) -> ok; +decode_rc(?ERROR_ASYNC_PENDING) -> async_pending; +decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref; +decode_rc(?ERROR_NO_TXN) -> no_txn; +decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted; +decode_rc(Rc) -> {unknown, Rc}. + - +%% +%% Convert a term into a binary, returning a tuple with the binary and the length of the binary +%% to_binary(Term) -> Bin = term_to_binary(Term), {size(Bin), Bin}. + diff --git a/test/port_SUITE.erl b/test/port_SUITE.erl index f674d83..b468705 100644 --- a/test/port_SUITE.erl +++ b/test/port_SUITE.erl @@ -10,7 +10,7 @@ all() -> % [test_db]. - [test_put, test_txn]. + [test_put].% test_txn]. init_per_testcase(TestCase, Config) -> Config.