From 2d323cb3043002442ef77e6fd8d769b0f254b264 Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Mon, 15 Dec 2008 11:25:03 -0700 Subject: [PATCH] Adding support for cursor operations --- c_src/bdberl_drv.c | 179 +++++++++++++++++++++++++++++++++++++++--- c_src/bdberl_drv.h | 17 +++- include/bdberl.hrl | 26 +++--- src/bdberl.erl | 68 ++++++++++++++-- test/bdberl_SUITE.erl | 37 ++++++++- 5 files changed, 297 insertions(+), 30 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index a32dfba..719d722 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -27,6 +27,7 @@ static void tune_system(int target, void* values, BinHelper* bh); static void do_async_put(void* arg); static void do_async_get(void* arg); static void do_async_txnop(void* arg); +static void do_async_cursor_get(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -170,12 +171,11 @@ DRIVER_INIT(bdberl_drv) } // Make sure we can distiguish between lock timeouts and deadlocks - int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); - printf("TIME_NOT_GRANTED rc: %d\n", rc); + G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); + // Initialization transaction timeout so that deadlock checking works properly db_timeout_t to = 500 * 1000; // 500 ms - rc = G_DB_ENV->set_timeout(G_DB_ENV, to, DB_SET_TXN_TIMEOUT); - printf("DB_SET_TXN_TMEOUT rc: %d value %d\n", rc, to); + G_DB_ENV->set_timeout(G_DB_ENV, to, DB_SET_TXN_TIMEOUT); // BDB is setup -- allocate structures for tracking databases G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); @@ -266,6 +266,12 @@ static void bdberl_drv_stop(ErlDrvData handle) // Cleanup the port lock erl_drv_mutex_destroy(d->port_lock); + // If a cursor is open, close it + if (d->cursor) + { + d->cursor->close(d->cursor); + } + // If a txn is currently active, terminate it. 
if (d->txn) { @@ -349,6 +355,12 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, { FAIL_IF_ASYNC_PENDING(d, outbuf); + // Fail if a cursor is open + if (d->cursor != 0) + { + RETURN_INT(ERROR_CURSOR_OPEN, outbuf); + } + // Fail if a txn is open if (d->txn != 0) { @@ -468,6 +480,76 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, tune_system(target, values, &bh); RETURN_BH(bh, outbuf); } + case CMD_CURSOR_OPEN: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + if (d->cursor) + { + RETURN_INT(ERROR_CURSOR_OPEN, outbuf); + } + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + int flags = UNPACK_INT(inbuf, 4); + + // Make sure we have a reference to the requested database + if (has_dbref(d, dbref)) + { + // Grab the database handle and open the cursor + DB* db = G_DATABASES[dbref].db; + int rc = db->cursor(db, d->txn, &(d->cursor), flags); + RETURN_INT(rc, outbuf); + } + else + { + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_CURSOR_CURR: + case CMD_CURSOR_NEXT: + case CMD_CURSOR_PREV: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Fail if no cursor currently open + if (!d->cursor) + { + RETURN_INT(ERROR_NO_CURSOR, outbuf); + } + + // Schedule the operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, &do_async_cursor_get, d, 0); + + // Let caller know operation is in progress + RETURN_INT(0, outbuf); + } + case CMD_CURSOR_CLOSE: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Fail if no cursor open + if (!d->cursor) + { + RETURN_INT(ERROR_NO_CURSOR, outbuf); + } + + // It's possible to get a deadlock when closing a cursor -- in that situation we also + // need to go ahead and abort the txn + int rc = d->cursor->close(d->cursor); + if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK)) + { + d->txn->abort(d->txn); + d->txn = 0; + } + + // Regardless of what happens, clear out the cursor pointer + d->cursor = 0; + + 
RETURN_INT(0, outbuf); + } } *outbuf = 0; return 0; @@ -589,8 +671,6 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port static int close_database(int dbref, unsigned flags, PortData* data) { -// printf("Closing %d for port %p\n", dbref, data->port); - // Remove this database from our list if (del_dbref(data, dbref)) { @@ -622,10 +702,12 @@ static int close_database(int dbref, unsigned flags, PortData* data) } WRITE_UNLOCK(G_DATABASES_RWLOCK); - return 1; + return 0; + } + else + { + return ERROR_INVALID_DBREF; } - - return 0; } /** @@ -858,6 +940,85 @@ static void do_async_txnop(void* arg) } +static void do_async_cursor_get(void* arg) +{ + // No payload is read here: the operation comes from d->async_op and runs on d->cursor + PortData* d = (PortData*)arg; + + // Setup DBTs + DBT key; + DBT value; + memset(&key, '\0', sizeof(DBT)); + memset(&value, '\0', sizeof(DBT)); + + // Determine what type of cursor get to perform + int flags = 0; + switch (d->async_op) + { + case CMD_CURSOR_NEXT: + flags = DB_NEXT; break; + case CMD_CURSOR_PREV: + flags = DB_PREV; break; + default: + flags = DB_CURRENT; + } + + // Execute the operation + int rc = d->cursor->get(d->cursor, &key, &value, flags); + + // Cleanup as necessary; any sort of failure means we need to close the cursor and abort + // the transaction + if (rc && rc != DB_NOTFOUND) + { + d->cursor->close(d->cursor); + d->cursor = 0; + if (d->txn) + { + d->txn->abort(d->txn); + d->txn = 0; + } + } + + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. 
+ ErlDrvPort port = d->port; + ErlDrvTermData port_owner = d->port_owner; + + // Release the port for another operation + d->async_pool = 0; + d->async_job = 0; + erl_drv_mutex_lock(d->port_lock); + d->async_op = CMD_NONE; + erl_drv_mutex_unlock(d->port_lock); + + // Notify port of result + if (rc == 0) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_BUF2BINARY, (ErlDrvTermData)key.data, (ErlDrvUInt)key.size, + ERL_DRV_BUF2BINARY, (ErlDrvTermData)value.data, (ErlDrvUInt)value.size, + ERL_DRV_TUPLE, 3}; + driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); + } + else if (rc == DB_NOTFOUND) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") }; + driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, port_owner, response, sizeof(response) / sizeof(response[0])); + } +} + + + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index 952fe23..cb19616 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -37,6 +37,11 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_GET 6 #define CMD_PUT 7 #define CMD_TUNE 8 +#define CMD_CURSOR_OPEN 9 +#define CMD_CURSOR_CURR 10 +#define CMD_CURSOR_NEXT 11 +#define CMD_CURSOR_PREV 12 +#define CMD_CURSOR_CLOSE 13 /** @@ -60,6 +65,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define ERROR_INVALID_DBREF (-29002) /* DbRef not currently opened by this port */ #define ERROR_TXN_OPEN (-29003) /* Transaction already active on this port */ #define ERROR_NO_TXN (-29004) /* No transaction open on this port */ +#define ERROR_CURSOR_OPEN (-29005) /* Cursor already active on this port */ +#define ERROR_NO_CURSOR (-29006) /* No cursor open on 
this port */ /** @@ -137,12 +144,14 @@ typedef struct ErlDrvTermData port_owner; /* Pid of the port owner */ - DbRefList* dbrefs; /* List of databases that this port has opened */ + DbRefList* dbrefs; /* List of databases that this port has opened */ - DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn - * active */ + DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn + * active */ - int async_op; /* Value indicating what async op is pending */ + DBC* cursor; /* Active cursor handle; each port may have only 1 cursor active */ + + int async_op; /* Value indicating what async op is pending */ int async_flags; /* Flags for the async op command */ diff --git a/include/bdberl.hrl b/include/bdberl.hrl index b6984c3..d3a1f59 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -5,15 +5,21 @@ %% %% ------------------------------------------------------------------- --define(CMD_NONE, 0). --define(CMD_OPEN_DB, 1). --define(CMD_CLOSE_DB, 2). --define(CMD_TXN_BEGIN, 3). --define(CMD_TXN_COMMIT, 4). --define(CMD_TXN_ABORT, 5). --define(CMD_GET, 6). --define(CMD_PUT, 7). --define(CMD_TUNE, 8). +-define(CMD_NONE, 0). +-define(CMD_OPEN_DB, 1). +-define(CMD_CLOSE_DB, 2). +-define(CMD_TXN_BEGIN, 3). +-define(CMD_TXN_COMMIT, 4). +-define(CMD_TXN_ABORT, 5). +-define(CMD_GET, 6). +-define(CMD_PUT, 7). +-define(CMD_TUNE, 8). +-define(CMD_CURSOR_OPEN, 9). +-define(CMD_CURSOR_CURR, 10). +-define(CMD_CURSOR_NEXT, 11). +-define(CMD_CURSOR_PREV, 12). +-define(CMD_CURSOR_CLOSE, 13). + -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). @@ -32,6 +38,8 @@ -define(ERROR_INVALID_DBREF, -29002). % DbRef not currently opened by this port -define(ERROR_TXN_OPEN, -29003). % Transaction already active on this port -define(ERROR_NO_TXN, -29004). % No transaction open on this port +-define(ERROR_CURSOR_OPEN, -29005). % Cursor already active on this port +-define(ERROR_NO_CURSOR, -29006). 
% No cursor open on this port -define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted -define(ERROR_DB_LOCK_DEADLOCK, -30994). % Deadlock occurred diff --git a/src/bdberl.erl b/src/bdberl.erl index 54e0f3e..c6edf4c 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -17,7 +17,8 @@ put_r/3, put_r/4, get/2, get/3, get_r/2, get_r/3, - update/3, update/4]). + update/3, update/4, + cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0]). -include("bdberl.hrl"). @@ -46,11 +47,12 @@ close(Db) -> close(Db, Opts) -> Flags = process_flags(Opts), Cmd = <>, - case erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd) of - <<0:32/native-integer>> -> - {error, invalid_db}; - <<1:32/native-integer>> -> - ok + <> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd), + case decode_rc(Rc) of + ok -> + ok; + Reason -> + {error, Reason} end. txn_begin() -> @@ -181,6 +183,36 @@ update(Db, Key, Fun, Args) -> end, transaction(F). +cursor_open(Db) -> + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd), + case decode_rc(Rc) of + ok -> + ok; + Reason -> + {error, Reason} + end. + + +cursor_next() -> + do_cursor_move(?CMD_CURSOR_NEXT). + +cursor_prev() -> + do_cursor_move(?CMD_CURSOR_PREV). + +cursor_current() -> + do_cursor_move(?CMD_CURSOR_CURR). + +cursor_close() -> + <> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>), + case decode_rc(Rc) of + ok -> + ok; + Reason -> + {error, Reason} + end. 
+ + get_cache_size() -> Cmd = <>, <> = @@ -248,8 +280,11 @@ get_port() -> %% decode_rc(?ERROR_NONE) -> ok; decode_rc(?ERROR_ASYNC_PENDING) -> async_pending; -decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref; +decode_rc(?ERROR_INVALID_DBREF) -> invalid_db; +decode_rc(?ERROR_TXN_OPEN) -> transaction_open; decode_rc(?ERROR_NO_TXN) -> no_txn; +decode_rc(?ERROR_CURSOR_OPEN) -> cursor_open; +decode_rc(?ERROR_NO_CURSOR) -> no_cursor; decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted; decode_rc(?ERROR_DB_LOCK_DEADLOCK) -> deadlock; decode_rc(Rc) -> {unknown, Rc}. @@ -304,3 +339,22 @@ flag_value(Flag) -> txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC end. + +%% +%% Move the cursor in a given direction. Invoked by cursor_next/prev/current. +%% +do_cursor_move(Direction) -> + <> = erlang:port_control(get_port(), Direction, <<>>), + case decode_rc(Rc) of + ok -> + receive + {ok, KeyBin, ValueBin} -> + {ok, binary_to_term(KeyBin), binary_to_term(ValueBin)}; + not_found -> + not_found; + {error, ReasonCode} -> + {error, decode_rc(ReasonCode)} + end; + Reason -> + {error, Reason} + end. diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index c54f380..3aadda2 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -23,7 +23,8 @@ all() -> transaction_should_abort_on_user_abort, update_should_save_value_if_successful, update_should_accept_args_for_fun, - port_should_tune_transaction_timeouts]. + port_should_tune_transaction_timeouts, + cursor_should_iterate, cursor_should_fail_if_not_open]. init_per_testcase(_TestCase, Config) -> @@ -124,3 +125,37 @@ port_should_tune_transaction_timeouts(_Config) -> {ok, 500000} = bdberl:get_txn_timeout(), ok = bdberl:set_txn_timeout(250000), {ok, 250000} = bdberl:get_txn_timeout(). 
+
+%% Walk a cursor forwards and then backwards over three stored records.
+cursor_should_iterate(Config) ->
+    Db = ?config(db, Config),
+
+    %% Store some sample values in the db
+    ok = bdberl:put(Db, key1, value1),
+    ok = bdberl:put(Db, key2, value2),
+    ok = bdberl:put(Db, key3, value3),
+
+    %% Validate that the cursor returns each value in order (ASSUME btree db)
+    ok = bdberl:cursor_open(Db),
+    {ok, key1, value1} = bdberl:cursor_next(),
+    {ok, key2, value2} = bdberl:cursor_next(),
+    {ok, key3, value3} = bdberl:cursor_next(),
+    not_found = bdberl:cursor_next(),
+
+    %% Validate that the "current" key is key3
+    {ok, key3, value3} = bdberl:cursor_current(),
+
+    %% Now move backwards (should jump to key2, since we are "on" key3)
+    {ok, key2, value2} = bdberl:cursor_prev(),
+    {ok, key1, value1} = bdberl:cursor_prev(),
+    not_found = bdberl:cursor_prev(),
+
+    ok = bdberl:cursor_close().
+
+%% Every cursor operation must report no_cursor when no cursor is open.
+cursor_should_fail_if_not_open(_Config) ->
+    {error, no_cursor} = bdberl:cursor_next(),
+    {error, no_cursor} = bdberl:cursor_prev(),
+    {error, no_cursor} = bdberl:cursor_current(),
+    {error, no_cursor} = bdberl:cursor_close().
+