Added versions of transaction() and update() that take a transaction
timeout.
This commit is contained in:
parent
92b7c3a6ef
commit
254e253d81
1 changed files with 108 additions and 26 deletions
134
src/bdberl.erl
134
src/bdberl.erl
|
@ -29,14 +29,14 @@
|
|||
txn_stat/0, txn_stat/1,
|
||||
txn_stat_print/0, txn_stat_print/1,
|
||||
env_stat_print/0, env_stat_print/1,
|
||||
transaction/1, transaction/2, transaction/3,
|
||||
transaction/1, transaction/2, transaction/3, transaction/4,
|
||||
put/3, put/4,
|
||||
put_r/3, put_r/4,
|
||||
put_commit/3, put_commit/4,
|
||||
put_commit_r/3, put_commit_r/4,
|
||||
get/2, get/3,
|
||||
get_r/2, get_r/3,
|
||||
update/3, update/4, update/5,
|
||||
update/3, update/4, update/5, update/6, update/7,
|
||||
truncate/0, truncate/1,
|
||||
delete_database/1,
|
||||
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
|
||||
|
@ -62,6 +62,7 @@
|
|||
|
||||
-type db_txn_fun() :: fun(() -> term()).
|
||||
-type db_txn_retries() :: infinity | non_neg_integer().
|
||||
-type db_txn_timeleft() :: infinity | non_neg_integer().
|
||||
-type db_txn_error() :: {error, db_error_reason() | {transaction_failed, _}}.
|
||||
|
||||
-type db_update_fun() :: fun((db_key(), db_value(), any()) -> db_value()).
|
||||
|
@ -447,8 +448,8 @@ txn_abort() ->
|
|||
%% where
|
||||
%% Fun = function()
|
||||
%%
|
||||
%% @equiv transaction(Fun, infinity, [])
|
||||
%% @see transaction/3
|
||||
%% @equiv transaction(Fun, infinity, infinity, [])
|
||||
%% @see transaction/4
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec transaction(Fun :: db_txn_fun()) -> {ok, db_value()} | db_txn_error().
|
||||
|
@ -466,8 +467,8 @@ transaction(Fun) ->
|
|||
%% Fun = function()
|
||||
%% Retries = infinity | integer()
|
||||
%%
|
||||
%% @equiv transaction(Fun, Retries, [])
|
||||
%% @see transaction/3
|
||||
%% @equiv transaction(Fun, Retries, infinity, [])
|
||||
%% @see transaction/4
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec transaction(Fun :: db_txn_fun(), Retries :: db_txn_retries()) ->
|
||||
|
@ -477,6 +478,27 @@ transaction(Fun, Retries) ->
|
|||
transaction(Fun, Retries, []).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Execute a fun inside of a transaction.
|
||||
%%
|
||||
%% @spec transaction(Fun, Retries, Opts) -> {ok, Value} | {error, Error}
|
||||
%% where
|
||||
%% Fun = function()
|
||||
%% Retries = infinity | integer()
|
||||
%% Opts = [atom()]
|
||||
%% @equiv transaction(Fun, Retries, infinity, Opts)
|
||||
%% @see transaction/4
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec transaction(Fun :: db_txn_fun(), Retries :: db_txn_retries(), Opts :: db_flags()) ->
|
||||
{ok, db_value()} | db_txn_error().
|
||||
|
||||
transaction(Fun, Retries, Opts) ->
|
||||
transaction(Fun, Retries, infinity, Opts).
|
||||
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Execute a fun inside of a transaction.
|
||||
|
@ -489,25 +511,31 @@ transaction(Fun, Retries) ->
|
|||
%%
|
||||
%% If the transaction deadlocks on commit, the function will be executed
|
||||
%% again until the commit succeeds or the number of retries exceeds the
|
||||
%% value of the `Retries' parameter.
|
||||
%% value of the `Retries' parameter or `TimeLeft' microseconds have elapsed
|
||||
%% (the accounting isn't perfect - it just measures time in the txn).
|
||||
%%
|
||||
%% The 'Opts' parameter is used to set flags for the call to txn_begin/1.
|
||||
%%
|
||||
%% @spec transaction(Fun, Retries, Opts) -> {ok, Value} | {error, Error}
|
||||
%% @spec transaction(Fun, Retries, TimeLeft, Opts) -> {ok, Value} | {error, Error}
|
||||
%% where
|
||||
%% Fun = function()
|
||||
%% Retries = infinity | integer()
|
||||
%% TimeLeft = infinity | integer()
|
||||
%% Opts = [atom()]
|
||||
%%
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec transaction(Fun :: db_txn_fun(), Retries :: db_txn_retries(), Opts :: db_flags()) ->
|
||||
-spec transaction(Fun :: db_txn_fun(), Retries :: db_txn_retries(),
|
||||
TimeLeft :: db_txn_timeleft(), Opts :: db_flags()) ->
|
||||
{ok, db_value()} | db_txn_error().
|
||||
transaction(_Fun, _Retries, TimeLeft, _Opts) when TimeLeft =/= infinity, TimeLeft < 0 ->
|
||||
{error, {transaction_failed, time_limit_reached}};
|
||||
|
||||
transaction(_Fun, 0, _Opts) ->
|
||||
transaction(_Fun, 0, _TimeLeft, _Opts) ->
|
||||
{error, {transaction_failed, retry_limit_reached}};
|
||||
|
||||
transaction(Fun, Retries, Opts) ->
|
||||
transaction(Fun, Retries, TimeLeft, Opts) ->
|
||||
Start = now(),
|
||||
case txn_begin(Opts) of
|
||||
ok ->
|
||||
try Fun() of
|
||||
|
@ -529,7 +557,11 @@ transaction(Fun, Retries, Opts) ->
|
|||
infinity -> infinity;
|
||||
Retries -> Retries - 1
|
||||
end,
|
||||
transaction(Fun, R);
|
||||
T = case TimeLeft of
|
||||
infinity -> infinity;
|
||||
TimeLeft -> TimeLeft - timer:now_diff(now(), Start)
|
||||
end,
|
||||
transaction(Fun, R, T, Opts);
|
||||
|
||||
_ : Reason ->
|
||||
error_logger:info_msg("function threw non-lock error - ~p", [Reason]),
|
||||
|
@ -911,7 +943,7 @@ get_r(Db, Key, Opts) ->
|
|||
%% Fun = function()
|
||||
%%
|
||||
%% @equiv update(Db, Key, Fun, undefined, [])
|
||||
%% @see update/5
|
||||
%% @see update/7
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec update(Db :: db(), Key :: db_key(), Fun :: db_update_fun()) ->
|
||||
|
@ -932,8 +964,8 @@ update(Db, Key, Fun) ->
|
|||
%% Fun = function()
|
||||
%% Args = any()
|
||||
%%
|
||||
%% @equiv update(Db, Key, Fun, undefined, [])
|
||||
%% @see update/5
|
||||
%% @equiv update(Db, Key, Fun, Args, [])
|
||||
%% @see update/7
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec update(Db :: db(), Key :: db_key(), Fun :: db_update_fun(), Args :: db_update_fun_args()) ->
|
||||
|
@ -947,14 +979,6 @@ update(Db, Key, Fun, Args) ->
|
|||
%% @doc
|
||||
%% Updates the value of a key by executing a fun.
|
||||
%%
|
||||
%% This function updates a key/value pair by calling the provided
|
||||
%% function and passing in the key and value as they are
|
||||
%% currently. Additional arguments can be provided to the function by
|
||||
%% passing them in the `Args' parameter. The entire operation is
|
||||
%% executed within the scope of a new transaction.
|
||||
%%
|
||||
%% The 'Opts' parameter is used to pass flags to transaction/3.
|
||||
%%
|
||||
%% @spec update(Db, Key, Fun, Args, Opts) -> {ok, Value} | {error, Error}
|
||||
%% where
|
||||
%% Db = integer()
|
||||
|
@ -963,12 +987,69 @@ update(Db, Key, Fun, Args) ->
|
|||
%% Args = any()
|
||||
%% Opts = [atom()]
|
||||
%%
|
||||
%% @equiv update(Db, Key, Fun, Args, infinity, [])
|
||||
%% @see update/7
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec update(Db :: db(), Key :: db_key(), Fun :: db_update_fun(), Args :: db_update_fun_args(), Opts :: db_flags()) ->
|
||||
{ok, db_value()} | db_txn_error().
|
||||
|
||||
update(Db, Key, Fun, Args, Opts) ->
|
||||
update(Db, Key, Fun, Args, infinity, Opts).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Updates the value of a key by executing a fun.
|
||||
%%
|
||||
%% @spec update(Db, Key, Fun, Args, Retries, Opts) -> {ok, Value} | {error, Error}
|
||||
%% where
|
||||
%% Db = integer()
|
||||
%% Key = term()
|
||||
%% Fun = function()
|
||||
%% Args = any()
|
||||
%% Retries = integer()
|
||||
%% Opts = [atom()]
|
||||
%%
|
||||
%% @equiv update(Db, Key, Fun, Args, Retries, infinity, [])
|
||||
%% @see update/7
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec update(Db :: db(), Key :: db_key(), Fun :: db_update_fun(), Args :: db_update_fun_args(),
|
||||
Retries :: db_txn_retries(), Opts :: db_flags()) ->
|
||||
{ok, db_value()} | db_txn_error().
|
||||
|
||||
update(Db, Key, Fun, Args, Retries, Opts) ->
|
||||
update(Db, Key, Fun, Args, Retries, infinity, Opts).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Updates the value of a key by executing a fun.
|
||||
%%
|
||||
%% This function updates a key/value pair by calling the provided
|
||||
%% function and passing in the key and value as they are
|
||||
%% currently. Additional arguments can be provided to the function by
|
||||
%% passing them in the `Args' parameter. The entire operation is
|
||||
%% executed within the scope of a new transaction.
|
||||
%%
|
||||
%% The 'Opts' parameter is used to pass flags to transaction/3.
|
||||
%%
|
||||
%% @spec update(Db, Key, Fun, Args, Retries, TimeLeft, Opts) -> {ok, Value} | {error, Error}
|
||||
%% where
|
||||
%% Db = integer()
|
||||
%% Key = term()
|
||||
%% Fun = function()
|
||||
%% Args = any()
|
||||
%% Retries = integer()
|
||||
%% TimeLeft = integer()
|
||||
%% Opts = [atom()]
|
||||
%%
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec update(Db :: db(), Key :: db_key(), Fun :: db_update_fun(), Args :: db_update_fun_args(),
|
||||
Retries :: db_txn_retries(), TimeLeft :: db_txn_timeleft(), Opts :: db_flags()) ->
|
||||
{ok, db_value()} | db_txn_error().
|
||||
|
||||
update(Db, Key, Fun, Args, Retries, TimeLeft, Opts) ->
|
||||
F = fun() ->
|
||||
Value = case get_r(Db, Key, [rmw]) of
|
||||
not_found -> not_found;
|
||||
|
@ -981,7 +1062,7 @@ update(Db, Key, Fun, Args, Opts) ->
|
|||
put_r(Db, Key, NewValue),
|
||||
NewValue
|
||||
end,
|
||||
transaction(F, infinity, Opts).
|
||||
transaction(F, Retries, TimeLeft, Opts).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -1210,13 +1291,14 @@ get_data_dirs() ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Returns the list of directories that bdberl searches to find databases
|
||||
%% with the number of megabytes available for each dir
|
||||
%% with the number of megabytes available for each dir.
|
||||
%%
|
||||
%% @spec get_data_dirs_info() -> {ok, [DirName, Fsid, MbytesAvail]} | {error, Error}
|
||||
%% @spec get_data_dirs_info() -> {ok, [{DirName,Fsid,MbytesAvail}]} | {error, Reason}
|
||||
%% where
|
||||
%% DirName = string()
|
||||
%% Fsid = binary()
|
||||
%% MbytesAvail = integer()
|
||||
%% Reason = any()
|
||||
%%
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
Loading…
Reference in a new issue