Added transaction retry logic.
This commit is contained in:
parent
48c5fc3f6c
commit
4494e9736d
|
@ -12,7 +12,7 @@
|
|||
txn_commit/0, txn_commit/1, txn_abort/0,
|
||||
get_cache_size/0, set_cache_size/3,
|
||||
get_txn_timeout/0, set_txn_timeout/1,
|
||||
transaction/1,
|
||||
transaction/1, transaction/2,
|
||||
put/3, put/4,
|
||||
put_r/3, put_r/4,
|
||||
put_commit/3, put_commit/4,
|
||||
|
@ -24,6 +24,8 @@
|
|||
|
||||
-include("bdberl.hrl").
|
||||
|
||||
-define(is_lock_error(Error), (Error =:= deadlock orelse Error =:= lock_not_granted)).
|
||||
|
||||
|
||||
open(Name, Type) ->
|
||||
open(Name, Type, [create]).
|
||||
|
@ -97,18 +99,34 @@ txn_abort() ->
|
|||
Error ->
|
||||
{error, {txn_abort, Error}}
|
||||
end.
|
||||
|
||||
|
||||
transaction(Fun) ->
|
||||
transaction(Fun, infinity).
|
||||
|
||||
transaction(_Fun, 0) ->
|
||||
txn_abort(),
|
||||
{error, {transaction_failed, retry_limit_reached}};
|
||||
transaction(Fun, Retries) ->
|
||||
txn_begin(),
|
||||
try Fun() of
|
||||
abort ->
|
||||
txn_abort(),
|
||||
{error, transaction_aborted};
|
||||
|
||||
Value ->
|
||||
txn_commit(),
|
||||
{ok, Value}
|
||||
catch
|
||||
_ : Reason ->
|
||||
throw : {error, {_Op, Error}} when ?is_lock_error(Error) ->
|
||||
txn_abort(),
|
||||
erlang:yield(),
|
||||
R = case Retries of
|
||||
infinity -> infinity;
|
||||
Retries -> Retries - 1
|
||||
end,
|
||||
transaction(Fun, R);
|
||||
|
||||
_ : Reason ->
|
||||
txn_abort(),
|
||||
{error, {transaction_failed, Reason}}
|
||||
end.
|
||||
|
@ -142,7 +160,6 @@ put_commit_r(Db, Key, Value, Opts) ->
|
|||
ok -> ok;
|
||||
Error -> throw(Error)
|
||||
end.
|
||||
|
||||
|
||||
get(Db, Key) ->
|
||||
get(Db, Key, []).
|
||||
|
@ -178,7 +195,7 @@ update(Db, Key, Fun) ->
|
|||
|
||||
update(Db, Key, Fun, Args) ->
|
||||
F = fun() ->
|
||||
Value = case get(Db, Key, [rmw]) of
|
||||
Value = case get_r(Db, Key, [rmw]) of
|
||||
not_found -> not_found;
|
||||
{ok, Val} -> Val
|
||||
end,
|
||||
|
@ -186,7 +203,7 @@ update(Db, Key, Fun, Args) ->
|
|||
undefined -> Fun(Key, Value);
|
||||
Args -> Fun(Key, Value, Args)
|
||||
end,
|
||||
ok = put_commit(Db, Key, NewValue),
|
||||
put_commit_r(Db, Key, NewValue),
|
||||
NewValue
|
||||
end,
|
||||
transaction(F).
|
||||
|
|
|
@ -29,7 +29,7 @@ wait_for_finish(0) ->
|
|||
wait_for_finish(Count) ->
|
||||
receive
|
||||
{finished, Pid} ->
|
||||
io:format("~p is done; ~p remaining.\n", [Pid, Count-1]),
|
||||
ct:print("~p is done; ~p remaining.\n", [Pid, Count-1]),
|
||||
wait_for_finish(Count-1)
|
||||
end.
|
||||
|
||||
|
@ -47,34 +47,16 @@ thrash_run(Owner) ->
|
|||
thrash_incr_loop(Owner, 0) ->
|
||||
Owner ! {finished, self()};
|
||||
thrash_incr_loop(Owner, Count) ->
|
||||
ct:print("~p\n", [Count]),
|
||||
ct:print("~p", [Count]),
|
||||
%% Choose random key
|
||||
Key = random:uniform(1200),
|
||||
|
||||
%% Start a txn that will read the current value of the key and increment by 1
|
||||
F = fun() ->
|
||||
case bdberl:get(0, Key, [rmw]) of
|
||||
not_found ->
|
||||
Value = 0;
|
||||
|
||||
{ok, Value} ->
|
||||
Value
|
||||
end,
|
||||
ok = bdberl:put(0, Key, Value)
|
||||
F = fun(_Key, Value) ->
|
||||
case Value of
|
||||
not_found -> 0;
|
||||
Value -> Value + 1
|
||||
end
|
||||
end,
|
||||
ok = do_txn(F, 0),
|
||||
{ok, _} = bdberl:update(0, Key, F),
|
||||
thrash_incr_loop(Owner, Count-1).
|
||||
|
||||
do_txn(F, Count) ->
|
||||
case bdberl:txn_begin() of
|
||||
ok ->
|
||||
case catch(F()) of
|
||||
{'EXIT', _Reason} ->
|
||||
io:format("Txn attempt ~p failed; retrying", [Count]),
|
||||
do_txn(F, Count+1);
|
||||
_Other ->
|
||||
ok = bdberl:txn_commit()
|
||||
end;
|
||||
{error, _Reason} ->
|
||||
do_txn(F, Count+1)
|
||||
end.
|
||||
|
|
Loading…
Reference in a new issue