diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 4c7bfb8..b9aad6c 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -113,6 +113,7 @@ static void do_async_txnop(void* arg); static void do_async_cursor_put(void* arg); static void do_async_cursor_get(void* arg); static void do_async_cursor_del(void* arg); +static void do_async_cursor_count(void* arg); static void do_async_cursor_cnp(void* arg); static void do_async_truncate(void* arg); static void do_sync_data_dirs_info(PortData *p); @@ -846,6 +847,19 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Let caller know operation is in progress RETURN_INT(0, outbuf); } + case CMD_CURSOR_COUNT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + FAIL_IF_NO_CURSOR(d, outbuf); + + // Schedule the operation + d->async_op = cmd; + bdberl_general_tpool_run(&do_async_cursor_count, d, 0, &d->async_job); + + // Let caller know operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } case CMD_CURSOR_CLOSE: { FAIL_IF_ASYNC_PENDING(d, outbuf); @@ -1464,6 +1478,29 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path) } +static void send_error_response(ErlDrvPort port, ErlDrvTermData pid, int rc) +{ + // See if this is a standard errno that we have an erlang code for + char *error = bdberl_rc_to_atom_str(rc); + if (error != NULL) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom(error), + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom("unknown"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + } + + void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) { // TODO: May need to tag the messages a bit more explicitly so that if another async @@ -1476,24 +1513,7 @@ void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) } else { - // See if this is a standard errno that we have an erlang code for - char *error = bdberl_rc_to_atom_str(rc); - if (error != NULL) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_ATOM, driver_mk_atom(error), - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_ATOM, driver_mk_atom("unknown"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } + send_error_response(port, pid, rc); } } @@ -1511,6 +1531,32 @@ void bdberl_async_cleanup_and_send_rc(PortData* d, int rc) bdberl_send_rc(port, pid, rc); } +static void async_cleanup_and_send_uint32(PortData* d, int rc, unsigned int value) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + + bdberl_async_cleanup(d); + + // Notify port of result + if (rc == 0) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_UINT, value, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + send_error_response(port, pid, rc); + } +} + static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) { // Save the port and pid references -- we need copies independent from the PortData @@ -1539,24 +1585,7 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) } else { - // See if this is a standard errno that we have an erlang code for - char *error = bdberl_rc_to_atom_str(rc); - if (error != NULL) - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_ATOM, driver_mk_atom(error), - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_ATOM, driver_mk_atom("unknown"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } + send_error_response(port, pid, rc); } } @@ -1810,10 +1839,27 @@ static void do_async_cursor_get(void* arg) static void do_async_cursor_del(void* arg) { - PortData* d = (PortData*)arg; - assert(d->cursor != NULL); - DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */ - bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE); + PortData* d = (PortData*)arg; + assert(d->cursor != NULL); + DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */ + bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE); +} + + +static void do_async_cursor_count(void* arg) +{ + PortData* d = (PortData*)arg; + assert(d->cursor != NULL); + + // Place to store the record count. + db_recno_t count = 0; + + // Execute the operation + DBGCMD(d, "d->cursor->count(%p, %p, %08X);", d->cursor, &count, 0); + int rc = d->cursor->count(d->cursor, &count, 0); + DBGCMDRC(d, rc); + + async_cleanup_and_send_uint32(d, rc, count); } diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index d4a5a88..5d18303 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -53,9 +53,6 @@ #define CMD_CURSOR_NEXT 12 #define CMD_CURSOR_PREV 13 #define CMD_CURSOR_CLOSE 14 -#define CMD_CURSOR_GET 35 /* TODO: renumber these next 3 and match them with bdberl.hrl */ -#define CMD_CURSOR_PUT 36 -#define CMD_CURSOR_DEL 37 #define CMD_PUT_COMMIT 15 #define CMD_REMOVE_DB 16 #define CMD_TRUNCATE 17 @@ -76,6 +73,10 @@ #define CMD_DATA_DIRS_INFO 32 #define CMD_LOG_DIR_INFO 33 #define CMD_DRIVER_INFO 34 +#define CMD_CURSOR_GET 35 +#define CMD_CURSOR_PUT 36 +#define CMD_CURSOR_DEL 37 +#define CMD_CURSOR_COUNT 38 /** * Command status values diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 993abcd..a8479ec 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -46,9 +46,6 @@ -define(CMD_CURSOR_NEXT, 12). -define(CMD_CURSOR_PREV, 13). -define(CMD_CURSOR_CLOSE, 14). --define(CMD_CURSOR_GET, 35). %% TODO: renumber these 3 and match them to bdberl_drv.h --define(CMD_CURSOR_PUT, 36). --define(CMD_CURSOR_DEL, 37). -define(CMD_PUT_COMMIT, 15). -define(CMD_REMOVE_DB, 16). -define(CMD_TRUNCATE, 17). @@ -69,6 +66,10 @@ -define(CMD_DATA_DIRS_INFO, 32). -define(CMD_LOG_DIR_INFO, 33). -define(CMD_DRIVER_INFO, 34). +-define(CMD_CURSOR_GET, 35). +-define(CMD_CURSOR_PUT, 36). +-define(CMD_CURSOR_DEL, 37). +-define(CMD_CURSOR_COUNT, 38). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index 8921026..fba2ce3 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -68,6 +68,7 @@ delete_database/1, cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0, cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3, + cursor_count/0, driver_info/0, register_logger/0, stop/0]). @@ -1397,6 +1398,25 @@ cursor_get(Key, Opts) -> end. +%%-------------------------------------------------------------------- +%% @doc +%% Returns the count of duplicate records for the key to which the +%% cursor currently refers. +%% +%% If this function fails for any reason, the state of the cursor will +%% be unchanged. +%% +%% @spec cursor_count() -> {ok, Count} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec cursor_count() -> {ok, Count :: number()} | {error, db_error()}. + +cursor_count() -> + <> = erlang:port_control(get_port(), ?CMD_CURSOR_COUNT, <<>>), + recv_val(Result). + + %%-------------------------------------------------------------------- %% @doc %% Closes the cursor. diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 261a177..19694d3 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -55,6 +55,7 @@ all() -> update_should_accept_args_for_fun, port_should_return_transaction_timeouts, cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open, + cursor_should_return_count, put_commit_should_end_txn, data_dir_should_be_priv_dir, delete_should_remove_file, @@ -270,6 +271,20 @@ cursor_get_should_pos(Config) -> ok = bdberl:cursor_close(). +cursor_should_return_count(Config) -> + Db = ?config(db, Config), + + %% Store some sample values in the db + ok = bdberl:put(Db, key, value1), +% ok = bdberl:put(Db, key, value2), +% ok = bdberl:put(Db, key, value3), +% ok = bdberl:put(Db, key, value4), + ok = bdberl:cursor_open(Db), + {ok, _} = bdberl:cursor_get(key), + %% Validate the count of duplicate values + {ok, 1} = bdberl:cursor_count(), + ok = bdberl:cursor_close(). + cursor_should_fail_if_not_open(_Config) -> {error, no_cursor} = bdberl:cursor_next(), {error, no_cursor} = bdberl:cursor_prev(),