Use non-cursor ops for get/put/delete because under the covers they
are reusing sessions and cursors in the NIF layer. Keep two cursors open for just the house keeping tasks, is_empty and stats.
This commit is contained in:
parent
f7219dcaec
commit
07592f20fb
1 changed files with 65 additions and 99 deletions
|
@ -22,7 +22,6 @@
|
||||||
|
|
||||||
-module(riak_kv_wterl_backend).
|
-module(riak_kv_wterl_backend).
|
||||||
-behavior(temp_riak_kv_backend).
|
-behavior(temp_riak_kv_backend).
|
||||||
-author('Steve Vinoski <steve@basho.com>').
|
|
||||||
|
|
||||||
%% KV Backend API
|
%% KV Backend API
|
||||||
-export([api_version/0,
|
-export([api_version/0,
|
||||||
|
@ -51,13 +50,10 @@
|
||||||
%%-define(CAPABILITIES, [async_fold, indexes]).
|
%%-define(CAPABILITIES, [async_fold, indexes]).
|
||||||
-define(CAPABILITIES, [async_fold]).
|
-define(CAPABILITIES, [async_fold]).
|
||||||
|
|
||||||
-record(pass, {session :: wterl:session(),
|
|
||||||
cursor :: wterl:cursor()}).
|
|
||||||
-type pass() :: #pass{}.
|
|
||||||
|
|
||||||
-record(state, {table :: string(),
|
-record(state, {table :: string(),
|
||||||
connection :: wterl:connection(),
|
connection :: wterl:connection(),
|
||||||
passes :: [pass()]}).
|
is_empty_cursor :: wterl:cursor(),
|
||||||
|
status_cursor :: wterl:cursor()}).
|
||||||
|
|
||||||
-type state() :: #state{}.
|
-type state() :: #state{}.
|
||||||
-type config() :: [{atom(), term()}].
|
-type config() :: [{atom(), term()}].
|
||||||
|
@ -101,20 +97,26 @@ start(Partition, Config) ->
|
||||||
%% if the cluster size > the n value
|
%% if the cluster size > the n value
|
||||||
Table = "lsm:wt" ++ integer_to_list(Partition),
|
Table = "lsm:wt" ++ integer_to_list(Partition),
|
||||||
{ok, Connection} = establish_connection(Config),
|
{ok, Connection} = establish_connection(Config),
|
||||||
Passes = establish_passes(erlang:system_info(schedulers), Connection, Table),
|
case wterl:cursor_open(Connection, Table) of
|
||||||
{ok, #state{table=Table, connection=Connection, passes=Passes}};
|
{ok, IsEmptyCursor} ->
|
||||||
|
case wterl:cursor_open(Connection, Table) of
|
||||||
|
{ok, StatusCursor} ->
|
||||||
|
{ok, #state{table=Table, connection=Connection,
|
||||||
|
is_empty_cursor=IsEmptyCursor,
|
||||||
|
status_cursor=StatusCursor}};
|
||||||
{error, Reason2} ->
|
{error, Reason2} ->
|
||||||
{error, Reason2}
|
{error, Reason2}
|
||||||
|
end;
|
||||||
|
{error, Reason3} ->
|
||||||
|
{error, Reason3}
|
||||||
|
end;
|
||||||
|
{error, Reason4} ->
|
||||||
|
{error, Reason4}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Stop the wterl backend
|
%% @doc Stop the wterl backend
|
||||||
-spec stop(state()) -> ok.
|
-spec stop(state()) -> ok.
|
||||||
stop(#state{passes=Passes}) ->
|
stop(_State) ->
|
||||||
lists:foreach(fun(Elem) ->
|
|
||||||
{Session, Cursor} = Elem,
|
|
||||||
ok = wterl:cursor_close(Cursor),
|
|
||||||
ok = wterl:session_close(Session)
|
|
||||||
end, Passes),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @doc Retrieve an object from the wterl backend
|
%% @doc Retrieve an object from the wterl backend
|
||||||
|
@ -122,10 +124,9 @@ stop(#state{passes=Passes}) ->
|
||||||
{ok, any(), state()} |
|
{ok, any(), state()} |
|
||||||
{ok, not_found, state()} |
|
{ok, not_found, state()} |
|
||||||
{error, term(), state()}.
|
{error, term(), state()}.
|
||||||
get(Bucket, Key, #state{passes=Passes}=State) ->
|
get(Bucket, Key, #state{connection=Connection}=State) ->
|
||||||
WTKey = to_object_key(Bucket, Key),
|
WTKey = to_object_key(Bucket, Key),
|
||||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
case wterl:get(Connection, WTKey) of
|
||||||
case wterl:cursor_search(Cursor, WTKey) of
|
|
||||||
{ok, Value} ->
|
{ok, Value} ->
|
||||||
{ok, Value, State};
|
{ok, Value, State};
|
||||||
not_found ->
|
not_found ->
|
||||||
|
@ -142,10 +143,8 @@ get(Bucket, Key, #state{passes=Passes}=State) ->
|
||||||
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
|
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
|
||||||
{ok, state()} |
|
{ok, state()} |
|
||||||
{error, term(), state()}.
|
{error, term(), state()}.
|
||||||
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
|
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection}=State) ->
|
||||||
WTKey = to_object_key(Bucket, PrimaryKey),
|
case wterl:put(Connection, to_object_key(Bucket, PrimaryKey), Val) of
|
||||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
|
||||||
case wterl:cursor_insert(Cursor, WTKey, Val) of
|
|
||||||
ok ->
|
ok ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -159,10 +158,8 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
|
||||||
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
|
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
|
||||||
{ok, state()} |
|
{ok, state()} |
|
||||||
{error, term(), state()}.
|
{error, term(), state()}.
|
||||||
delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
|
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection}=State) ->
|
||||||
WTKey = to_object_key(Bucket, Key),
|
case wterl:delete(Connection, to_object_key(Bucket, Key)) of
|
||||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
|
||||||
case wterl:cursor_remove(Cursor, WTKey) of
|
|
||||||
ok ->
|
ok ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -174,13 +171,11 @@ delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
|
||||||
any(),
|
any(),
|
||||||
[],
|
[],
|
||||||
state()) -> {ok, any()} | {async, fun()}.
|
state()) -> {ok, any()} | {async, fun()}.
|
||||||
fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
|
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
|
||||||
{ok, Connection} = wterl_conn:get(),
|
|
||||||
FoldFun = fold_buckets_fun(FoldBucketsFun),
|
FoldFun = fold_buckets_fun(FoldBucketsFun),
|
||||||
BucketFolder =
|
BucketFolder =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, Session} = wterl:session_open(Connection),
|
case wterl:cursor_open(Connection, Table) of
|
||||||
case wterl:cursor_open(Session, Table) of
|
|
||||||
{error, "No such file or directory"} ->
|
{error, "No such file or directory"} ->
|
||||||
Acc;
|
Acc;
|
||||||
{ok, Cursor} ->
|
{ok, Cursor} ->
|
||||||
|
@ -192,8 +187,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
|
||||||
{break, AccFinal} ->
|
{break, AccFinal} ->
|
||||||
AccFinal
|
AccFinal
|
||||||
after
|
after
|
||||||
ok = wterl:cursor_close(Cursor),
|
ok = wterl:cursor_close(Cursor)
|
||||||
ok = wterl:session_close(Session)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
@ -209,7 +203,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
|
||||||
any(),
|
any(),
|
||||||
[{atom(), term()}],
|
[{atom(), term()}],
|
||||||
state()) -> {ok, term()} | {async, fun()}.
|
state()) -> {ok, term()} | {async, fun()}.
|
||||||
fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
|
||||||
%% Figure out how we should limit the fold: by bucket, by
|
%% Figure out how we should limit the fold: by bucket, by
|
||||||
%% secondary index, or neither (fold across everything.)
|
%% secondary index, or neither (fold across everything.)
|
||||||
Bucket = lists:keyfind(bucket, 1, Opts),
|
Bucket = lists:keyfind(bucket, 1, Opts),
|
||||||
|
@ -222,14 +216,11 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
||||||
true -> undefined
|
true -> undefined
|
||||||
end,
|
end,
|
||||||
|
|
||||||
{ok, Connection} = wterl_conn:get(),
|
|
||||||
|
|
||||||
%% Set up the fold...
|
%% Set up the fold...
|
||||||
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
|
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
|
||||||
KeyFolder =
|
KeyFolder =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, Session} = wterl:session_open(Connection),
|
case wterl:cursor_open(Connection, Table) of
|
||||||
case wterl:cursor_open(Session, Table) of
|
|
||||||
{error, "No such file or directory"} ->
|
{error, "No such file or directory"} ->
|
||||||
Acc;
|
Acc;
|
||||||
{ok, Cursor} ->
|
{ok, Cursor} ->
|
||||||
|
@ -239,8 +230,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
||||||
{break, AccFinal} ->
|
{break, AccFinal} ->
|
||||||
AccFinal
|
AccFinal
|
||||||
after
|
after
|
||||||
ok = wterl:cursor_close(Cursor),
|
ok = wterl:cursor_close(Cursor)
|
||||||
ok = wterl:session_close(Session)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
@ -256,14 +246,12 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
|
||||||
any(),
|
any(),
|
||||||
[{atom(), term()}],
|
[{atom(), term()}],
|
||||||
state()) -> {ok, any()} | {async, fun()}.
|
state()) -> {ok, any()} | {async, fun()}.
|
||||||
fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
|
||||||
{ok, Connection} = wterl_conn:get(),
|
|
||||||
Bucket = proplists:get_value(bucket, Opts),
|
Bucket = proplists:get_value(bucket, Opts),
|
||||||
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
|
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
|
||||||
ObjectFolder =
|
ObjectFolder =
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, Session} = wterl:session_open(Connection),
|
case wterl:cursor_open(Connection, Table) of
|
||||||
case wterl:cursor_open(Session, Table) of
|
|
||||||
{error, "No such file or directory"} ->
|
{error, "No such file or directory"} ->
|
||||||
Acc;
|
Acc;
|
||||||
{ok, Cursor} ->
|
{ok, Cursor} ->
|
||||||
|
@ -273,8 +261,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
||||||
{break, AccFinal} ->
|
{break, AccFinal} ->
|
||||||
AccFinal
|
AccFinal
|
||||||
after
|
after
|
||||||
ok = wterl:cursor_close(Cursor),
|
ok = wterl:cursor_close(Cursor)
|
||||||
ok = wterl:session_close(Session)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
|
@ -287,9 +274,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
||||||
|
|
||||||
%% @doc Delete all objects from this wterl backend
|
%% @doc Delete all objects from this wterl backend
|
||||||
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
||||||
drop(#state{passes=Passes, table=Table}=State) ->
|
drop(#state{connection=Connection, table=Table}=State) ->
|
||||||
{Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
case wterl:truncate(Connection, Table) of
|
||||||
case wterl:session_truncate(Session, Table) of
|
|
||||||
ok ->
|
ok ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
Error ->
|
Error ->
|
||||||
|
@ -299,8 +285,7 @@ drop(#state{passes=Passes, table=Table}=State) ->
|
||||||
%% @doc Returns true if this wterl backend contains any
|
%% @doc Returns true if this wterl backend contains any
|
||||||
%% non-tombstone values; otherwise returns false.
|
%% non-tombstone values; otherwise returns false.
|
||||||
-spec is_empty(state()) -> boolean().
|
-spec is_empty(state()) -> boolean().
|
||||||
is_empty(#state{passes=Passes}) ->
|
is_empty(#state{is_empty_cursor=Cursor}) ->
|
||||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
|
||||||
wterl:cursor_reset(Cursor),
|
wterl:cursor_reset(Cursor),
|
||||||
try
|
try
|
||||||
not_found =:= wterl:cursor_next(Cursor)
|
not_found =:= wterl:cursor_next(Cursor)
|
||||||
|
@ -310,8 +295,8 @@ is_empty(#state{passes=Passes}) ->
|
||||||
|
|
||||||
%% @doc Get the status information for this wterl backend
|
%% @doc Get the status information for this wterl backend
|
||||||
-spec status(state()) -> [{atom(), term()}].
|
-spec status(state()) -> [{atom(), term()}].
|
||||||
status(#state{passes=Passes}) ->
|
status(#state{status_cursor=Cursor}) ->
|
||||||
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
|
wterl:cursor_reset(Cursor),
|
||||||
try
|
try
|
||||||
Stats = fetch_status(Cursor),
|
Stats = fetch_status(Cursor),
|
||||||
[{stats, Stats}]
|
[{stats, Stats}]
|
||||||
|
@ -346,8 +331,10 @@ establish_connection(Config) ->
|
||||||
{error, data_root_unset};
|
{error, data_root_unset};
|
||||||
DataRoot ->
|
DataRoot ->
|
||||||
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
||||||
|
|
||||||
|
%% WT Connection Options:
|
||||||
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
|
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
|
||||||
Opts =
|
ConnectionOpts =
|
||||||
orddict:from_list(
|
orddict:from_list(
|
||||||
[ wterl:config_value(create, Config, true),
|
[ wterl:config_value(create, Config, true),
|
||||||
wterl:config_value(sync, Config, false),
|
wterl:config_value(sync, Config, false),
|
||||||
|
@ -363,27 +350,8 @@ establish_connection(Config) ->
|
||||||
%"hazard", "mutex", "read", "readserver", "reconcile",
|
%"hazard", "mutex", "read", "readserver", "reconcile",
|
||||||
%"salvage", "verify", "write", "evict", "lsm"
|
%"salvage", "verify", "write", "evict", "lsm"
|
||||||
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
|
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
|
||||||
case wterl_conn:open(DataRoot, Opts) of
|
|
||||||
{ok, Connection} ->
|
|
||||||
{ok, Connection};
|
|
||||||
{error, Reason2} ->
|
|
||||||
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
|
|
||||||
{error, Reason2}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
establish_passes(Count, Connection, Table)
|
%% WT Session Options:
|
||||||
when is_number(Count), Count > 0 ->
|
|
||||||
lists:map(fun(_Elem) ->
|
|
||||||
{ok, Session} = establish_session(Connection, Table),
|
|
||||||
{ok, Cursor} = wterl:cursor_open(Session, Table),
|
|
||||||
{Session, Cursor}
|
|
||||||
end, lists:seq(1, Count)).
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
establish_session(Connection, Table) ->
|
|
||||||
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
|
|
||||||
{ok, Session} ->
|
|
||||||
SessionOpts =
|
SessionOpts =
|
||||||
[{block_compressor, "snappy"},
|
[{block_compressor, "snappy"},
|
||||||
{internal_page_max, "128K"},
|
{internal_page_max, "128K"},
|
||||||
|
@ -394,16 +362,14 @@ establish_session(Connection, Table) ->
|
||||||
{lsm_bloom_bit_count, 128},
|
{lsm_bloom_bit_count, 128},
|
||||||
{lsm_bloom_hash_count, 64},
|
{lsm_bloom_hash_count, 64},
|
||||||
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}],
|
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}],
|
||||||
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
|
|
||||||
ok ->
|
case wterl_conn:open(DataRoot, SessionOpts, ConnectionOpts) of
|
||||||
{ok, Session};
|
{ok, Connection} ->
|
||||||
{error, Reason} ->
|
{ok, Connection};
|
||||||
lager:error("Failed to start wterl backend: ~p\n", [Reason]),
|
{error, Reason2} ->
|
||||||
{error, Reason}
|
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
|
||||||
end;
|
{error, Reason2}
|
||||||
{error, Reason} ->
|
end
|
||||||
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
|
|
||||||
{error, Reason}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|
Loading…
Reference in a new issue