Adding support for cursor operations

This commit is contained in:
Dave Smith 2008-12-15 11:25:03 -07:00
parent f56351e1dc
commit 2d323cb304
5 changed files with 297 additions and 30 deletions

View file

@ -27,6 +27,7 @@ static void tune_system(int target, void* values, BinHelper* bh);
static void do_async_put(void* arg);
static void do_async_get(void* arg);
static void do_async_txnop(void* arg);
static void do_async_cursor_get(void* arg);
static int add_dbref(PortData* data, int dbref);
static int del_dbref(PortData* data, int dbref);
@ -170,12 +171,11 @@ DRIVER_INIT(bdberl_drv)
}
// Make sure we can distiguish between lock timeouts and deadlocks
int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1);
printf("TIME_NOT_GRANTED rc: %d\n", rc);
G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1);
// Initialization transaction timeout so that deadlock checking works properly
db_timeout_t to = 500 * 1000; // 500 ms
rc = G_DB_ENV->set_timeout(G_DB_ENV, to, DB_SET_TXN_TIMEOUT);
printf("DB_SET_TXN_TMEOUT rc: %d value %d\n", rc, to);
G_DB_ENV->set_timeout(G_DB_ENV, to, DB_SET_TXN_TIMEOUT);
// BDB is setup -- allocate structures for tracking databases
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
@ -266,6 +266,12 @@ static void bdberl_drv_stop(ErlDrvData handle)
// Cleanup the port lock
erl_drv_mutex_destroy(d->port_lock);
// If a cursor is open, close it
if (d->cursor)
{
d->cursor->close(d->cursor);
}
// If a txn is currently active, terminate it.
if (d->txn)
{
@ -349,6 +355,12 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if a cursor is open
if (d->cursor != 0)
{
RETURN_INT(ERROR_CURSOR_OPEN, outbuf);
}
// Fail if a txn is open
if (d->txn != 0)
{
@ -468,6 +480,76 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
tune_system(target, values, &bh);
RETURN_BH(bh, outbuf);
}
case CMD_CURSOR_OPEN:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
if (d->cursor)
{
RETURN_INT(ERROR_CURSOR_OPEN, outbuf);
}
// Inbuf is << DbRef:32, Flags:32 >>
int dbref = UNPACK_INT(inbuf, 0);
int flags = UNPACK_INT(inbuf, 4);
// Make sure we have a reference to the requested database
if (has_dbref(d, dbref))
{
// Grab the database handle and open the cursor
DB* db = G_DATABASES[dbref].db;
int rc = db->cursor(db, d->txn, &(d->cursor), flags);
RETURN_INT(rc, outbuf);
}
else
{
RETURN_INT(ERROR_INVALID_DBREF, outbuf);
}
}
case CMD_CURSOR_CURR:
case CMD_CURSOR_NEXT:
case CMD_CURSOR_PREV:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if no cursor currently open
if (!d->cursor)
{
RETURN_INT(ERROR_NO_CURSOR, outbuf);
}
// Schedule the operation
d->async_op = cmd;
d->async_pool = G_TPOOL_GENERAL;
d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, &do_async_cursor_get, d, 0);
// Let caller know operation is in progress
RETURN_INT(0, outbuf);
}
case CMD_CURSOR_CLOSE:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
// Fail if no cursor open
if (!d->cursor)
{
RETURN_INT(ERROR_NO_CURSOR, outbuf);
}
// It's possible to get a deadlock when closing a cursor -- in that situation we also
// need to go ahead and abort the txn
int rc = d->cursor->close(d->cursor);
if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK))
{
d->txn->abort(d->txn);
d->txn = 0;
}
// Regardless of what happens, clear out the cursor pointer
d->cursor = 0;
RETURN_INT(0, outbuf);
}
}
*outbuf = 0;
return 0;
@ -589,8 +671,6 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
static int close_database(int dbref, unsigned flags, PortData* data)
{
// printf("Closing %d for port %p\n", dbref, data->port);
// Remove this database from our list
if (del_dbref(data, dbref))
{
@ -622,10 +702,12 @@ static int close_database(int dbref, unsigned flags, PortData* data)
}
WRITE_UNLOCK(G_DATABASES_RWLOCK);
return 1;
return 0;
}
else
{
return ERROR_INVALID_DBREF;
}
return 0;
}
/**
@ -858,6 +940,85 @@ static void do_async_txnop(void* arg)
}
static void do_async_cursor_get(void* arg)
{
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
PortData* d = (PortData*)arg;
// Setup DBTs
DBT key;
DBT value;
memset(&key, '\0', sizeof(DBT));
memset(&value, '\0', sizeof(DBT));
// Determine what type of cursor get to perform
int flags = 0;
switch (d->async_op)
{
case CMD_CURSOR_NEXT:
flags = DB_NEXT; break;
case CMD_CURSOR_PREV:
flags = DB_PREV; break;
default:
flags = DB_CURRENT;
}
// Execute the operation
int rc = d->cursor->get(d->cursor, &key, &value, flags);
// Cleanup as necessary; any sort of failure means we need to close the cursor and abort
// the transaction
if (rc && rc != DB_NOTFOUND)
{
d->cursor->close(d->cursor);
d->cursor = 0;
if (d->txn)
{
d->txn->abort(d->txn);
d->txn = 0;
}
}
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData port_owner = d->port_owner;
// Release the port for another operation
d->async_pool = 0;
d->async_job = 0;
erl_drv_mutex_lock(d->port_lock);
d->async_op = CMD_NONE;
erl_drv_mutex_unlock(d->port_lock);
// Notify port of result
if (rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_BUF2BINARY, (ErlDrvTermData)key.data, (ErlDrvUInt)key.size,
ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size,
ERL_DRV_TUPLE, 3};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else if (rc == DB_NOTFOUND)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2};
driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0]));
}
}
static void* zalloc(unsigned int size)
{
void* res = driver_alloc(size);

View file

@ -37,6 +37,11 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
#define CMD_GET 6
#define CMD_PUT 7
#define CMD_TUNE 8
#define CMD_CURSOR_OPEN 9
#define CMD_CURSOR_CURR 10
#define CMD_CURSOR_NEXT 11
#define CMD_CURSOR_PREV 12
#define CMD_CURSOR_CLOSE 13
/**
@ -60,6 +65,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
#define ERROR_INVALID_DBREF (-29002) /* DbRef not currently opened by this port */
#define ERROR_TXN_OPEN (-29003) /* Transaction already active on this port */
#define ERROR_NO_TXN (-29004) /* No transaction open on this port */
#define ERROR_CURSOR_OPEN (-29005) /* Cursor already active on this port */
#define ERROR_NO_CURSOR (-29006) /* No cursor open on this port */
/**
@ -137,12 +144,14 @@ typedef struct
ErlDrvTermData port_owner; /* Pid of the port owner */
DbRefList* dbrefs; /* List of databases that this port has opened */
DbRefList* dbrefs; /* List of databases that this port has opened */
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
* active */
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
* active */
int async_op; /* Value indicating what async op is pending */
DBC* cursor; /* Active cursor handle; each port may have only 1 cursor active */
int async_op; /* Value indicating what async op is pending */
int async_flags; /* Flags for the async op command */

View file

@ -5,15 +5,21 @@
%%
%% -------------------------------------------------------------------
-define(CMD_NONE, 0).
-define(CMD_OPEN_DB, 1).
-define(CMD_CLOSE_DB, 2).
-define(CMD_TXN_BEGIN, 3).
-define(CMD_TXN_COMMIT, 4).
-define(CMD_TXN_ABORT, 5).
-define(CMD_GET, 6).
-define(CMD_PUT, 7).
-define(CMD_TUNE, 8).
-define(CMD_NONE, 0).
-define(CMD_OPEN_DB, 1).
-define(CMD_CLOSE_DB, 2).
-define(CMD_TXN_BEGIN, 3).
-define(CMD_TXN_COMMIT, 4).
-define(CMD_TXN_ABORT, 5).
-define(CMD_GET, 6).
-define(CMD_PUT, 7).
-define(CMD_TUNE, 8).
-define(CMD_CURSOR_OPEN, 9).
-define(CMD_CURSOR_CURR, 10).
-define(CMD_CURSOR_NEXT, 11).
-define(CMD_CURSOR_PREV, 12).
-define(CMD_CURSOR_CLOSE, 13).
-define(DB_TYPE_BTREE, 1).
-define(DB_TYPE_HASH, 2).
@ -32,6 +38,8 @@
-define(ERROR_INVALID_DBREF, -29002). % DbRef not currently opened by this port
-define(ERROR_TXN_OPEN, -29003). % Transaction already active on this port
-define(ERROR_NO_TXN, -29004). % No transaction open on this port
-define(ERROR_CURSOR_OPEN, -29005). % Cursor already active on this port
-define(ERROR_NO_CURSOR, -29006). % No cursor open on this port
-define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted
-define(ERROR_DB_LOCK_DEADLOCK, -30994). % Deadlock occurred

View file

@ -17,7 +17,8 @@
put_r/3, put_r/4,
get/2, get/3,
get_r/2, get_r/3,
update/3, update/4]).
update/3, update/4,
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0]).
-include("bdberl.hrl").
@ -46,11 +47,12 @@ close(Db) ->
close(Db, Opts) ->
Flags = process_flags(Opts),
Cmd = <<Db:32/native-integer, Flags:32/unsigned-native-integer>>,
case erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd) of
<<0:32/native-integer>> ->
{error, invalid_db};
<<1:32/native-integer>> ->
ok
<<Rc:32/native-signed>> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd),
case decode_rc(Rc) of
ok ->
ok;
Reason ->
{error, Reason}
end.
txn_begin() ->
@ -181,6 +183,36 @@ update(Db, Key, Fun, Args) ->
end,
transaction(F).
cursor_open(Db) ->
Cmd = <<Db:32/native, 0:32/native>>,
<<Rc:32/native-signed>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
case decode_rc(Rc) of
ok ->
ok;
Reason ->
{error, Reason}
end.
cursor_next() ->
do_cursor_move(?CMD_CURSOR_NEXT).
cursor_prev() ->
do_cursor_move(?CMD_CURSOR_PREV).
cursor_current() ->
do_cursor_move(?CMD_CURSOR_CURR).
cursor_close() ->
<<Rc:32/native-signed>> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>),
case decode_rc(Rc) of
ok ->
ok;
Reason ->
{error, Reason}
end.
get_cache_size() ->
Cmd = <<?SYSP_CACHESIZE_GET:32/native>>,
<<Result:32/signed-native, Gbytes:32/native, Bytes:32/native, Ncaches:32/native>> =
@ -248,8 +280,11 @@ get_port() ->
%%
decode_rc(?ERROR_NONE) -> ok;
decode_rc(?ERROR_ASYNC_PENDING) -> async_pending;
decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref;
decode_rc(?ERROR_INVALID_DBREF) -> invalid_db;
decode_rc(?ERROR_TXN_OPEN) -> transaction_open;
decode_rc(?ERROR_NO_TXN) -> no_txn;
decode_rc(?ERROR_CURSOR_OPEN) -> cursor_open;
decode_rc(?ERROR_NO_CURSOR) -> no_cursor;
decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted;
decode_rc(?ERROR_DB_LOCK_DEADLOCK) -> deadlock;
decode_rc(Rc) -> {unknown, Rc}.
@ -304,3 +339,22 @@ flag_value(Flag) ->
txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC
end.
%%
%% Move the cursor in a given direction. Invoked by cursor_next/prev/current.
%%
do_cursor_move(Direction) ->
<<Rc:32/native-signed>> = erlang:port_control(get_port(), Direction, <<>>),
case decode_rc(Rc) of
ok ->
receive
{ok, KeyBin, ValueBin} ->
{ok, binary_to_term(KeyBin), binary_to_term(ValueBin)};
not_found ->
not_found;
{error, ReasonCode} ->
{error, decode_rc(ReasonCode)}
end;
Reason ->
{error, Reason}
end.

View file

@ -23,7 +23,8 @@ all() ->
transaction_should_abort_on_user_abort,
update_should_save_value_if_successful,
update_should_accept_args_for_fun,
port_should_tune_transaction_timeouts].
port_should_tune_transaction_timeouts,
cursor_should_iterate, cursor_should_fail_if_not_open].
init_per_testcase(_TestCase, Config) ->
@ -124,3 +125,37 @@ port_should_tune_transaction_timeouts(_Config) ->
{ok, 500000} = bdberl:get_txn_timeout(),
ok = bdberl:set_txn_timeout(250000),
{ok, 250000} = bdberl:get_txn_timeout().
cursor_should_iterate(Config) ->
Db = ?config(db, Config),
%% Store some sample values in the db
ok = bdberl:put(Db, key1, value1),
ok = bdberl:put(Db, key2, value2),
ok = bdberl:put(Db, key3, value3),
%% Validate that the cursor returns each value in order (ASSUME btree db)
ok = bdberl:cursor_open(Db),
{ok, key1, value1} = bdberl:cursor_next(),
{ok, key2, value2} = bdberl:cursor_next(),
{ok, key3, value3} = bdberl:cursor_next(),
not_found = bdberl:cursor_next(),
%% Validate that the "current" key is key3
{ok, key3, value3} = bdberl:cursor_current(),
%% Now move backwards (should jump to key2, since we are "on" key3)
{ok, key2, value2} = bdberl:cursor_prev(),
{ok, key1, value1} = bdberl:cursor_prev(),
not_found = bdberl:cursor_prev(),
ok = bdberl:cursor_close().
cursor_should_fail_if_not_open(Config) ->
{error, no_cursor} = bdberl:cursor_next(),
{error, no_cursor} = bdberl:cursor_prev(),
{error, no_cursor} = bdberl:cursor_current(),
{error, no_cursor} = bdberl:cursor_close().