Revert shared cursor changes for now.

This commit is contained in:
Gregory Burd 2013-03-17 08:15:33 -04:00
parent 1129241fe0
commit ce79d22659

View file

@ -54,7 +54,6 @@
-record(state, {table :: string(), -record(state, {table :: string(),
session :: wterl:session(), session :: wterl:session(),
cursor :: wterl:cursor(),
partition :: integer()}). partition :: integer()}).
-type state() :: #state{}. -type state() :: #state{}.
@ -135,24 +134,21 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{session=undefined, cursor=undefined}) -> stop(#state{session=undefined}) ->
ok; ok;
stop(#state{session=Session, cursor=undefined}) -> stop(#state{session=Session}) ->
ok = wterl:session_close(Session); ok = wterl:session_close(Session).
stop(#state{cursor=Cursor}=State) ->
ok = wterl:cursor_close(Cursor),
stop(State#state{cursor=undefined}).
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) -> -spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} | {ok, any(), state()} |
{ok, not_found, state()} | {ok, not_found, state()} |
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{session=undefined, cursor=undefined}=State) -> get(Bucket, Key, #state{session=undefined}=State) ->
get(Bucket, Key, establish_connection(State)); get(Bucket, Key, establish_connection(State));
get(Bucket, Key, #state{cursor=Cursor}=State) -> get(Bucket, Key, #state{session=Session, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:cursor_search(Cursor, WTKey) of case wterl:session_get(Session, Table, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State}; {ok, Value, State};
not_found -> not_found ->
@ -169,11 +165,11 @@ get(Bucket, Key, #state{cursor=Cursor}=State) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined, cursor=undefined}=State) -> put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) ->
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State)); put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State));
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{cursor=Cursor}=State) -> put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
case wterl:cursor_insert(Cursor, WTKey, Val) of case wterl:session_put(Session, Table, WTKey, Val) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -187,11 +183,11 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{cursor=Cursor}=State) ->
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, IndexSpecs, #state{session=undefined, cursor=undefined}=State) -> delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) ->
delete(Bucket, Key, IndexSpecs, establish_connection(State)); delete(Bucket, Key, IndexSpecs, establish_connection(State));
delete(Bucket, Key, _IndexSpecs, #state{cursor=Cursor}=State) -> delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
case wterl:cursor_remove(Cursor, WTKey) of case wterl:session_delete(Session, Table, WTKey) of
ok -> ok ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
@ -316,7 +312,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{session=undefined, cursor=undefined}=State) -> drop(#state{session=undefined}=State) ->
drop(establish_connection(State)); drop(establish_connection(State));
drop(#state{table=Table, session=Session}=State) -> drop(#state{table=Table, session=Session}=State) ->
case wterl:session_truncate(Session, Table) of case wterl:session_truncate(Session, Table) of
@ -329,7 +325,7 @@ drop(#state{table=Table, session=Session}=State) ->
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{session=undefined, cursor=undefined}=State) -> is_empty(#state{session=undefined}=State) ->
is_empty(establish_connection(State)); is_empty(establish_connection(State));
is_empty(#state{table=Table, session=Session}) -> is_empty(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, Table), {ok, Cursor} = wterl:cursor_open(Session, Table),
@ -341,7 +337,7 @@ is_empty(#state{table=Table, session=Session}) ->
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{session=undefined, cursor=undefined}=State) -> status(#state{session=undefined}=State) ->
status(establish_connection(State)); status(establish_connection(State));
status(#state{table=Table, session=Session}) -> status(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table), {ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table),
@ -362,7 +358,7 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
establish_connection(#state{table=Table, session=undefined, cursor=undefined}=State) -> establish_connection(#state{table=Table, session=undefined}=State) ->
{ok, Connection} = wterl_conn:get(), {ok, Connection} = wterl_conn:get(),
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
{ok, Session} -> {ok, Session} ->
@ -374,13 +370,7 @@ establish_connection(#state{table=Table, session=undefined, cursor=undefined}=St
{lsm_bloom_config, [{leaf_page_max, "16MB"}]} ], {lsm_bloom_config, [{leaf_page_max, "16MB"}]} ],
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
ok -> ok ->
case wterl:cursor_open(Session, Table) of State#state{session=Session};
{ok, Cursor} ->
State#state{session=Session, cursor=Cursor};
{error, Reason} ->
lager:error("Failed to open WiredTiger cursor on ~p because: ~p", [Table, Reason]),
State
end;
{error, Reason} -> {error, Reason} ->
lager:error("Failed to start wterl backend: ~p\n", [Reason]), lager:error("Failed to start wterl backend: ~p\n", [Reason]),
State State