diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 753ef7d..35465df 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -52,6 +52,7 @@ -record(state, {table :: string(), connection :: wterl:connection(), + cursors :: ets:tid(), session :: wterl:session()}). -type state() :: #state{}. @@ -100,10 +101,16 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{session=undefined}) -> +stop(#state{session=undefined, cursors=undefined}) -> ok; -stop(#state{session=Session}) -> - ok = wterl:session_close(Session). +stop(#state{session=Session, cursors=undefined}) -> + ok = wterl:session_close(Session); +stop(#state{session=Session, cursors=Cursors}=State) -> + ets:foldl(fun({_Table, Cursor}, _) -> + ok = wterl:cursor_close(Cursor) + end, true, Cursors), + ets:delete(Cursors), + stop(State#state{session=Session, cursors=undefined}). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> @@ -112,9 +119,10 @@ stop(#state{session=Session}) -> {error, term(), state()}. get(Bucket, Key, #state{session=undefined}=State) -> get(Bucket, Key, establish_session(State)); -get(Bucket, Key, #state{session=Session, table=Table}=State) -> +get(Bucket, Key, #state{session=Session, table=Table}=State0) -> WTKey = to_object_key(Bucket, Key), - case wterl:session_get(Session, Table, WTKey) of + {Cursor, State} = cursor(Session, Table, get, State0), + case wterl:cursor_search(Cursor, WTKey) of {ok, Value} -> {ok, Value, State}; not_found -> @@ -133,9 +141,10 @@ get(Bucket, Key, #state{session=Session, table=Table}=State) -> {error, term(), state()}. put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State)); -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) -> +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State0) -> WTKey = to_object_key(Bucket, PrimaryKey), - case wterl:session_put(Session, Table, WTKey, Val) of + {Cursor, State} = cursor(Session, Table, put, State0), + case wterl:cursor_insert(Cursor, WTKey, Val) of ok -> {ok, State}; {error, Reason} -> @@ -151,9 +160,10 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S {error, term(), state()}. delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> delete(Bucket, Key, IndexSpecs, establish_session(State)); -delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) -> +delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) -> WTKey = to_object_key(Bucket, Key), - case wterl:session_delete(Session, Table, WTKey) of + {Cursor, State} = cursor(Session, Table, delete, State0), + case wterl:cursor_remove(Cursor, WTKey) of ok -> {ok, State}; {error, Reason} -> @@ -324,6 +334,19 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== +cursor(Session, Table, Type, #state{cursors=undefined}=State) -> + Cursors = ets:new(?MODULE, [{read_concurrency, true}]), + cursor(Session, Table, Type, State#state{cursors=Cursors}); +cursor(Session, Table, Type, #state{cursors=Cursors}=State) -> + case ets:lookup(Cursors, {Type, Table}) of + [{{_Type, Table}, Cursor}] -> + {Cursor, State}; + _ -> + {ok, Cursor} = wterl:cursor_open(Session, Table), + ets:insert(Cursors, {Table, Cursor}), + {Cursor, State} + end. + %% @private establish_connection(Table, Config) -> %% Get the data root directory