store session as kv backend state

In order to reduce the number of wiredtiger session open/close calls,
open a session at startup and keep it in the kv backend state. This is
safe as far as using these sessions on scheduler threads goes because
the riak_kv_vnode fsm serializes all calls through the backend, so
we'll never have a case of multiple scheduler threads concurrently
trying to use the same session.
This commit is contained in:
Steve Vinoski 2012-03-10 13:02:52 -05:00
parent dc01981be8
commit 99a51980eb

View file

@ -50,8 +50,9 @@
%%-define(CAPABILITIES, [async_fold, indexes]). %%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(state, {ref :: reference(), -record(state, {conn :: reference(),
table :: string(), table :: string(),
session :: reference(),
partition :: integer()}). partition :: integer()}).
-type state() :: #state{}. -type state() :: #state{}.
@ -91,13 +92,10 @@ start(Partition, Config) ->
{ok, ConnRef} -> {ok, ConnRef} ->
Table = "table:wt" ++ integer_to_list(Partition), Table = "table:wt" ++ integer_to_list(Partition),
{ok, SRef} = wterl:session_open(ConnRef), {ok, SRef} = wterl:session_open(ConnRef),
try ok = wterl:session_create(SRef, Table),
ok = wterl:session_create(SRef, Table) {ok, #state{conn=ConnRef,
after
ok = wterl:session_close(SRef)
end,
{ok, #state{ref=ConnRef,
table=Table, table=Table,
session=SRef,
partition=Partition}}; partition=Partition}};
{error, Reason} -> {error, Reason} ->
lager:error("Failed to start wterl backend: ~p\n", lager:error("Failed to start wterl backend: ~p\n",
@ -108,7 +106,8 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(_State) -> stop(#state{session=SRef}) ->
ok = wterl:session_close(SRef),
wterl_conn:close(). wterl_conn:close().
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
@ -116,10 +115,8 @@ stop(_State) ->
{ok, any(), state()} | {ok, any(), state()} |
{ok, not_found, state()} | {ok, not_found, state()} |
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{ref=Ref, table=Table}=State) -> get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
{ok, SRef} = wterl:session_open(Ref),
try
case wterl:session_get(SRef, Table, WTKey) of case wterl:session_get(SRef, Table, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State};
@ -127,9 +124,6 @@ get(Bucket, Key, #state{ref=Ref, table=Table}=State) ->
{error, not_found, State}; {error, not_found, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State}
end
after
ok = wterl:session_close(SRef)
end. end.
%% @doc Insert an object into the wterl backend. %% @doc Insert an object into the wterl backend.
@ -140,18 +134,13 @@ get(Bucket, Key, #state{ref=Ref, table=Table}=State) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{ref=Ref, table=Table}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
{ok, SRef} = wterl:session_open(Ref),
try
case wterl:session_put(SRef, Table, WTKey, Val) of case wterl:session_put(SRef, Table, WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State}
end
after
ok = wterl:session_close(SRef)
end. end.
%% @doc Delete an object from the wterl backend %% @doc Delete an object from the wterl backend
@ -161,31 +150,25 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{ref=Ref, table=Table}=State) ->
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{ref=Ref, table=Table}=State) -> delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
{ok, SRef} = wterl:session_open(Ref),
try
case wterl:session_delete(SRef, Table, WTKey) of case wterl:session_delete(SRef, Table, WTKey) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State}
end
after
ok = wterl:session_close(SRef)
end. end.
%% @doc Fold over all the buckets %% @doc Fold over all the buckets
-spec fold_buckets(riak_kv_backend:fold_buckets_fun(), -spec fold_buckets(riak_kv_backend:fold_buckets_fun(),
any(), any(),
[], [],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{ref=Ref, table=Table}) -> fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun), FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder = BucketFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(Ref), {ok, SRef} = wterl:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wterl:cursor_open(SRef, Table),
try try
{FoldResult, _} = {FoldResult, _} =
@ -211,7 +194,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{ref=Ref, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}. state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{ref=Ref, table=Table}) -> fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by %% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.) %% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts), Bucket = lists:keyfind(bucket, 1, Opts),
@ -228,7 +211,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{ref=Ref, table=Table}) ->
FoldFun = fold_keys_fun(FoldKeysFun, Limiter), FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder = KeyFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(Ref), {ok, SRef} = wterl:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wterl:cursor_open(SRef, Table),
try try
wterl:fold_keys(Cursor, FoldFun, Acc) wterl:fold_keys(Cursor, FoldFun, Acc)
@ -252,12 +235,12 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{ref=Ref, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{ref=Ref, table=Table}) -> fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts), Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder = ObjectFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(Ref), {ok, SRef} = wterl:session_open(ConnRef),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wterl:cursor_open(SRef, Table),
try try
wterl:fold(Cursor, FoldFun, Acc) wterl:fold(Cursor, FoldFun, Acc)
@ -278,40 +261,30 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{ref=Ref, table=Table}) ->
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{ref=Ref, table=Table}=State) -> drop(#state{table=Table, session=SRef}=State) ->
{ok, SRef} = wterl:session_open(Ref),
try
ok = wterl:session_truncate(SRef, Table), ok = wterl:session_truncate(SRef, Table),
{ok, State} {ok, State}.
after
ok = wterl:session_close(SRef)
end.
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{ref=Ref, table=Table}) -> is_empty(#state{table=Table, session=SRef}) ->
{ok, SRef} = wterl:session_open(Ref),
{ok, Cursor} = wterl:cursor_open(SRef, Table), {ok, Cursor} = wterl:cursor_open(SRef, Table),
try try
not_found =:= wterl:cursor_next(Cursor) not_found =:= wterl:cursor_next(Cursor)
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor)
ok = wterl:session_close(SRef)
end. end.
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{ref=Ref, table=Table}) -> status(#state{table=Table, session=SRef}) ->
{ok, SRef} = wterl:session_open(Ref),
{ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table), {ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table),
try try
Stats = fetch_status(Cursor), Stats = fetch_status(Cursor),
[{stats, Stats}] [{stats, Stats}]
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor)
ok = wterl:session_close(SRef)
end. end.
%% @doc Register an asynchronous callback %% @doc Register an asynchronous callback
@ -432,11 +405,12 @@ from_index_key(LKey) ->
%% @private %% @private
%% Return all status from wterl statistics cursor %% Return all status from wterl statistics cursor
fetch_status(Cursor) -> fetch_status(Cursor) ->
fetch_status(Cursor, wterl:cursor_next(Cursor), []). fetch_status(Cursor, wterl:cursor_next_value(Cursor), []).
fetch_status(_Cursor, not_found, Acc) -> fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc); lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) -> fetch_status(Cursor, {ok, Stat}, Acc) ->
fetch_status(Cursor, wterl:cursor_next(Cursor), [Stat|Acc]). [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests