Share cursors managed stored in the module's ets table.

This commit is contained in:
Gregory Burd 2013-03-21 14:34:54 -04:00
parent 9ab363adfe
commit 6225f9fde1

View file

@ -52,6 +52,7 @@
-record(state, {table :: string(), -record(state, {table :: string(),
connection :: wterl:connection(), connection :: wterl:connection(),
cursors :: ets:tid(),
session :: wterl:session()}). session :: wterl:session()}).
-type state() :: #state{}. -type state() :: #state{}.
@ -100,10 +101,16 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{session=undefined}) -> stop(#state{session=undefined, cursors=undefined}) ->
ok; ok;
stop(#state{session=Session}) -> stop(#state{session=Session, cursors=undefined}) ->
ok = wterl:session_close(Session). ok = wterl:session_close(Session);
stop(#state{session=Session, cursors=Cursors}=State) ->
ets:foldl(fun({_Table, Cursor}, _) ->
ok = wterl:cursor_close(Cursor)
end, true, Cursors),
ets:delete(Cursors),
stop(State#state{session=Session, cursors=undefined}).
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) -> -spec get(riak_object:bucket(), riak_object:key(), state()) ->
@ -112,9 +119,10 @@ stop(#state{session=Session}) ->
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{session=undefined}=State) -> get(Bucket, Key, #state{session=undefined}=State) ->
get(Bucket, Key, establish_session(State)); get(Bucket, Key, establish_session(State));
get(Bucket, Key, #state{session=Session, table=Table}=State) -> get(Bucket, Key, #state{session=Session, table=Table}=State0) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:session_get(Session, Table, WTKey) of {Cursor, State} = cursor(Session, Table, get, State0),
case wterl:cursor_search(Cursor, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State};
not_found -> not_found ->
@ -133,9 +141,10 @@ get(Bucket, Key, #state{session=Session, table=Table}=State) ->
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) ->
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State)); put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State));
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State0) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
case wterl:session_put(Session, Table, WTKey, Val) of {Cursor, State} = cursor(Session, Table, put, State0),
case wterl:cursor_insert(Cursor, WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -151,9 +160,10 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) ->
delete(Bucket, Key, IndexSpecs, establish_session(State)); delete(Bucket, Key, IndexSpecs, establish_session(State));
delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) -> delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:session_delete(Session, Table, WTKey) of {Cursor, State} = cursor(Session, Table, delete, State0),
case wterl:cursor_remove(Cursor, WTKey) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -324,6 +334,19 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
cursor(Session, Table, Type, #state{cursors=undefined}=State) ->
Cursors = ets:new(?MODULE, [{read_concurrency, true}]),
cursor(Session, Table, Type, State#state{cursors=Cursors});
cursor(Session, Table, Type, #state{cursors=Cursors}=State) ->
case ets:lookup(Cursors, {Type, Table}) of
[{{_Type, Table}, Cursor}] ->
{Cursor, State};
_ ->
{ok, Cursor} = wterl:cursor_open(Session, Table),
ets:insert(Cursors, {Table, Cursor}),
{Cursor, State}
end.
%% @private %% @private
establish_connection(Table, Config) -> establish_connection(Table, Config) ->
%% Get the data root directory %% Get the data root directory