Add support for cursor->count()
This commit is contained in:
parent
11c864d949
commit
b70d9b8afa
5 changed files with 129 additions and 46 deletions
|
@ -113,6 +113,7 @@ static void do_async_txnop(void* arg);
|
|||
static void do_async_cursor_put(void* arg);
|
||||
static void do_async_cursor_get(void* arg);
|
||||
static void do_async_cursor_del(void* arg);
|
||||
static void do_async_cursor_count(void* arg);
|
||||
static void do_async_cursor_cnp(void* arg);
|
||||
static void do_async_truncate(void* arg);
|
||||
static void do_sync_data_dirs_info(PortData *p);
|
||||
|
@ -846,6 +847,19 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
// Let caller know operation is in progress
|
||||
RETURN_INT(0, outbuf);
|
||||
}
|
||||
case CMD_CURSOR_COUNT:
|
||||
{
|
||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||
FAIL_IF_NO_CURSOR(d, outbuf);
|
||||
|
||||
// Schedule the operation
|
||||
d->async_op = cmd;
|
||||
bdberl_general_tpool_run(&do_async_cursor_count, d, 0, &d->async_job);
|
||||
|
||||
// Let caller know operation is in progress
|
||||
// Outbuf is: <<0:32>>
|
||||
RETURN_INT(0, outbuf);
|
||||
}
|
||||
case CMD_CURSOR_CLOSE:
|
||||
{
|
||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||
|
@ -1464,6 +1478,29 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path)
|
|||
}
|
||||
|
||||
|
||||
static void send_error_response(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
||||
{
|
||||
// See if this is a standard errno that we have an erlang code for
|
||||
char *error = bdberl_rc_to_atom_str(rc);
|
||||
if (error != NULL)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom(error),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom("unknown"),
|
||||
ERL_DRV_INT, rc,
|
||||
ERL_DRV_TUPLE, 2,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
||||
{
|
||||
// TODO: May need to tag the messages a bit more explicitly so that if another async
|
||||
|
@ -1476,24 +1513,7 @@ void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
|||
}
|
||||
else
|
||||
{
|
||||
// See if this is a standard errno that we have an erlang code for
|
||||
char *error = bdberl_rc_to_atom_str(rc);
|
||||
if (error != NULL)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom(error),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom("unknown"),
|
||||
ERL_DRV_INT, rc,
|
||||
ERL_DRV_TUPLE, 2,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
send_error_response(port, pid, rc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1511,6 +1531,32 @@ void bdberl_async_cleanup_and_send_rc(PortData* d, int rc)
|
|||
bdberl_send_rc(port, pid, rc);
|
||||
}
|
||||
|
||||
static void async_cleanup_and_send_uint32(PortData* d, int rc, unsigned int value)
|
||||
{
|
||||
// Save the port and pid references -- we need copies independent from the PortData
|
||||
// structure. Once we release the port_lock after clearing the cmd, it's possible that
|
||||
// the port could go away without waiting on us to finish. This is acceptable, but we need
|
||||
// to be certain that there is no overlap of data between the two threads. driver_send_term
|
||||
// is safe to use from a thread, even if the port you're sending from has already expired.
|
||||
ErlDrvPort port = d->port;
|
||||
ErlDrvTermData pid = d->port_owner;
|
||||
|
||||
bdberl_async_cleanup(d);
|
||||
|
||||
// Notify port of result
|
||||
if (rc == 0)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
|
||||
ERL_DRV_UINT, value,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
send_error_response(port, pid, rc);
|
||||
}
|
||||
}
|
||||
|
||||
static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
|
||||
{
|
||||
// Save the port and pid references -- we need copies independent from the PortData
|
||||
|
@ -1539,24 +1585,7 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
|
|||
}
|
||||
else
|
||||
{
|
||||
// See if this is a standard errno that we have an erlang code for
|
||||
char *error = bdberl_rc_to_atom_str(rc);
|
||||
if (error != NULL)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom(error),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom("unknown"),
|
||||
ERL_DRV_INT, rc,
|
||||
ERL_DRV_TUPLE, 2,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
send_error_response(port, pid, rc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1810,10 +1839,27 @@ static void do_async_cursor_get(void* arg)
|
|||
|
||||
static void do_async_cursor_del(void* arg)
|
||||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
}
|
||||
|
||||
|
||||
static void do_async_cursor_count(void* arg)
|
||||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
|
||||
// Place to store the record count.
|
||||
db_recno_t count = 0;
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->count(%p, %p, %08X);", d->cursor, &count, 0);
|
||||
int rc = d->cursor->count(d->cursor, &count, 0);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
async_cleanup_and_send_uint32(d, rc, count);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -53,9 +53,6 @@
|
|||
#define CMD_CURSOR_NEXT 12
|
||||
#define CMD_CURSOR_PREV 13
|
||||
#define CMD_CURSOR_CLOSE 14
|
||||
#define CMD_CURSOR_GET 35 /* TODO: renumber these next 3 and match them with bdberl.hrl */
|
||||
#define CMD_CURSOR_PUT 36
|
||||
#define CMD_CURSOR_DEL 37
|
||||
#define CMD_PUT_COMMIT 15
|
||||
#define CMD_REMOVE_DB 16
|
||||
#define CMD_TRUNCATE 17
|
||||
|
@ -76,6 +73,10 @@
|
|||
#define CMD_DATA_DIRS_INFO 32
|
||||
#define CMD_LOG_DIR_INFO 33
|
||||
#define CMD_DRIVER_INFO 34
|
||||
#define CMD_CURSOR_GET 35
|
||||
#define CMD_CURSOR_PUT 36
|
||||
#define CMD_CURSOR_DEL 37
|
||||
#define CMD_CURSOR_COUNT 38
|
||||
|
||||
/**
|
||||
* Command status values
|
||||
|
|
|
@ -46,9 +46,6 @@
|
|||
-define(CMD_CURSOR_NEXT, 12).
|
||||
-define(CMD_CURSOR_PREV, 13).
|
||||
-define(CMD_CURSOR_CLOSE, 14).
|
||||
-define(CMD_CURSOR_GET, 35). %% TODO: renumber these 3 and match them to bdberl_drv.h
|
||||
-define(CMD_CURSOR_PUT, 36).
|
||||
-define(CMD_CURSOR_DEL, 37).
|
||||
-define(CMD_PUT_COMMIT, 15).
|
||||
-define(CMD_REMOVE_DB, 16).
|
||||
-define(CMD_TRUNCATE, 17).
|
||||
|
@ -69,6 +66,10 @@
|
|||
-define(CMD_DATA_DIRS_INFO, 32).
|
||||
-define(CMD_LOG_DIR_INFO, 33).
|
||||
-define(CMD_DRIVER_INFO, 34).
|
||||
-define(CMD_CURSOR_GET, 35).
|
||||
-define(CMD_CURSOR_PUT, 36).
|
||||
-define(CMD_CURSOR_DEL, 37).
|
||||
-define(CMD_CURSOR_COUNT, 38).
|
||||
|
||||
-define(DB_TYPE_BTREE, 1).
|
||||
-define(DB_TYPE_HASH, 2).
|
||||
|
|
|
@ -68,6 +68,7 @@
|
|||
delete_database/1,
|
||||
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
|
||||
cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3,
|
||||
cursor_count/0,
|
||||
driver_info/0,
|
||||
register_logger/0,
|
||||
stop/0]).
|
||||
|
@ -1397,6 +1398,25 @@ cursor_get(Key, Opts) ->
|
|||
end.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Returns the count of duplicate records for the key to which the
|
||||
%% cursor currently refers.
|
||||
%%
|
||||
%% If this function fails for any reason, the state of the cursor will
|
||||
%% be unchanged.
|
||||
%%
|
||||
%% @spec cursor_count() -> {ok, Count} | {error, Error}
|
||||
%%
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec cursor_count() -> {ok, Count :: number()} | {error, db_error()}.
|
||||
|
||||
cursor_count() ->
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_COUNT, <<>>),
|
||||
recv_val(Result).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Closes the cursor.
|
||||
|
|
|
@ -55,6 +55,7 @@ all() ->
|
|||
update_should_accept_args_for_fun,
|
||||
port_should_return_transaction_timeouts,
|
||||
cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open,
|
||||
cursor_should_return_count,
|
||||
put_commit_should_end_txn,
|
||||
data_dir_should_be_priv_dir,
|
||||
delete_should_remove_file,
|
||||
|
@ -270,6 +271,20 @@ cursor_get_should_pos(Config) ->
|
|||
|
||||
ok = bdberl:cursor_close().
|
||||
|
||||
cursor_should_return_count(Config) ->
|
||||
Db = ?config(db, Config),
|
||||
|
||||
%% Store some sample values in the db
|
||||
ok = bdberl:put(Db, key, value1),
|
||||
% ok = bdberl:put(Db, key, value2),
|
||||
% ok = bdberl:put(Db, key, value3),
|
||||
% ok = bdberl:put(Db, key, value4),
|
||||
ok = bdberl:cursor_open(Db),
|
||||
{ok, _} = bdberl:cursor_get(key),
|
||||
%% Validate the count of duplicate values
|
||||
{ok, 1} = bdberl:cursor_count(),
|
||||
ok = bdberl:cursor_close().
|
||||
|
||||
cursor_should_fail_if_not_open(_Config) ->
|
||||
{error, no_cursor} = bdberl:cursor_next(),
|
||||
{error, no_cursor} = bdberl:cursor_prev(),
|
||||
|
|
Loading…
Reference in a new issue