diff --git a/src/bdberl.erl b/src/bdberl.erl index 5163686..2aa0f4a 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -12,7 +12,7 @@ txn_commit/0, txn_commit/1, txn_abort/0, get_cache_size/0, set_cache_size/3, get_txn_timeout/0, set_txn_timeout/1, - transaction/1, + transaction/1, transaction/2, put/3, put/4, put_r/3, put_r/4, put_commit/3, put_commit/4, @@ -24,6 +24,8 @@ -include("bdberl.hrl"). +-define(is_lock_error(Error), (Error =:= deadlock orelse Error =:= lock_not_granted)). + open(Name, Type) -> open(Name, Type, [create]). @@ -97,18 +99,34 @@ txn_abort() -> Error -> {error, {txn_abort, Error}} end. - + transaction(Fun) -> + transaction(Fun, infinity). + +transaction(_Fun, 0) -> + txn_abort(), + {error, {transaction_failed, retry_limit_reached}}; +transaction(Fun, Retries) -> txn_begin(), try Fun() of abort -> txn_abort(), {error, transaction_aborted}; + Value -> txn_commit(), {ok, Value} catch - _ : Reason -> + throw : {error, {_Op, Error}} when ?is_lock_error(Error) -> + txn_abort(), + erlang:yield(), + R = case Retries of + infinity -> infinity; + Retries -> Retries - 1 + end, + transaction(Fun, R); + + _ : Reason -> txn_abort(), {error, {transaction_failed, Reason}} end. @@ -142,7 +160,6 @@ put_commit_r(Db, Key, Value, Opts) -> ok -> ok; Error -> throw(Error) end. - get(Db, Key) -> get(Db, Key, []). @@ -178,7 +195,7 @@ update(Db, Key, Fun) -> update(Db, Key, Fun, Args) -> F = fun() -> - Value = case get(Db, Key, [rmw]) of + Value = case get_r(Db, Key, [rmw]) of not_found -> not_found; {ok, Val} -> Val end, @@ -186,7 +203,7 @@ update(Db, Key, Fun, Args) -> undefined -> Fun(Key, Value); Args -> Fun(Key, Value, Args) end, - ok = put_commit(Db, Key, NewValue), + put_commit_r(Db, Key, NewValue), NewValue end, transaction(F). diff --git a/test/thrash_SUITE.erl b/test/thrash_SUITE.erl index 21cb9d7..5e6b7a3 100644 --- a/test/thrash_SUITE.erl +++ b/test/thrash_SUITE.erl @@ -29,7 +29,7 @@ wait_for_finish(0) -> wait_for_finish(Count) -> receive {finished, Pid} -> - io:format("~p is done; ~p remaining.\n", [Pid, Count-1]), + ct:print("~p is done; ~p remaining.\n", [Pid, Count-1]), wait_for_finish(Count-1) end. @@ -47,34 +47,16 @@ thrash_run(Owner) -> thrash_incr_loop(Owner, 0) -> Owner ! {finished, self()}; thrash_incr_loop(Owner, Count) -> - ct:print("~p\n", [Count]), + ct:print("~p", [Count]), %% Choose random key Key = random:uniform(1200), %% Start a txn that will read the current value of the key and increment by 1 - F = fun() -> - case bdberl:get(0, Key, [rmw]) of - not_found -> - Value = 0; - - {ok, Value} -> - Value - end, - ok = bdberl:put(0, Key, Value) + F = fun(_Key, Value) -> + case Value of + not_found -> 0; + Value -> Value + 1 + end end, - ok = do_txn(F, 0), + {ok, _} = bdberl:update(0, Key, F), thrash_incr_loop(Owner, Count-1). - -do_txn(F, Count) -> - case bdberl:txn_begin() of - ok -> - case catch(F()) of - {'EXIT', _Reason} -> - io:format("Txn attempt ~p failed; retrying", [Count]), - do_txn(F, Count+1); - _Other -> - ok = bdberl:txn_commit() - end; - {error, _Reason} -> - do_txn(F, Count+1) - end.