Share per-table cursors for get/put/delete operations. #4
1 changed files with 27 additions and 6 deletions
|
@ -53,6 +53,7 @@
|
|||
-record(state, {conn :: wterl:connection(),
|
||||
table :: string(),
|
||||
session :: wterl:session(),
|
||||
cursors :: ets:tid(),
|
||||
partition :: integer()}).
|
||||
|
||||
-type state() :: #state{}.
|
||||
|
@ -145,18 +146,24 @@ start(Partition, Config) ->
|
|||
|
||||
%% @doc Stop the wterl backend
|
||||
-spec stop(state()) -> ok.
|
||||
stop(#state{conn=ConnRef, session=SRef}) ->
|
||||
stop(#state{conn=ConnRef, session=SRef, cursors=undefined}) ->
|
||||
ok = wterl:session_close(SRef),
|
||||
wterl_conn:close(ConnRef).
|
||||
wterl_conn:close(ConnRef);
|
||||
stop(#state{cursors=Cursors}=State) ->
|
||||
ets:foldl(fun({_Table, Cursor}, _) ->
|
||||
ok = wterl:cursor_close(Cursor)
|
||||
end, true, Cursors),
|
||||
ets:delete(Cursors),
|
||||
stop(State#state{cursors=undefined}).
|
||||
|
||||
%% @doc Retrieve an object from the wterl backend
|
||||
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
|
||||
{ok, any(), state()} |
|
||||
{ok, not_found, state()} |
|
||||
{error, term(), state()}.
|
||||
get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
|
||||
get(Bucket, Key, #state{session=SRef, table=Table}=State) ->
|
||||
WTKey = to_object_key(Bucket, Key),
|
||||
case wterl:session_get(SRef, Table, WTKey) of
|
||||
case wterl:cursor_search(shared_cursor(SRef, Table, State), WTKey) of
|
||||
{ok, Value} ->
|
||||
{ok, Value, State};
|
||||
not_found ->
|
||||
|
@ -175,7 +182,7 @@ get(Bucket, Key, #state{table=Table, session=SRef}=State) ->
|
|||
{error, term(), state()}.
|
||||
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) ->
|
||||
WTKey = to_object_key(Bucket, PrimaryKey),
|
||||
case wterl:session_put(SRef, Table, WTKey, Val) of
|
||||
case wterl:cursor_insert(shared_cursor(SRef, Table, State), WTKey, Val) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
|
@ -191,7 +198,7 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat
|
|||
{error, term(), state()}.
|
||||
delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
|
||||
WTKey = to_object_key(Bucket, Key),
|
||||
case wterl:session_delete(SRef, Table, WTKey) of
|
||||
case wterl:cursor_remove(shared_cursor(SRef, Table, State), WTKey) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
|
@ -340,6 +347,20 @@ callback(_Ref, _Msg, State) ->
|
|||
%% Internal functions
|
||||
%% ===================================================================
|
||||
|
||||
shared_cursor(SRef, Table, #state{cursors=undefined}=State) ->
|
||||
Cursors = ets:new(?MODULE, []),
|
||||
shared_cursor(SRef, Table, State#state{cursors=Cursors});
|
||||
shared_cursor(SRef, Table, #state{cursors=Cursors}=State) ->
|
||||
case ets:lookup(Cursors, Table) of
|
||||
[{Table, Cursor}] ->
|
||||
{Cursor, State};
|
||||
_ ->
|
||||
Cursor = wterl:cursor_open(SRef, Table),
|
||||
ets:insert(Cursors, {Table, Cursor}),
|
||||
{Cursor, State}
|
||||
end.
|
||||
|
||||
|
||||
%% @private
|
||||
%% Return a function to fold over the buckets on this backend
|
||||
fold_buckets_fun(FoldBucketsFun) ->
|
||||
|
|
Loading…
Reference in a new issue