diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 27e1e3e..30cdf85 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -22,7 +22,6 @@ -module(riak_kv_wterl_backend). -behavior(temp_riak_kv_backend). --author('Steve Vinoski '). %% KV Backend API -export([api_version/0, @@ -51,13 +50,10 @@ %%-define(CAPABILITIES, [async_fold, indexes]). -define(CAPABILITIES, [async_fold]). --record(pass, {session :: wterl:session(), - cursor :: wterl:cursor()}). --type pass() :: #pass{}. - -record(state, {table :: string(), connection :: wterl:connection(), - passes :: [pass()]}). + is_empty_cursor :: wterl:cursor(), + status_cursor :: wterl:cursor()}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. @@ -91,9 +87,9 @@ start(Partition, Config) -> ok; {error, {already_started, _}} -> ok; - {error, Reason1} -> - lager:error("Failed to start wterl: ~p", [Reason1]), - {error, Reason1} + {error, Reason1} -> + lager:error("Failed to start wterl: ~p", [Reason1]), + {error, Reason1} end, case AppStart of ok -> @@ -101,20 +97,26 @@ start(Partition, Config) -> %% if the cluster size > the n value Table = "lsm:wt" ++ integer_to_list(Partition), {ok, Connection} = establish_connection(Config), - Passes = establish_passes(erlang:system_info(schedulers), Connection, Table), - {ok, #state{table=Table, connection=Connection, passes=Passes}}; - {error, Reason2} -> - {error, Reason2} + case wterl:cursor_open(Connection, Table) of + {ok, IsEmptyCursor} -> + case wterl:cursor_open(Connection, Table) of + {ok, StatusCursor} -> + {ok, #state{table=Table, connection=Connection, + is_empty_cursor=IsEmptyCursor, + status_cursor=StatusCursor}}; + {error, Reason2} -> + {error, Reason2} + end; + {error, Reason3} -> + {error, Reason3} + end; + {error, Reason4} -> + {error, Reason4} end. %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{passes=Passes}) -> - lists:foreach(fun(Elem) -> - {Session, Cursor} = Elem, - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) - end, Passes), +stop(_State) -> ok. %% @doc Retrieve an object from the wterl backend @@ -122,10 +124,9 @@ stop(#state{passes=Passes}) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{passes=Passes}=State) -> +get(Bucket, Key, #state{connection=Connection}=State) -> WTKey = to_object_key(Bucket, Key), - {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), - case wterl:cursor_search(Cursor, WTKey) of + case wterl:get(Connection, WTKey) of {ok, Value} -> {ok, Value, State}; not_found -> @@ -142,10 +143,8 @@ get(Bucket, Key, #state{passes=Passes}=State) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) -> - WTKey = to_object_key(Bucket, PrimaryKey), - {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), - case wterl:cursor_insert(Cursor, WTKey, Val) of +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection}=State) -> + case wterl:put(Connection, to_object_key(Bucket, PrimaryKey), Val) of ok -> {ok, State}; {error, Reason} -> @@ -159,10 +158,8 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) -> - WTKey = to_object_key(Bucket, Key), - {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), - case wterl:cursor_remove(Cursor, WTKey) of +delete(Bucket, Key, _IndexSpecs, #state{connection=Connection}=State) -> + case wterl:delete(Connection, to_object_key(Bucket, Key)) of ok -> {ok, State}; {error, Reason} -> @@ -174,13 +171,11 @@ delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) -> any(), [], state()) -> {ok, any()} | {async, fun()}. -fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> - {ok, Connection} = wterl_conn:get(), +fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) -> FoldFun = fold_buckets_fun(FoldBucketsFun), BucketFolder = fun() -> - {ok, Session} = wterl:session_open(Connection), - case wterl:cursor_open(Session, Table) of + case wterl:cursor_open(Connection, Table) of {error, "No such file or directory"} -> Acc; {ok, Cursor} -> @@ -192,8 +187,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) + ok = wterl:cursor_close(Cursor) end end end, @@ -209,7 +203,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, term()} | {async, fun()}. -fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> +fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) -> %% Figure out how we should limit the fold: by bucket, by %% secondary index, or neither (fold across everything.) Bucket = lists:keyfind(bucket, 1, Opts), @@ -222,14 +216,11 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> true -> undefined end, - {ok, Connection} = wterl_conn:get(), - %% Set up the fold... FoldFun = fold_keys_fun(FoldKeysFun, Limiter), KeyFolder = fun() -> - {ok, Session} = wterl:session_open(Connection), - case wterl:cursor_open(Session, Table) of + case wterl:cursor_open(Connection, Table) of {error, "No such file or directory"} -> Acc; {ok, Cursor} -> @@ -239,8 +230,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) + ok = wterl:cursor_close(Cursor) end end end, @@ -256,14 +246,12 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, any()} | {async, fun()}. -fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> - {ok, Connection} = wterl_conn:get(), +fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) -> Bucket = proplists:get_value(bucket, Opts), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), ObjectFolder = fun() -> - {ok, Session} = wterl:session_open(Connection), - case wterl:cursor_open(Session, Table) of + case wterl:cursor_open(Connection, Table) of {error, "No such file or directory"} -> Acc; {ok, Cursor} -> @@ -273,8 +261,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) + ok = wterl:cursor_close(Cursor) end end end, @@ -287,9 +274,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. -drop(#state{passes=Passes, table=Table}=State) -> - {Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), - case wterl:session_truncate(Session, Table) of +drop(#state{connection=Connection, table=Table}=State) -> + case wterl:truncate(Connection, Table) of ok -> {ok, State}; Error -> @@ -299,8 +285,7 @@ drop(#state{passes=Passes, table=Table}=State) -> %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{passes=Passes}) -> - {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), +is_empty(#state{is_empty_cursor=Cursor}) -> wterl:cursor_reset(Cursor), try not_found =:= wterl:cursor_next(Cursor) @@ -310,8 +295,8 @@ is_empty(#state{passes=Passes}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(#state{passes=Passes}) -> - {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), +status(#state{status_cursor=Cursor}) -> + wterl:cursor_reset(Cursor), try Stats = fetch_status(Cursor), [{stats, Stats}] @@ -346,44 +331,27 @@ establish_connection(Config) -> {error, data_root_unset}; DataRoot -> ok = filelib:ensure_dir(filename:join(DataRoot, "x")), + + %% WT Connection Options: RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl), - Opts = + ConnectionOpts = orddict:from_list( - [ wterl:config_value(create, Config, true), - wterl:config_value(sync, Config, false), - wterl:config_value(logging, Config, true), - wterl:config_value(transactional, Config, true), + [ wterl:config_value(create, Config, true), + wterl:config_value(sync, Config, false), + wterl:config_value(logging, Config, true), + wterl:config_value(transactional, Config, true), wterl:config_value(session_max, Config, max_sessions(Config)), - wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), - wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec + wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), + wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec %% NOTE: LSM auto-checkpoints, so we don't have too. %% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec - wterl:config_value(verbose, Config, [ - %"ckpt" "block", "shared_cache", "evictserver", "fileops", - %"hazard", "mutex", "read", "readserver", "reconcile", - %"salvage", "verify", "write", "evict", "lsm" - ]) ] ++ proplists:get_value(wterl, Config, [])), % sec - case wterl_conn:open(DataRoot, Opts) of - {ok, Connection} -> - {ok, Connection}; - {error, Reason2} -> - lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), - {error, Reason2} - end - end. + wterl:config_value(verbose, Config, [ + %"ckpt" "block", "shared_cache", "evictserver", "fileops", + %"hazard", "mutex", "read", "readserver", "reconcile", + %"salvage", "verify", "write", "evict", "lsm" + ]) ] ++ proplists:get_value(wterl, Config, [])), % sec -establish_passes(Count, Connection, Table) - when is_number(Count), Count > 0 -> - lists:map(fun(_Elem) -> - {ok, Session} = establish_session(Connection, Table), - {ok, Cursor} = wterl:cursor_open(Session, Table), - {Session, Cursor} - end, lists:seq(1, Count)). - -%% @private -establish_session(Connection, Table) -> - case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of - {ok, Session} -> + %% WT Session Options: SessionOpts = [{block_compressor, "snappy"}, {internal_page_max, "128K"}, @@ -393,17 +361,15 @@ establish_session(Connection, Table) -> {lsm_bloom_oldest, true} , {lsm_bloom_bit_count, 128}, {lsm_bloom_hash_count, 64}, - {lsm_bloom_config, [{leaf_page_max, "8MB"}]} ], - case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of - ok -> - {ok, Session}; - {error, Reason} -> - lager:error("Failed to start wterl backend: ~p\n", [Reason]), - {error, Reason} - end; - {error, Reason} -> - lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), - {error, Reason} + {lsm_bloom_config, [{leaf_page_max, "8MB"}]}], + + case wterl_conn:open(DataRoot, SessionOpts, ConnectionOpts) of + {ok, Connection} -> + {ok, Connection}; + {error, Reason2} -> + lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), + {error, Reason2} + end end. %% @private