diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index f536489..4f9e0bd 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -53,6 +53,7 @@ -record(state, {conn :: wterl:connection(), table :: string(), session :: wterl:session(), + cursors :: ets:tid(), partition :: integer()}). -type state() :: #state{}. @@ -145,18 +146,24 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{conn=ConnRef, session=SRef}) -> +stop(#state{conn=ConnRef, session=SRef, cursors=undefined}) -> ok = wterl:session_close(SRef), - wterl_conn:close(ConnRef). + wterl_conn:close(ConnRef); +stop(#state{cursors=Cursors}=State) -> + ets:foldl(fun({_Table, Cursor}, _) -> + ok = wterl:cursor_close(Cursor) + end, true, Cursors), + ets:delete(Cursors), + stop(State#state{cursors=undefined}). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{table=Table, session=SRef}=State) -> +get(Bucket, Key, #state{session=SRef, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:session_get(SRef, Table, WTKey) of + case wterl:cursor_search(shared_cursor(SRef, Table, State), WTKey) of {ok, Value} -> {ok, Value, State}; not_found -> @@ -175,7 +182,7 @@ get(Bucket, Key, #state{table=Table, session=SRef}=State) -> {error, term(), state()}. put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - case wterl:session_put(SRef, Table, WTKey, Val) of + case wterl:cursor_insert(shared_cursor(SRef, Table, State), WTKey, Val) of ok -> {ok, State}; {error, Reason} -> @@ -191,7 +198,7 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat {error, term(), state()}. delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:session_delete(SRef, Table, WTKey) of + case wterl:cursor_remove(shared_cursor(SRef, Table, State), WTKey) of ok -> {ok, State}; {error, Reason} -> @@ -340,6 +347,20 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== +shared_cursor(SRef, Table, #state{cursors=undefined}=State) -> + Cursors = ets:new(?MODULE, []), + shared_cursor(SRef, Table, State#state{cursors=Cursors}); +shared_cursor(SRef, Table, #state{cursors=Cursors}=State) -> + case ets:lookup(Cursors, Table) of + [{Table, Cursor}] -> + {Cursor, State}; + _ -> + Cursor = wterl:cursor_open(SRef, Table), + ets:insert(Cursors, {Table, Cursor}), + {Cursor, State} + end. + + %% @private %% Return a function to fold over the buckets on this backend fold_buckets_fun(FoldBucketsFun) ->