From 865f8958c5f51dd5100106a818001657000d7c3c Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Mon, 15 Dec 2008 11:47:38 -0700 Subject: [PATCH] Implemented support for put_commit --- c_src/bdberl_drv.c | 18 +++++++++++- c_src/bdberl_drv.h | 1 + include/bdberl.hrl | 1 + src/bdberl.erl | 64 ++++++++++++++++++++++++++++++------------- test/bdberl_SUITE.erl | 17 ++++++++++-- 5 files changed, 79 insertions(+), 22 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 719d722..ed112cf 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -419,9 +419,16 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, } case CMD_PUT: case CMD_GET: + case CMD_PUT_COMMIT: { FAIL_IF_ASYNC_PENDING(d, outbuf); + // Put/commit requires a transaction to be active + if (cmd == CMD_PUT_COMMIT && (!d->txn)) + { + RETURN_INT(ERROR_NO_TXN, outbuf); + } + // Inbuf is: << DbRef:32, Rest/binary>> int dbref = UNPACK_INT(inbuf, 0); @@ -445,7 +452,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; TPoolJobFunc fn; - if (cmd == CMD_PUT) + if (cmd == CMD_PUT || cmd == CMD_PUT_COMMIT) { fn = &do_async_put; } @@ -792,6 +799,15 @@ static void do_async_put(void* arg) d->txn->abort(d->txn); d->txn = 0; } + else if (d->txn && d->async_op == CMD_PUT_COMMIT) + { + // Put needs to be followed by a commit -- saves us another pass through the driver and + // threadpool queues + rc = d->txn->commit(d->txn, 0); + + // Regardless of the txn commit outcome, we still need to invalidate the transaction + d->txn = 0; + } // Save the port and pid references -- we need copies independent from the PortData // structure. Once we release the port_lock after clearing the cmd, it's possible that diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index cb19616..5a28953 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -42,6 +42,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_CURSOR_NEXT 11 #define CMD_CURSOR_PREV 12 #define CMD_CURSOR_CLOSE 13 +#define CMD_PUT_COMMIT 14 /** diff --git a/include/bdberl.hrl b/include/bdberl.hrl index d3a1f59..4b3b675 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -19,6 +19,7 @@ -define(CMD_CURSOR_NEXT, 11). -define(CMD_CURSOR_PREV, 12). -define(CMD_CURSOR_CLOSE, 13). +-define(CMD_PUT_COMMIT, 14). -define(DB_TYPE_BTREE, 1). diff --git a/src/bdberl.erl b/src/bdberl.erl index c6edf4c..2bcd304 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -15,6 +15,8 @@ transaction/1, put/3, put/4, put_r/3, put_r/4, + put_commit/3, put_commit/4, + put_commit_r/3, put_commit_r/4, get/2, get/3, get_r/2, get_r/3, update/3, update/4, @@ -61,7 +63,7 @@ txn_begin() -> txn_begin(Opts) -> Flags = process_flags(Opts), Cmd = <>, - <> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), + <> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), case decode_rc(Result) of ok -> ok; Error -> {error, {txn_begin, Error}} @@ -73,7 +75,7 @@ txn_commit() -> txn_commit(Opts) -> Flags = process_flags(Opts), Cmd = <>, - <> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd), + <> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd), case decode_rc(Result) of ok -> receive @@ -85,7 +87,7 @@ txn_commit(Opts) -> end. txn_abort() -> - <> = erlang:port_control(get_port(), ?CMD_TXN_ABORT, <<>>), + <> = erlang:port_control(get_port(), ?CMD_TXN_ABORT, <<>>), case decode_rc(Result) of ok -> receive @@ -115,20 +117,7 @@ put(Db, Key, Value) -> put(Db, Key, Value, []). put(Db, Key, Value, Opts) -> - {KeyLen, KeyBin} = to_binary(Key), - {ValLen, ValBin} = to_binary(Value), - Flags = process_flags(Opts), - Cmd = <>, - <> = erlang:port_control(get_port(), ?CMD_PUT, Cmd), - case decode_rc(Result) of - ok -> - receive - ok -> ok; - {error, Reason} -> {error, {put, decode_rc(Reason)}} - end; - Error -> - {error, {put, decode_rc(Error)}} - end. + do_put(?CMD_PUT, Db, Key, Value, Opts). put_r(Db, Key, Value) -> put(Db, Key, Value, []). @@ -139,6 +128,22 @@ put_r(Db, Key, Value, Opts) -> Error -> throw(Error) end. +put_commit(Db, Key, Value) -> + put_commit(Db, Key, Value, []). + +put_commit(Db, Key, Value, Opts) -> + do_put(?CMD_PUT_COMMIT, Db, Key, Value, Opts). + +put_commit_r(Db, Key, Value) -> + put_commit_r(Db, Key, Value, []). + +put_commit_r(Db, Key, Value, Opts) -> + case do_put(?CMD_PUT_COMMIT, Db, Key, Value, Opts) of + ok -> ok; + Error -> throw(Error) + end. + + get(Db, Key) -> get(Db, Key, []). @@ -146,7 +151,7 @@ get(Db, Key, Opts) -> {KeyLen, KeyBin} = to_binary(Key), Flags = process_flags(Opts), Cmd = <>, - <> = erlang:port_control(get_port(), ?CMD_GET, Cmd), + <> = erlang:port_control(get_port(), ?CMD_GET, Cmd), case decode_rc(Result) of ok -> receive @@ -178,7 +183,7 @@ update(Db, Key, Fun, Args) -> undefined -> Fun(Key, Value); Args -> Fun(Key, Value, Args) end, - ok = put(Db, Key, NewValue), + ok = put_commit(Db, Key, NewValue), NewValue end, transaction(F). @@ -340,6 +345,27 @@ flag_value(Flag) -> end. +%% +%% Execute a PUT, using the provide "Action" to determine if it's a PUT or PUT_COMMIT +%% +do_put(Action, Db, Key, Value, Opts) -> + {KeyLen, KeyBin} = to_binary(Key), + {ValLen, ValBin} = to_binary(Value), + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), Action, Cmd), + case decode_rc(Result) of + ok -> + receive + ok -> ok; + {error, Reason} -> {error, {put, decode_rc(Reason)}} + end; + Error -> + {error, {put, decode_rc(Error)}} + end. + + + %% %% Move the cursor in a given direction. Invoked by cursor_next/prev/current. %% diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 3aadda2..b509b20 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -24,7 +24,8 @@ all() -> update_should_save_value_if_successful, update_should_accept_args_for_fun, port_should_tune_transaction_timeouts, - cursor_should_iterate, cursor_should_fail_if_not_open]. + cursor_should_iterate, cursor_should_fail_if_not_open, + put_commit_should_end_txn]. init_per_testcase(_TestCase, Config) -> @@ -157,5 +158,17 @@ cursor_should_fail_if_not_open(Config) -> {error, no_cursor} = bdberl:cursor_current(), {error, no_cursor} = bdberl:cursor_close(). - +put_commit_should_end_txn(Config) -> + Db = ?config(db, Config), + + %% Start a transaction + ok = bdberl:txn_begin(), + ok = bdberl:put_commit(Db, key1, value1), + + %% Commit should now fail since the txn is done + {error, {txn_commit, no_txn}} = bdberl:txn_commit(), + + %% Verify data got committed + {ok, value1} = bdberl:get(Db, key1). +