From 99a51980eb3a1c12fc809cb1abeda52d6aa71543 Mon Sep 17 00:00:00 2001 From: Steve Vinoski Date: Sat, 10 Mar 2012 13:02:52 -0500 Subject: [PATCH] store session as kv backend state In order to reduce the number of wiredtiger session open/close calls, open a session at startup and keep it in the kv backend state. This is safe as far as using these sessions on scheduler threads goes because the riak_kv_vnode fsm serializes all calls through the backend, so we'll never have a case of multiple scheduler threads concurrently trying to use the same session. --- src/riak_kv_wterl_backend.erl | 112 +++++++++++++--------------------- 1 file changed, 43 insertions(+), 69 deletions(-) diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index d4cda26..fb5f60a 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -50,8 +50,9 @@ %%-define(CAPABILITIES, [async_fold, indexes]). -define(CAPABILITIES, [async_fold]). --record(state, {ref :: reference(), +-record(state, {conn :: reference(), table :: string(), + session :: reference(), partition :: integer()}). -type state() :: #state{}. @@ -91,13 +92,10 @@ start(Partition, Config) -> {ok, ConnRef} -> Table = "table:wt" ++ integer_to_list(Partition), {ok, SRef} = wterl:session_open(ConnRef), - try - ok = wterl:session_create(SRef, Table) - after - ok = wterl:session_close(SRef) - end, - {ok, #state{ref=ConnRef, + ok = wterl:session_create(SRef, Table), + {ok, #state{conn=ConnRef, table=Table, + session=SRef, partition=Partition}}; {error, Reason} -> lager:error("Failed to start wterl backend: ~p\n", @@ -108,7 +106,8 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(_State) -> +stop(#state{session=SRef}) -> + ok = wterl:session_close(SRef), wterl_conn:close(). %% @doc Retrieve an object from the wterl backend @@ -116,20 +115,15 @@ stop(_State) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{ref=Ref, table=Table}=State) -> +get(Bucket, Key, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, Key), - {ok, SRef} = wterl:session_open(Ref), - try - case wterl:session_get(SRef, Table, WTKey) of - {ok, Value} -> - {ok, Value, State}; - not_found -> - {error, not_found, State}; - {error, Reason} -> - {error, Reason, State} - end - after - ok = wterl:session_close(SRef) + case wterl:session_get(SRef, Table, WTKey) of + {ok, Value} -> + {ok, Value, State}; + not_found -> + {error, not_found, State}; + {error, Reason} -> + {error, Reason, State} end. %% @doc Insert an object into the wterl backend. @@ -140,18 +134,13 @@ get(Bucket, Key, #state{ref=Ref, table=Table}=State) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{ref=Ref, table=Table}=State) -> +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - {ok, SRef} = wterl:session_open(Ref), - try - case wterl:session_put(SRef, Table, WTKey, Val) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason, State} - end - after - ok = wterl:session_close(SRef) + case wterl:session_put(SRef, Table, WTKey, Val) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} end. %% @doc Delete an object from the wterl backend @@ -161,31 +150,25 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{ref=Ref, table=Table}=State) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, _IndexSpecs, #state{ref=Ref, table=Table}=State) -> +delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, Key), - {ok, SRef} = wterl:session_open(Ref), - try - case wterl:session_delete(SRef, Table, WTKey) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason, State} - end - after - ok = wterl:session_close(SRef) + case wterl:session_delete(SRef, Table, WTKey) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} end. - %% @doc Fold over all the buckets -spec fold_buckets(riak_kv_backend:fold_buckets_fun(), any(), [], state()) -> {ok, any()} | {async, fun()}. -fold_buckets(FoldBucketsFun, Acc, Opts, #state{ref=Ref, table=Table}) -> +fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> FoldFun = fold_buckets_fun(FoldBucketsFun), BucketFolder = fun() -> - {ok, SRef} = wterl:session_open(Ref), + {ok, SRef} = wterl:session_open(ConnRef), {ok, Cursor} = wterl:cursor_open(SRef, Table), try {FoldResult, _} = @@ -211,7 +194,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{ref=Ref, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, term()} | {async, fun()}. -fold_keys(FoldKeysFun, Acc, Opts, #state{ref=Ref, table=Table}) -> +fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> %% Figure out how we should limit the fold: by bucket, by %% secondary index, or neither (fold across everything.) Bucket = lists:keyfind(bucket, 1, Opts), @@ -228,7 +211,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{ref=Ref, table=Table}) -> FoldFun = fold_keys_fun(FoldKeysFun, Limiter), KeyFolder = fun() -> - {ok, SRef} = wterl:session_open(Ref), + {ok, SRef} = wterl:session_open(ConnRef), {ok, Cursor} = wterl:cursor_open(SRef, Table), try wterl:fold_keys(Cursor, FoldFun, Acc) @@ -252,12 +235,12 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{ref=Ref, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, any()} | {async, fun()}. -fold_objects(FoldObjectsFun, Acc, Opts, #state{ref=Ref, table=Table}) -> +fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> Bucket = proplists:get_value(bucket, Opts), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), ObjectFolder = fun() -> - {ok, SRef} = wterl:session_open(Ref), + {ok, SRef} = wterl:session_open(ConnRef), {ok, Cursor} = wterl:cursor_open(SRef, Table), try wterl:fold(Cursor, FoldFun, Acc) @@ -278,40 +261,30 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{ref=Ref, table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. -drop(#state{ref=Ref, table=Table}=State) -> - {ok, SRef} = wterl:session_open(Ref), - try - ok = wterl:session_truncate(SRef, Table), - {ok, State} - after - ok = wterl:session_close(SRef) - end. - +drop(#state{table=Table, session=SRef}=State) -> + ok = wterl:session_truncate(SRef, Table), + {ok, State}. %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{ref=Ref, table=Table}) -> - {ok, SRef} = wterl:session_open(Ref), +is_empty(#state{table=Table, session=SRef}) -> {ok, Cursor} = wterl:cursor_open(SRef, Table), try not_found =:= wterl:cursor_next(Cursor) after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(SRef) + ok = wterl:cursor_close(Cursor) end. %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(#state{ref=Ref, table=Table}) -> - {ok, SRef} = wterl:session_open(Ref), +status(#state{table=Table, session=SRef}) -> {ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table), try Stats = fetch_status(Cursor), [{stats, Stats}] after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(SRef) + ok = wterl:cursor_close(Cursor) end. %% @doc Register an asynchronous callback @@ -432,11 +405,12 @@ from_index_key(LKey) -> %% @private %% Return all status from wterl statistics cursor fetch_status(Cursor) -> - fetch_status(Cursor, wterl:cursor_next(Cursor), []). + fetch_status(Cursor, wterl:cursor_next_value(Cursor), []). fetch_status(_Cursor, not_found, Acc) -> lists:reverse(Acc); fetch_status(Cursor, {ok, Stat}, Acc) -> - fetch_status(Cursor, wterl:cursor_next(Cursor), [Stat|Acc]). + [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])], + fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]). %% =================================================================== %% EUnit tests