Adding support for not_found on get and txn abort
This commit is contained in:
parent
a89938c472
commit
24d712840e
3 changed files with 59 additions and 19 deletions
|
@ -24,7 +24,7 @@ static void do_async_put(void* arg);
|
||||||
static void do_async_put_free(void* arg);
|
static void do_async_put_free(void* arg);
|
||||||
static void do_async_get(void* arg);
|
static void do_async_get(void* arg);
|
||||||
static void do_async_get_free(void* arg);
|
static void do_async_get_free(void* arg);
|
||||||
static void do_async_commit(void* arg);
|
static void do_async_txnop(void* arg);
|
||||||
|
|
||||||
static int add_dbref(PortData* data, int dbref);
|
static int add_dbref(PortData* data, int dbref);
|
||||||
static int del_dbref(PortData* data, int dbref);
|
static int del_dbref(PortData* data, int dbref);
|
||||||
|
@ -260,6 +260,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
RETURN_INT(rc, outbuf);
|
RETURN_INT(rc, outbuf);
|
||||||
}
|
}
|
||||||
case CMD_TXN_COMMIT:
|
case CMD_TXN_COMMIT:
|
||||||
|
case CMD_TXN_ABORT:
|
||||||
{
|
{
|
||||||
// If an async operation is pending, fail
|
// If an async operation is pending, fail
|
||||||
if (d->async_op != CMD_NONE)
|
if (d->async_op != CMD_NONE)
|
||||||
|
@ -278,11 +279,11 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
adata->port = d;
|
adata->port = d;
|
||||||
|
|
||||||
// Update port data to indicate we have an operation in progress
|
// Update port data to indicate we have an operation in progress
|
||||||
d->async_op = CMD_TXN_COMMIT;
|
d->async_op = cmd;
|
||||||
|
|
||||||
// Schedule async operation to execute the commit
|
// Schedule async operation to execute the commit/abort
|
||||||
unsigned int key = (unsigned int)d->port;
|
unsigned int key = (unsigned int)d->port;
|
||||||
driver_async(d->port, &key, &do_async_commit, adata, 0);
|
driver_async(d->port, &key, &do_async_txnop, adata, 0);
|
||||||
|
|
||||||
// Outbuf is <<Rc:32>>
|
// Outbuf is <<Rc:32>>
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -382,8 +383,8 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da
|
||||||
}
|
}
|
||||||
case CMD_GET:
|
case CMD_GET:
|
||||||
{
|
{
|
||||||
// Extract return code == if it's zero, send back {ok, Payload} to driver process; otherwise
|
// Extract return code == if it's zero, send back {ok, Payload} or not_found to driver
|
||||||
// send a {error, Reason} tuple
|
// process; otherwise send a {error, Reason} tuple
|
||||||
AsyncData* adata = (AsyncData*)thread_data;
|
AsyncData* adata = (AsyncData*)thread_data;
|
||||||
if (adata->rc == 0)
|
if (adata->rc == 0)
|
||||||
{
|
{
|
||||||
|
@ -392,6 +393,12 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da
|
||||||
ERL_DRV_TUPLE, 2};
|
ERL_DRV_TUPLE, 2};
|
||||||
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||||
}
|
}
|
||||||
|
else if (adata->rc == DB_NOTFOUND)
|
||||||
|
{
|
||||||
|
printf("not foudn\n");
|
||||||
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
|
||||||
|
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||||
|
@ -639,14 +646,21 @@ static void do_async_get_free(void* arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void do_async_commit(void* arg)
|
static void do_async_txnop(void* arg)
|
||||||
{
|
{
|
||||||
printf("do_async_commit\n");
|
printf("do_async_txnop\n");
|
||||||
|
|
||||||
// Execute the actual commit
|
// Execute the actual commit/abort
|
||||||
AsyncData* adata = (AsyncData*)arg;
|
AsyncData* adata = (AsyncData*)arg;
|
||||||
|
if (adata->port->async_op == CMD_TXN_COMMIT)
|
||||||
|
{
|
||||||
adata->rc = adata->port->txn->commit(adata->port->txn, 0);
|
adata->rc = adata->port->txn->commit(adata->port->txn, 0);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
adata->rc = adata->port->txn->abort(adata->port->txn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void* zalloc(unsigned int size)
|
static void* zalloc(unsigned int size)
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
-export([new/0,
|
-export([new/0,
|
||||||
open_database/3,
|
open_database/3,
|
||||||
close_database/2,
|
close_database/2,
|
||||||
txn_begin/1, txn_commit/1,
|
txn_begin/1, txn_commit/1, txn_abort/1,
|
||||||
put/4,
|
put/4,
|
||||||
get/3]).
|
get/3]).
|
||||||
|
|
||||||
|
@ -37,7 +37,10 @@
|
||||||
-define(ERROR_NO_TXN, -29004). % No transaction open on this port
|
-define(ERROR_NO_TXN, -29004). % No transaction open on this port
|
||||||
|
|
||||||
new() ->
|
new() ->
|
||||||
ok = erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv),
|
case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, permanent} -> ok % Means that the driver is already active
|
||||||
|
end,
|
||||||
Port = open_port({spawn, bdberl_drv}, [binary]),
|
Port = open_port({spawn, bdberl_drv}, [binary]),
|
||||||
{ok, Port}.
|
{ok, Port}.
|
||||||
|
|
||||||
|
@ -85,6 +88,18 @@ txn_commit(Port) ->
|
||||||
?ERROR_NO_TXN -> {error, no_txn}
|
?ERROR_NO_TXN -> {error, no_txn}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
txn_abort(Port) ->
|
||||||
|
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_ABORT, <<>>),
|
||||||
|
case Result of
|
||||||
|
?ERROR_NONE ->
|
||||||
|
receive
|
||||||
|
ok -> ok;
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
||||||
|
?ERROR_NO_TXN -> {error, no_txn}
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
put(Port, DbRef, Key, Value) ->
|
put(Port, DbRef, Key, Value) ->
|
||||||
{KeyLen, KeyBin} = to_binary(Key),
|
{KeyLen, KeyBin} = to_binary(Key),
|
||||||
|
@ -110,6 +125,7 @@ get(Port, DbRef, Key) ->
|
||||||
?ERROR_NONE ->
|
?ERROR_NONE ->
|
||||||
receive
|
receive
|
||||||
{ok, Bin} -> {ok, binary_to_term(Bin)};
|
{ok, Bin} -> {ok, binary_to_term(Bin)};
|
||||||
|
not_found -> not_found;
|
||||||
{error, Reason} -> {error, Reason}
|
{error, Reason} -> {error, Reason}
|
||||||
end;
|
end;
|
||||||
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
?ERROR_ASYNC_PENDING -> {error, async_pending};
|
||||||
|
|
|
@ -10,7 +10,7 @@
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
% [test_db].
|
% [test_db].
|
||||||
[test_put].
|
[test_put, test_txn].
|
||||||
|
|
||||||
init_per_testcase(TestCase, Config) ->
|
init_per_testcase(TestCase, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
@ -52,18 +52,28 @@ test_put(_Config) ->
|
||||||
{ok, 0} = bdberl_port:open_database(P, "test1", hash),
|
{ok, 0} = bdberl_port:open_database(P, "test1", hash),
|
||||||
|
|
||||||
ok = bdberl_port:txn_begin(P),
|
ok = bdberl_port:txn_begin(P),
|
||||||
|
|
||||||
ok = bdberl_port:put(P, 0, akey, avalue),
|
ok = bdberl_port:put(P, 0, akey, avalue),
|
||||||
|
|
||||||
ok = bdberl_port:txn_commit(P),
|
ok = bdberl_port:txn_commit(P),
|
||||||
|
|
||||||
ok = bdberl_port:txn_begin(P),
|
ok = bdberl_port:txn_begin(P),
|
||||||
|
|
||||||
{ok, avalue} = bdberl_port:get(P, 0, akey),
|
{ok, avalue} = bdberl_port:get(P, 0, akey),
|
||||||
|
ok = bdberl_port:txn_commit(P).
|
||||||
|
|
||||||
|
|
||||||
|
test_txn(_Config) ->
|
||||||
|
{ok, P} = bdberl_port:new(),
|
||||||
|
{ok, 0} = bdberl_port:open_database(P, "test2", btree),
|
||||||
|
|
||||||
|
ok = bdberl_port:txn_begin(P),
|
||||||
|
ok = bdberl_port:put(P, 0, akey, avalue),
|
||||||
|
ok = bdberl_port:txn_abort(P),
|
||||||
|
|
||||||
|
ok = bdberl_port:txn_begin(P),
|
||||||
|
not_found = bdberl_port:get(P, 0, akey),
|
||||||
ok = bdberl_port:txn_commit(P).
|
ok = bdberl_port:txn_commit(P).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue