Fixed issue reusing cursors.

This commit is contained in:
Gregory Burd 2013-03-16 16:42:43 -04:00
parent ab45690a4b
commit 22d42f5076

View file

@ -55,7 +55,7 @@
-record(state, {conn :: wterl:connection(), -record(state, {conn :: wterl:connection(),
table :: string(), table :: string(),
session :: wterl:session(), session :: wterl:session(),
cursors :: ets:tid(), cursor :: wterl:cursor(),
partition :: integer()}). partition :: integer()}).
-type state() :: #state{}. -type state() :: #state{}.
@ -93,17 +93,17 @@ start(Partition, Config) ->
AppStart = case application:start(wterl) of AppStart = case application:start(wterl) of
ok -> ok ->
ok; ok;
{error, {already_started, _}} -> {error, {already_started, _Pid}}=_AppStartError ->
ok; ok;
{error, Reason} -> {error, Reason}=AppStartError ->
lager:error("Failed to start wterl: ~p", [Reason]), lager:error("Failed to start wterl: ~p", [Reason]),
{error, Reason} AppStartError
end, end,
case AppStart of case AppStart of
ok -> ok ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")), ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
SessionMax = SessionMax =
case app_helper:get_env(riak_core, ring_creation_size) of case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024; undefined -> 1024;
RingSize when RingSize < 512 -> 1024; RingSize when RingSize < 512 -> 1024;
RingSize -> RingSize * 2 RingSize -> RingSize * 2
@ -149,15 +149,12 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{conn=ConnRef, session=SRef, cursors=undefined}) -> stop(#state{conn=ConnRef, session=SRef, cursor=undefined}) ->
ok = wterl:session_close(SRef), ok = wterl:session_close(SRef),
wterl_conn:close(ConnRef); wterl_conn:close(ConnRef);
stop(#state{cursors=Cursors}=State) -> stop(#state{cursor=Cursor}=State) ->
ets:foldl(fun({_Table, Cursor}, _) -> ok = wterl:cursor_close(Cursor),
ok = wterl:cursor_close(Cursor) stop(State#state{cursor=undefined}).
end, true, Cursors),
ets:delete(Cursors),
stop(State#state{cursors=undefined}).
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) -> -spec get(riak_object:bucket(), riak_object:key(), state()) ->
@ -166,13 +163,14 @@ stop(#state{cursors=Cursors}=State) ->
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{session=SRef, table=Table}=State) -> get(Bucket, Key, #state{session=SRef, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:cursor_search(shared_cursor(SRef, Table, State), WTKey) of Cursor = shared_cursor(SRef, Table, State),
case wterl:cursor_search(Cursor, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State#state{cursor=Cursor}};
not_found -> not_found ->
{error, not_found, State}; {error, not_found, State#state{cursor=Cursor}};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State#state{cursor=Cursor}}
end. end.
%% @doc Insert an object into the wterl backend. %% @doc Insert an object into the wterl backend.
@ -185,11 +183,12 @@ get(Bucket, Key, #state{session=SRef, table=Table}=State) ->
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
case wterl:cursor_insert(shared_cursor(SRef, Table, State), WTKey, Val) of Cursor = shared_cursor(SRef, Table, State),
case wterl:cursor_insert(Cursor, WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State#state{cursor=Cursor}};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State#state{cursor=Cursor}}
end. end.
%% @doc Delete an object from the wterl backend %% @doc Delete an object from the wterl backend
@ -201,11 +200,12 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:cursor_remove(shared_cursor(SRef, Table, State), WTKey) of Cursor = shared_cursor(SRef, Table, State),
case wterl:cursor_remove(Cursor, WTKey) of
ok -> ok ->
{ok, State}; {ok, State#state{cursor=Cursor}};
{error, Reason} -> {error, Reason} ->
{error, Reason, State} {error, Reason, State#state{cursor=Cursor}}
end. end.
%% @doc Fold over all the buckets %% @doc Fold over all the buckets
@ -350,18 +350,17 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
shared_cursor(SRef, Table, #state{cursors=undefined}=State) -> shared_cursor(SRef, Table, #state{cursor=undefined}=State) ->
Cursors = ets:new(?MODULE, []), case wterl:cursor_open(SRef, Table) of
shared_cursor(SRef, Table, State#state{cursors=Cursors}); {ok, Cursor} ->
shared_cursor(SRef, Table, #state{cursors=Cursors}=State) -> Cursor;
case ets:lookup(Cursors, Table) of {error, Reason} ->
[{Table, Cursor}] -> lager:error("Failed to open WT cursor on ~p because: ~p", [Table, Reason]),
{Cursor, State}; undefined
_ -> end;
Cursor = wterl:cursor_open(SRef, Table),
ets:insert(Cursors, {Table, Cursor}), shared_cursor(_SRef, _Table, #state{cursor=Cursor}) ->
{Cursor, State} Cursor.
end.
%% @private %% @private
@ -482,8 +481,6 @@ fetch_status(Cursor, {ok, Stat}, Acc) ->
size_cache(Config) -> size_cache(Config) ->
Size = Size =
case app_helper:get_prop_or_env(cache_size, Config, wterl) of case app_helper:get_prop_or_env(cache_size, Config, wterl) of
{ok, Value} ->
Value;
undefined -> undefined ->
RunningApps = application:which_applications(), RunningApps = application:which_applications(),
FinalGuess = FinalGuess =
@ -510,9 +507,11 @@ size_cache(Config) ->
false -> false ->
"1GB" "1GB"
end, end,
application:set_env(wt, cache_size, FinalGuess), application:set_env(wterl, cache_size, FinalGuess),
lager:warning("Using best-guess cache size of ~p for WiredTiger storage backend.", [FinalGuess]), lager:warning("Using best-guess cache size of ~p for WiredTiger storage backend.", [FinalGuess]),
FinalGuess FinalGuess;
Value ->
Value
end, end,
Size. Size.