diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 8f45482..d61c016 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -41,6 +41,8 @@ static void do_async_memp_stat(void* arg); static void do_async_mutex_stat(void* arg); static void do_async_txn_stat(void* arg); +static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc); + static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); static int has_dbref(PortData* data, int dbref); @@ -177,6 +179,26 @@ static TPool* G_TPOOL_TXNS = NULL; erl_drv_mutex_unlock(d->port_lock); \ }} +#define FAIL_IF_CURSOR_OPEN(d, outbuf) { \ + if (NULL != d->cursor) \ + { \ + send_rc(d->port, d->port_owner, ERROR_CURSOR_OPEN); \ + RETURN_INT(0, outbuf); \ + }} +#define FAIL_IF_NO_CURSOR(d, outbuf) { \ + if (NULL == d->cursor) \ + { \ + send_rc(d->port, d->port_owner, ERROR_NO_CURSOR); \ + RETURN_INT(0, outbuf); \ + }} + +#define FAIL_IF_TXN_OPEN(d, outbuf) { \ + if (d->txn) \ + { \ + send_rc(d->port, d->port_owner, ERROR_TXN_OPEN); \ + RETURN_INT(0, outbuf); \ + }} + #ifdef DEBUG # define DBG printf #else @@ -478,41 +500,29 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, DBTYPE type = (DBTYPE) UNPACK_BYTE(inbuf, 4); char* name = UNPACK_STRING(inbuf, 5); int dbref; - int status; int rc = open_database(name, type, flags, d, &dbref); - if (rc == 0) - { - status = STATUS_OK; - } - else - { - status = STATUS_ERROR; - dbref = rc; - } - // Pack the status and dbref (or errno) into a binary and return it - // Outbuf is: <> - BinHelper bh; - bin_helper_init(&bh); - bin_helper_push_byte(&bh, status); - bin_helper_push_int32(&bh, dbref); - RETURN_BH(bh, outbuf); + // Queue up a message for bdberl:open to process + if (rc == 0) // success: send {ok, DbRef} + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_INT, dbref, + ERL_DRV_TUPLE, 2}; + driver_send_term(d->port, d->port_owner, + response, sizeof(response) / sizeof(response[0])); + } + else // failure: send {error, atom() | {error, {unknown, Rc}} + { + send_rc(d->port, d->port_owner, rc); + } + // Outbuf is: <> - always send 0 and the driver will receive the real value + RETURN_INT(0, outbuf); } case CMD_CLOSE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a cursor is open - if (d->cursor != 0) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } - - // Fail if a txn is open - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); + FAIL_IF_TXN_OPEN(d, outbuf); // Take the provided dbref and attempt to close it // Inbuf is: <> @@ -521,25 +531,24 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = close_database(dbref, flags, d); - // Outbuf is: <> - RETURN_INT(rc, outbuf); + // Queue up a message for bdberl:close to process + send_rc(d->port, d->port_owner, rc); + // Outbuf is: <> - always send 0 and the driver will receive the real value + RETURN_INT(0, outbuf); } case CMD_TXN_BEGIN: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // If we already have a txn open, fail - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_TXN_OPEN(d, outbuf); // Inbuf is <> unsigned flags = UNPACK_INT(inbuf, 0); // Outbuf is <> int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), flags); - RETURN_INT(rc, outbuf); + + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } case CMD_TXN_COMMIT: case CMD_TXN_ABORT: @@ -549,7 +558,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // If we don't already have a txn open, fail if (d->txn == 0) { - RETURN_INT(ERROR_NO_TXN, outbuf); + send_rc(d->port, d->port_owner, ERROR_NO_TXN); + RETURN_INT(0, outbuf); } // Setup async command and schedule it on the txns threadpool @@ -573,7 +583,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Put/commit requires a transaction to be active if (cmd == CMD_PUT_COMMIT && (!d->txn)) { - RETURN_INT(ERROR_NO_TXN, outbuf); + send_rc(d->port, d->port_owner, ERROR_NO_TXN); + RETURN_INT(0, outbuf); } // Inbuf is: << DbRef:32, Rest/binary>> @@ -618,7 +629,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, else { // Invalid dbref - RETURN_INT(ERROR_INVALID_DBREF, outbuf); + send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); + RETURN_INT(0, outbuf); } } case CMD_GETINFO: @@ -636,11 +648,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_OPEN: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - if (d->cursor) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> int dbref = UNPACK_INT(inbuf, 0); @@ -652,11 +660,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Grab the database handle and open the cursor DB* db = G_DATABASES[dbref].db; int rc = db->cursor(db, d->txn, &(d->cursor), flags); - RETURN_INT(rc, outbuf); + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } else { - RETURN_INT(ERROR_INVALID_DBREF, outbuf); + send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); + RETURN_INT(0, outbuf); } } case CMD_CURSOR_CURR: @@ -664,12 +674,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_PREV: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if no cursor currently open - if (!d->cursor) - { - RETURN_INT(ERROR_NO_CURSOR, outbuf); - } + FAIL_IF_NO_CURSOR(d, outbuf); // Schedule the operation d->async_op = cmd; @@ -682,12 +687,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_CLOSE: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if no cursor open - if (!d->cursor) - { - RETURN_INT(ERROR_NO_CURSOR, outbuf); - } + FAIL_IF_NO_CURSOR(d, outbuf); // It's possible to get a deadlock when closing a cursor -- in that situation we also // need to go ahead and abort the txn @@ -700,33 +700,26 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Regardless of what happens, clear out the cursor pointer d->cursor = 0; - + + // Send result code + send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } case CMD_REMOVE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a txn is open - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_TXN_OPEN(d, outbuf); // Inbuf is: << dbname/bytes, 0:8 >> const char* dbname = UNPACK_STRING(inbuf, 0); int rc = delete_database(dbname); - RETURN_INT(rc, outbuf); + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } case CMD_TRUNCATE: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a cursor is open - if (d->cursor) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is: <> int dbref = UNPACK_INT(inbuf, 0); @@ -1232,6 +1225,7 @@ static int delete_database(const char* name) DBG("Attempting to delete database: %s\r\n", name); int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT); WRITE_UNLOCK(G_DATABASES_RWLOCK); + return rc; } @@ -1291,6 +1285,82 @@ static void async_cleanup(PortData* d) erl_drv_mutex_unlock(d->port_lock); } + +// Convert an rc from BDB into a string suitable for driver_mk_atom +// returns NULL on no match +static char *rc_to_atom_str(int rc) +{ + char *error = erl_errno_id(rc); + //fprintf(stderr, "erl_errno_id(%d) = %s db_strerror = %s\n", rc, error, db_strerror(rc)); + if (NULL != error && strcmp("unknown", error) != 0) + { + return error; + } + else + { + switch (rc) + { + // bdberl driver errors + case ERROR_ASYNC_PENDING: return "async_pending"; + case ERROR_INVALID_DBREF: return "invalid_db"; + case ERROR_TXN_OPEN: return "transaction_open"; + case ERROR_NO_TXN: return "no_txn"; + case ERROR_CURSOR_OPEN: return "cursor_open"; + case ERROR_NO_CURSOR: return "no_cursor"; + // bonafide BDB errors + case DB_BUFFER_SMALL: return "buffer_small"; + case DB_DONOTINDEX: return "do_not_index"; + case DB_FOREIGN_CONFLICT: return "foreign_conflict"; + case DB_KEYEMPTY: return "key_empty"; + case DB_KEYEXIST: return "key_exist"; + case DB_LOCK_DEADLOCK: return "deadlock"; + case DB_LOCK_NOTGRANTED: return "lock_not_granted"; + case DB_LOG_BUFFER_FULL: return "log_buffer_full"; + case DB_NOTFOUND: return "not_found"; + case DB_OLD_VERSION: return "old_version"; + case DB_PAGE_NOTFOUND: return "page_not_found"; + case DB_RUNRECOVERY: return "run_recovery"; + case DB_VERIFY_BAD: return "verify_bad"; + case DB_VERSION_MISMATCH: return "version_mismatch"; + default: return NULL; + } + } +} + + +static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) +{ + // TODO: May need to tag the messages a bit more explicitly so that if another async + // job runs to completion before the message gets delivered we don't mis-interpret this + // response code. + if (rc == 0) + { + ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + // See if this is a standard errno that we have an erlang code for + char *error = rc_to_atom_str(rc); + if (NULL != error) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom(error), + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom("unknown"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + } +} + static void async_cleanup_and_send_rc(PortData* d, int rc) { // Save the port and pid references -- we need copies independent from the PortData @@ -1302,22 +1372,7 @@ static void async_cleanup_and_send_rc(PortData* d, int rc) ErlDrvTermData pid = d->port_owner; async_cleanup(d); - - // TODO: May need to tag the messages a bit more explicitly so that if another async - // job runs to completion before the message gets delivered we don't mis-interpret this - // response code. - if (rc == 0) - { - ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } + send_rc(port, pid, rc); } static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 21f2cc1..e3f9ff8 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -37,6 +37,7 @@ -define(CMD_TXN_STAT, 29). -define(CMD_TXN_STAT_PRINT, 30). +-define(DB_TYPE_UNKNOWN, 0). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index 3a0a344..0950019 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -140,28 +140,35 @@ open(Name, Type) -> %% @spec open(Name, Type, Opts) -> {ok, Db} | {error, Error} %% where %% Name = string() -%% Type = btree | hash +%% Type = btree | hash | unknown %% Opts = [atom()] %% Db = integer() %% %% @end %%-------------------------------------------------------------------- --spec open(Name :: db_name(), Type :: db_type(), Opts :: db_flags()) -> +-spec open(Name :: db_name(), Type :: db_type() | unknown, Opts :: db_flags()) -> {ok, db()} | {error, integer()}. open(Name, Type, Opts) -> % Map database type into an integer code case Type of btree -> TypeCode = ?DB_TYPE_BTREE; - hash -> TypeCode = ?DB_TYPE_HASH + hash -> TypeCode = ?DB_TYPE_HASH; + unknown -> TypeCode = ?DB_TYPE_UNKNOWN %% BDB automatically determines if file exists end, Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])), Cmd = <>, - case erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd) of - <> -> - {ok, Db}; - <> -> - {error, Errno} + <> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd), + case decode_rc(Rc) of + ok -> + receive + {ok, DbRef} -> + {ok, DbRef}; + {error, Reason} -> + {error, Reason} + end; + Error -> + {error, Error} end. @@ -226,9 +233,14 @@ close(Db, Opts) -> <> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd), case decode_rc(Rc) of ok -> - ok; - Reason -> - {error, Reason} + receive + ok -> + ok; + {error, Reason} -> + {error, Reason} + end; + Error -> + {error, Error} end. @@ -339,7 +351,11 @@ txn_begin(Opts) -> Cmd = <>, <> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), case decode_rc(Result) of - ok -> ok; + ok -> + receive + ok -> ok; + {error, Reason} -> {error, decode_rc(Reason)} + end; Error -> {error, Error} end. @@ -1060,7 +1076,11 @@ cursor_open(Db) -> <> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; + Reason -> {error, Reason} end. @@ -1166,7 +1186,10 @@ cursor_close() -> <> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; Reason -> {error, Reason} end. @@ -1199,7 +1222,10 @@ delete_database(Filename) -> <> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, decode_rc(Reason)} + end; Reason -> {error, Reason} end. @@ -2008,6 +2034,7 @@ get_port() -> %% %% Decode a integer return value into an atom representation %% +decode_rc(Rc) when is_atom(Rc) -> Rc; decode_rc(?ERROR_NONE) -> ok; decode_rc(?ERROR_ASYNC_PENDING) -> async_pending; decode_rc(?ERROR_INVALID_DBREF) -> invalid_db;