Enhanced error reporting to return atoms for libc errno, common bdb errors and bdberl errors.

This commit is contained in:
Jon Meredith 2009-05-29 07:26:59 -06:00
parent 9d081c0572
commit 553ed4eda1
3 changed files with 188 additions and 105 deletions

View file

@ -41,6 +41,8 @@ static void do_async_memp_stat(void* arg);
static void do_async_mutex_stat(void* arg);
static void do_async_txn_stat(void* arg);
static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc);
static int add_dbref(PortData* data, int dbref);
static int del_dbref(PortData* data, int dbref);
static int has_dbref(PortData* data, int dbref);
@ -177,6 +179,26 @@ static TPool* G_TPOOL_TXNS = NULL;
erl_drv_mutex_unlock(d->port_lock); \
}}
#define FAIL_IF_CURSOR_OPEN(d, outbuf) { \
if (NULL != d->cursor) \
{ \
send_rc(d->port, d->port_owner, ERROR_CURSOR_OPEN); \
RETURN_INT(0, outbuf); \
}}
#define FAIL_IF_NO_CURSOR(d, outbuf) { \
if (NULL == d->cursor) \
{ \
send_rc(d->port, d->port_owner, ERROR_NO_CURSOR); \
RETURN_INT(0, outbuf); \
}}
#define FAIL_IF_TXN_OPEN(d, outbuf) { \
if (d->txn) \
{ \
send_rc(d->port, d->port_owner, ERROR_TXN_OPEN); \
RETURN_INT(0, outbuf); \
}}
#ifdef DEBUG
# define DBG printf
#else
@ -478,41 +500,29 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
DBTYPE type = (DBTYPE) UNPACK_BYTE(inbuf, 4);
char* name = UNPACK_STRING(inbuf, 5);
int dbref;
int status;
int rc = open_database(name, type, flags, d, &dbref);
if (rc == 0)
{
status = STATUS_OK;
}
else
{
status = STATUS_ERROR;
dbref = rc;
}
// Pack the status and dbref (or errno) into a binary and return it
// Outbuf is: <<Status:8, DbRef:32>>
BinHelper bh;
bin_helper_init(&bh);
bin_helper_push_byte(&bh, status);
bin_helper_push_int32(&bh, dbref);
RETURN_BH(bh, outbuf);
// Queue up a message for bdberl:open to process
if (rc == 0) // success: send {ok, DbRef}
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_INT, dbref,
ERL_DRV_TUPLE, 2};
driver_send_term(d->port, d->port_owner,
response, sizeof(response) / sizeof(response[0]));
}
else // failure: send {error, atom() | {error, {unknown, Rc}}
{
send_rc(d->port, d->port_owner, rc);
}
// Outbuf is: <<Rc:32>> - always send 0 and the driver will receive the real value
RETURN_INT(0, outbuf);
}
case CMD_CLOSE_DB:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if a cursor is open
if (d->cursor != 0)
{
RETURN_INT(ERROR_CURSOR_OPEN, outbuf);
}
// Fail if a txn is open
if (d->txn != 0)
{
RETURN_INT(ERROR_TXN_OPEN, outbuf);
}
FAIL_IF_CURSOR_OPEN(d, outbuf);
FAIL_IF_TXN_OPEN(d, outbuf);
// Take the provided dbref and attempt to close it
// Inbuf is: <<DbRef:32, Flags:32/unsigned>>
@ -521,25 +531,24 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
int rc = close_database(dbref, flags, d);
// Outbuf is: <<Rc:32>>
RETURN_INT(rc, outbuf);
// Queue up a message for bdberl:close to process
send_rc(d->port, d->port_owner, rc);
// Outbuf is: <<Rc:32>> - always send 0 and the driver will receive the real value
RETURN_INT(0, outbuf);
}
case CMD_TXN_BEGIN:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// If we already have a txn open, fail
if (d->txn != 0)
{
RETURN_INT(ERROR_TXN_OPEN, outbuf);
}
FAIL_IF_TXN_OPEN(d, outbuf);
// Inbuf is <<Flags:32/unsigned>>
unsigned flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>>
int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), flags);
RETURN_INT(rc, outbuf);
send_rc(d->port, d->port_owner, rc);
RETURN_INT(0, outbuf);
}
case CMD_TXN_COMMIT:
case CMD_TXN_ABORT:
@ -549,7 +558,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// If we don't already have a txn open, fail
if (d->txn == 0)
{
RETURN_INT(ERROR_NO_TXN, outbuf);
send_rc(d->port, d->port_owner, ERROR_NO_TXN);
RETURN_INT(0, outbuf);
}
// Setup async command and schedule it on the txns threadpool
@ -573,7 +583,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Put/commit requires a transaction to be active
if (cmd == CMD_PUT_COMMIT && (!d->txn))
{
RETURN_INT(ERROR_NO_TXN, outbuf);
send_rc(d->port, d->port_owner, ERROR_NO_TXN);
RETURN_INT(0, outbuf);
}
// Inbuf is: << DbRef:32, Rest/binary>>
@ -618,7 +629,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
else
{
// Invalid dbref
RETURN_INT(ERROR_INVALID_DBREF, outbuf);
send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF);
RETURN_INT(0, outbuf);
}
}
case CMD_GETINFO:
@ -636,11 +648,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
case CMD_CURSOR_OPEN:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
if (d->cursor)
{
RETURN_INT(ERROR_CURSOR_OPEN, outbuf);
}
FAIL_IF_CURSOR_OPEN(d, outbuf);
// Inbuf is << DbRef:32, Flags:32 >>
int dbref = UNPACK_INT(inbuf, 0);
@ -652,11 +660,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Grab the database handle and open the cursor
DB* db = G_DATABASES[dbref].db;
int rc = db->cursor(db, d->txn, &(d->cursor), flags);
RETURN_INT(rc, outbuf);
send_rc(d->port, d->port_owner, rc);
RETURN_INT(0, outbuf);
}
else
{
RETURN_INT(ERROR_INVALID_DBREF, outbuf);
send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF);
RETURN_INT(0, outbuf);
}
}
case CMD_CURSOR_CURR:
@ -664,12 +674,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
case CMD_CURSOR_PREV:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if no cursor currently open
if (!d->cursor)
{
RETURN_INT(ERROR_NO_CURSOR, outbuf);
}
FAIL_IF_NO_CURSOR(d, outbuf);
// Schedule the operation
d->async_op = cmd;
@ -682,12 +687,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
case CMD_CURSOR_CLOSE:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if no cursor open
if (!d->cursor)
{
RETURN_INT(ERROR_NO_CURSOR, outbuf);
}
FAIL_IF_NO_CURSOR(d, outbuf);
// It's possible to get a deadlock when closing a cursor -- in that situation we also
// need to go ahead and abort the txn
@ -700,33 +700,26 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Regardless of what happens, clear out the cursor pointer
d->cursor = 0;
// Send result code
send_rc(d->port, d->port_owner, rc);
RETURN_INT(0, outbuf);
}
case CMD_REMOVE_DB:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if a txn is open
if (d->txn != 0)
{
RETURN_INT(ERROR_TXN_OPEN, outbuf);
}
FAIL_IF_TXN_OPEN(d, outbuf);
// Inbuf is: << dbname/bytes, 0:8 >>
const char* dbname = UNPACK_STRING(inbuf, 0);
int rc = delete_database(dbname);
RETURN_INT(rc, outbuf);
send_rc(d->port, d->port_owner, rc);
RETURN_INT(0, outbuf);
}
case CMD_TRUNCATE:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if a cursor is open
if (d->cursor)
{
RETURN_INT(ERROR_CURSOR_OPEN, outbuf);
}
FAIL_IF_CURSOR_OPEN(d, outbuf);
// Inbuf is: <<DbRef:32>>
int dbref = UNPACK_INT(inbuf, 0);
@ -1232,6 +1225,7 @@ static int delete_database(const char* name)
DBG("Attempting to delete database: %s\r\n", name);
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
WRITE_UNLOCK(G_DATABASES_RWLOCK);
return rc;
}
@ -1291,6 +1285,82 @@ static void async_cleanup(PortData* d)
erl_drv_mutex_unlock(d->port_lock);
}
// Convert an rc from BDB into a string suitable for driver_mk_atom
// returns NULL on no match
static char *rc_to_atom_str(int rc)
{
char *error = erl_errno_id(rc);
//fprintf(stderr, "erl_errno_id(%d) = %s db_strerror = %s\n", rc, error, db_strerror(rc));
if (NULL != error && strcmp("unknown", error) != 0)
{
return error;
}
else
{
switch (rc)
{
// bdberl driver errors
case ERROR_ASYNC_PENDING: return "async_pending";
case ERROR_INVALID_DBREF: return "invalid_db";
case ERROR_TXN_OPEN: return "transaction_open";
case ERROR_NO_TXN: return "no_txn";
case ERROR_CURSOR_OPEN: return "cursor_open";
case ERROR_NO_CURSOR: return "no_cursor";
// bonafide BDB errors
case DB_BUFFER_SMALL: return "buffer_small";
case DB_DONOTINDEX: return "do_not_index";
case DB_FOREIGN_CONFLICT: return "foreign_conflict";
case DB_KEYEMPTY: return "key_empty";
case DB_KEYEXIST: return "key_exist";
case DB_LOCK_DEADLOCK: return "deadlock";
case DB_LOCK_NOTGRANTED: return "lock_not_granted";
case DB_LOG_BUFFER_FULL: return "log_buffer_full";
case DB_NOTFOUND: return "not_found";
case DB_OLD_VERSION: return "old_version";
case DB_PAGE_NOTFOUND: return "page_not_found";
case DB_RUNRECOVERY: return "run_recovery";
case DB_VERIFY_BAD: return "verify_bad";
case DB_VERSION_MISMATCH: return "version_mismatch";
default: return NULL;
}
}
}
static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
{
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
if (rc == 0)
{
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
// See if this is a standard errno that we have an erlang code for
char *error = rc_to_atom_str(rc);
if (NULL != error)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom(error),
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom("unknown"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
}
}
static void async_cleanup_and_send_rc(PortData* d, int rc)
{
// Save the port and pid references -- we need copies independent from the PortData
@ -1302,22 +1372,7 @@ static void async_cleanup_and_send_rc(PortData* d, int rc)
ErlDrvTermData pid = d->port_owner;
async_cleanup(d);
// TODO: May need to tag the messages a bit more explicitly so that if another async
// job runs to completion before the message gets delivered we don't mis-interpret this
// response code.
if (rc == 0)
{
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
send_rc(port, pid, rc);
}
static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)

View file

@ -37,6 +37,7 @@
-define(CMD_TXN_STAT, 29).
-define(CMD_TXN_STAT_PRINT, 30).
-define(DB_TYPE_UNKNOWN, 0).
-define(DB_TYPE_BTREE, 1).
-define(DB_TYPE_HASH, 2).

View file

@ -140,28 +140,35 @@ open(Name, Type) ->
%% @spec open(Name, Type, Opts) -> {ok, Db} | {error, Error}
%% where
%% Name = string()
%% Type = btree | hash
%% Type = btree | hash | unknown
%% Opts = [atom()]
%% Db = integer()
%%
%% @end
%%--------------------------------------------------------------------
-spec open(Name :: db_name(), Type :: db_type(), Opts :: db_flags()) ->
-spec open(Name :: db_name(), Type :: db_type() | unknown, Opts :: db_flags()) ->
{ok, db()} | {error, integer()}.
open(Name, Type, Opts) ->
% Map database type into an integer code
case Type of
btree -> TypeCode = ?DB_TYPE_BTREE;
hash -> TypeCode = ?DB_TYPE_HASH
hash -> TypeCode = ?DB_TYPE_HASH;
unknown -> TypeCode = ?DB_TYPE_UNKNOWN %% BDB automatically determines if file exists
end,
Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])),
Cmd = <<Flags:32/native, TypeCode:8/signed-native, (list_to_binary(Name))/bytes, 0:8/native>>,
case erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd) of
<<?STATUS_OK:8, Db:32/signed-native>> ->
{ok, Db};
<<?STATUS_ERROR:8, Errno:32/signed-native>> ->
{error, Errno}
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd),
case decode_rc(Rc) of
ok ->
receive
{ok, DbRef} ->
{ok, DbRef};
{error, Reason} ->
{error, Reason}
end;
Error ->
{error, Error}
end.
@ -226,9 +233,14 @@ close(Db, Opts) ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd),
case decode_rc(Rc) of
ok ->
ok;
Reason ->
{error, Reason}
receive
ok ->
ok;
{error, Reason} ->
{error, Reason}
end;
Error ->
{error, Error}
end.
@ -339,7 +351,11 @@ txn_begin(Opts) ->
Cmd = <<Flags:32/native>>,
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd),
case decode_rc(Result) of
ok -> ok;
ok ->
receive
ok -> ok;
{error, Reason} -> {error, decode_rc(Reason)}
end;
Error -> {error, Error}
end.
@ -1060,7 +1076,11 @@ cursor_open(Db) ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
case decode_rc(Rc) of
ok ->
ok;
receive
ok -> ok;
{error, Reason} -> {error, Reason}
end;
Reason ->
{error, Reason}
end.
@ -1166,7 +1186,10 @@ cursor_close() ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>),
case decode_rc(Rc) of
ok ->
ok;
receive
ok -> ok;
{error, Reason} -> {error, Reason}
end;
Reason ->
{error, Reason}
end.
@ -1199,7 +1222,10 @@ delete_database(Filename) ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd),
case decode_rc(Rc) of
ok ->
ok;
receive
ok -> ok;
{error, Reason} -> {error, decode_rc(Reason)}
end;
Reason ->
{error, Reason}
end.
@ -2008,6 +2034,7 @@ get_port() ->
%%
%% Decode a integer return value into an atom representation
%%
decode_rc(Rc) when is_atom(Rc) -> Rc;
decode_rc(?ERROR_NONE) -> ok;
decode_rc(?ERROR_ASYNC_PENDING) -> async_pending;
decode_rc(?ERROR_INVALID_DBREF) -> invalid_db;