diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 6b62bba..d589c97 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -54,7 +54,6 @@ -record(state, {table :: string(), session :: wterl:session(), - cursor :: wterl:cursor(), partition :: integer()}). -type state() :: #state{}. @@ -135,24 +134,21 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{session=undefined, cursor=undefined}) -> +stop(#state{session=undefined}) -> ok; -stop(#state{session=Session, cursor=undefined}) -> - ok = wterl:session_close(Session); -stop(#state{cursor=Cursor}=State) -> - ok = wterl:cursor_close(Cursor), - stop(State#state{cursor=undefined}). +stop(#state{session=Session}) -> + ok = wterl:session_close(Session). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{session=undefined, cursor=undefined}=State) -> +get(Bucket, Key, #state{session=undefined}=State) -> get(Bucket, Key, establish_connection(State)); -get(Bucket, Key, #state{cursor=Cursor}=State) -> +get(Bucket, Key, #state{session=Session, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:cursor_search(Cursor, WTKey) of + case wterl:session_get(Session, Table, WTKey) of {ok, Value} -> {ok, Value, State}; not_found -> @@ -169,11 +165,11 @@ get(Bucket, Key, #state{cursor=Cursor}=State) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. -put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined, cursor=undefined}=State) -> +put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State)); -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{cursor=Cursor}=State) -> +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - case wterl:cursor_insert(Cursor, WTKey, Val) of + case wterl:session_put(Session, Table, WTKey, Val) of ok -> {ok, State}; {error, Reason} -> @@ -187,11 +183,11 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{cursor=Cursor}=State) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, IndexSpecs, #state{session=undefined, cursor=undefined}=State) -> +delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> delete(Bucket, Key, IndexSpecs, establish_connection(State)); -delete(Bucket, Key, _IndexSpecs, #state{cursor=Cursor}=State) -> +delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:cursor_remove(Cursor, WTKey) of + case wterl:session_delete(Session, Table, WTKey) of ok -> {ok, State}; {error, Reason} -> @@ -316,7 +312,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. -drop(#state{session=undefined, cursor=undefined}=State) -> +drop(#state{session=undefined}=State) -> drop(establish_connection(State)); drop(#state{table=Table, session=Session}=State) -> case wterl:session_truncate(Session, Table) of @@ -329,7 +325,7 @@ drop(#state{table=Table, session=Session}=State) -> %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{session=undefined, cursor=undefined}=State) -> +is_empty(#state{session=undefined}=State) -> is_empty(establish_connection(State)); is_empty(#state{table=Table, session=Session}) -> {ok, Cursor} = wterl:cursor_open(Session, Table), @@ -341,7 +337,7 @@ is_empty(#state{table=Table, session=Session}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(#state{session=undefined, cursor=undefined}=State) -> +status(#state{session=undefined}=State) -> status(establish_connection(State)); status(#state{table=Table, session=Session}) -> {ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table), @@ -362,7 +358,7 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== -establish_connection(#state{table=Table, session=undefined, cursor=undefined}=State) -> +establish_connection(#state{table=Table, session=undefined}=State) -> {ok, Connection} = wterl_conn:get(), case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of {ok, Session} -> @@ -374,13 +370,7 @@ establish_connection(#state{table=Table, session=undefined, cursor=undefined}=St {lsm_bloom_config, [{leaf_page_max, "16MB"}]} ], case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of ok -> - case wterl:cursor_open(Session, Table) of - {ok, Cursor} -> - State#state{session=Session, cursor=Cursor}; - {error, Reason} -> - lager:error("Failed to open WiredTiger cursor on ~p because: ~p", [Table, Reason]), - State - end; + State#state{session=Session}; {error, Reason} -> lager:error("Failed to start wterl backend: ~p\n", [Reason]), State