Implemented support for put_commit

This commit is contained in:
Dave Smith 2008-12-15 11:47:38 -07:00
parent 2d323cb304
commit 865f8958c5
5 changed files with 79 additions and 22 deletions

View file

@ -419,9 +419,16 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
} }
case CMD_PUT: case CMD_PUT:
case CMD_GET: case CMD_GET:
case CMD_PUT_COMMIT:
{ {
FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_ASYNC_PENDING(d, outbuf);
// Put/commit requires a transaction to be active
if (cmd == CMD_PUT_COMMIT && (!d->txn))
{
RETURN_INT(ERROR_NO_TXN, outbuf);
}
// Inbuf is: << DbRef:32, Rest/binary>> // Inbuf is: << DbRef:32, Rest/binary>>
int dbref = UNPACK_INT(inbuf, 0); int dbref = UNPACK_INT(inbuf, 0);
@ -445,7 +452,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Mark the port as busy and then schedule the appropriate async operation // Mark the port as busy and then schedule the appropriate async operation
d->async_op = cmd; d->async_op = cmd;
TPoolJobFunc fn; TPoolJobFunc fn;
if (cmd == CMD_PUT) if (cmd == CMD_PUT || cmd == CMD_PUT_COMMIT)
{ {
fn = &do_async_put; fn = &do_async_put;
} }
@ -792,6 +799,15 @@ static void do_async_put(void* arg)
d->txn->abort(d->txn); d->txn->abort(d->txn);
d->txn = 0; d->txn = 0;
} }
else if (d->txn && d->async_op == CMD_PUT_COMMIT)
{
// Put needs to be followed by a commit -- saves us another pass through the driver and
// threadpool queues
rc = d->txn->commit(d->txn, 0);
// Regardless of the txn commit outcome, we still need to invalidate the transaction
d->txn = 0;
}
// Save the port and pid references -- we need copies independent from the PortData // Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that // structure. Once we release the port_lock after clearing the cmd, it's possible that

View file

@ -42,6 +42,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
#define CMD_CURSOR_NEXT 11 #define CMD_CURSOR_NEXT 11
#define CMD_CURSOR_PREV 12 #define CMD_CURSOR_PREV 12
#define CMD_CURSOR_CLOSE 13 #define CMD_CURSOR_CLOSE 13
#define CMD_PUT_COMMIT 14
/** /**

View file

@ -19,6 +19,7 @@
-define(CMD_CURSOR_NEXT, 11). -define(CMD_CURSOR_NEXT, 11).
-define(CMD_CURSOR_PREV, 12). -define(CMD_CURSOR_PREV, 12).
-define(CMD_CURSOR_CLOSE, 13). -define(CMD_CURSOR_CLOSE, 13).
-define(CMD_PUT_COMMIT, 14).
-define(DB_TYPE_BTREE, 1). -define(DB_TYPE_BTREE, 1).

View file

@ -15,6 +15,8 @@
transaction/1, transaction/1,
put/3, put/4, put/3, put/4,
put_r/3, put_r/4, put_r/3, put_r/4,
put_commit/3, put_commit/4,
put_commit_r/3, put_commit_r/4,
get/2, get/3, get/2, get/3,
get_r/2, get_r/3, get_r/2, get_r/3,
update/3, update/4, update/3, update/4,
@ -61,7 +63,7 @@ txn_begin() ->
txn_begin(Opts) -> txn_begin(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/unsigned-native>>, Cmd = <<Flags:32/unsigned-native>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), <<Result:32/native-signed>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd),
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok; ok -> ok;
Error -> {error, {txn_begin, Error}} Error -> {error, {txn_begin, Error}}
@ -73,7 +75,7 @@ txn_commit() ->
txn_commit(Opts) -> txn_commit(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/unsigned-native>>, Cmd = <<Flags:32/unsigned-native>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd), <<Result:32/native-signed>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd),
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok ->
receive receive
@ -85,7 +87,7 @@ txn_commit(Opts) ->
end. end.
txn_abort() -> txn_abort() ->
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_TXN_ABORT, <<>>), <<Result:32/native-signed>> = erlang:port_control(get_port(), ?CMD_TXN_ABORT, <<>>),
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok ->
receive receive
@ -115,20 +117,7 @@ put(Db, Key, Value) ->
put(Db, Key, Value, []). put(Db, Key, Value, []).
put(Db, Key, Value, Opts) -> put(Db, Key, Value, Opts) ->
{KeyLen, KeyBin} = to_binary(Key), do_put(?CMD_PUT, Db, Key, Value, Opts).
{ValLen, ValBin} = to_binary(Value),
Flags = process_flags(Opts),
Cmd = <<Db:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_PUT, Cmd),
case decode_rc(Result) of
ok ->
receive
ok -> ok;
{error, Reason} -> {error, {put, decode_rc(Reason)}}
end;
Error ->
{error, {put, decode_rc(Error)}}
end.
put_r(Db, Key, Value) -> put_r(Db, Key, Value) ->
put(Db, Key, Value, []). put(Db, Key, Value, []).
@ -139,6 +128,22 @@ put_r(Db, Key, Value, Opts) ->
Error -> throw(Error) Error -> throw(Error)
end. end.
put_commit(Db, Key, Value) ->
put_commit(Db, Key, Value, []).
put_commit(Db, Key, Value, Opts) ->
do_put(?CMD_PUT_COMMIT, Db, Key, Value, Opts).
put_commit_r(Db, Key, Value) ->
put_commit_r(Db, Key, Value, []).
put_commit_r(Db, Key, Value, Opts) ->
case do_put(?CMD_PUT_COMMIT, Db, Key, Value, Opts) of
ok -> ok;
Error -> throw(Error)
end.
get(Db, Key) -> get(Db, Key) ->
get(Db, Key, []). get(Db, Key, []).
@ -146,7 +151,7 @@ get(Db, Key, Opts) ->
{KeyLen, KeyBin} = to_binary(Key), {KeyLen, KeyBin} = to_binary(Key),
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Db:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes>>, Cmd = <<Db:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes>>,
<<Result:32/native>> = erlang:port_control(get_port(), ?CMD_GET, Cmd), <<Result:32/native-signed>> = erlang:port_control(get_port(), ?CMD_GET, Cmd),
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok ->
receive receive
@ -178,7 +183,7 @@ update(Db, Key, Fun, Args) ->
undefined -> Fun(Key, Value); undefined -> Fun(Key, Value);
Args -> Fun(Key, Value, Args) Args -> Fun(Key, Value, Args)
end, end,
ok = put(Db, Key, NewValue), ok = put_commit(Db, Key, NewValue),
NewValue NewValue
end, end,
transaction(F). transaction(F).
@ -340,6 +345,27 @@ flag_value(Flag) ->
end. end.
%%
%% Execute a PUT, using the provide "Action" to determine if it's a PUT or PUT_COMMIT
%%
do_put(Action, Db, Key, Value, Opts) ->
{KeyLen, KeyBin} = to_binary(Key),
{ValLen, ValBin} = to_binary(Value),
Flags = process_flags(Opts),
Cmd = <<Db:32/native, Flags:32/unsigned-native, KeyLen:32/native, KeyBin/bytes, ValLen:32/native, ValBin/bytes>>,
<<Result:32/native>> = erlang:port_control(get_port(), Action, Cmd),
case decode_rc(Result) of
ok ->
receive
ok -> ok;
{error, Reason} -> {error, {put, decode_rc(Reason)}}
end;
Error ->
{error, {put, decode_rc(Error)}}
end.
%% %%
%% Move the cursor in a given direction. Invoked by cursor_next/prev/current. %% Move the cursor in a given direction. Invoked by cursor_next/prev/current.
%% %%

View file

@ -24,7 +24,8 @@ all() ->
update_should_save_value_if_successful, update_should_save_value_if_successful,
update_should_accept_args_for_fun, update_should_accept_args_for_fun,
port_should_tune_transaction_timeouts, port_should_tune_transaction_timeouts,
cursor_should_iterate, cursor_should_fail_if_not_open]. cursor_should_iterate, cursor_should_fail_if_not_open,
put_commit_should_end_txn].
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
@ -157,5 +158,17 @@ cursor_should_fail_if_not_open(Config) ->
{error, no_cursor} = bdberl:cursor_current(), {error, no_cursor} = bdberl:cursor_current(),
{error, no_cursor} = bdberl:cursor_close(). {error, no_cursor} = bdberl:cursor_close().
put_commit_should_end_txn(Config) ->
Db = ?config(db, Config),
%% Start a transaction
ok = bdberl:txn_begin(),
ok = bdberl:put_commit(Db, key1, value1),
%% Commit should now fail since the txn is done
{error, {txn_commit, no_txn}} = bdberl:txn_commit(),
%% Verify data got committed
{ok, value1} = bdberl:get(Db, key1).