diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 51a11cd..6b62bba 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -52,8 +52,7 @@ %%-define(CAPABILITIES, [async_fold, indexes]). -define(CAPABILITIES, [async_fold]). --record(state, {conn :: wterl:connection(), - table :: string(), +-record(state, {table :: string(), session :: wterl:session(), cursor :: wterl:cursor(), partition :: integer()}). @@ -85,22 +84,24 @@ capabilities(_, _) -> -spec start(integer(), config()) -> {ok, state()} | {error, term()}. start(Partition, Config) -> %% Get the data root directory + Table = "lsm:wt" ++ integer_to_list(Partition), case app_helper:get_prop_or_env(data_root, Config, wterl) of undefined -> lager:error("Failed to create wterl dir: data_root is not set"), {error, data_root_unset}; DataRoot -> - AppStart = case application:start(wterl) of - ok -> - ok; - {error, {already_started, _Pid}}=_AppStartError -> - ok; - {error, Reason}=AppStartError -> - lager:error("Failed to start wterl: ~p", [Reason]), - AppStartError - end, - case AppStart of - ok -> + Started = + case application:start(wterl) of + ok -> + wterl_conn:is_open(); + {error, {already_started, _}} -> + wterl_conn:is_open(); + {error, Reason1} -> + lager:error("Failed to start wterl: ~p", [Reason1]), + {error, Reason1} + end, + case Started of + false -> ok = filelib:ensure_dir(filename:join(DataRoot, "x")), SessionMax = case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of @@ -108,50 +109,36 @@ start(Partition, Config) -> RingSize when RingSize < 512 -> 1024; RingSize -> RingSize * 2 end, - ConnectionOpts = [Config, - {create, true}, - {sync, false}, - {logging, true}, - {transactional, true}, - {session_max, SessionMax}, - {cache_size, size_cache(Config)}, - {checkpoint, [{wait, 1}]}, % sec - {statistics_log, [{wait, 30}]} % sec - %% {verbose, - %% ["block", "shared_cache", "ckpt", "evict", - %% "evictserver", "fileops", "hazard", "lsm", - %% "mutex", "read", "readserver", "reconcile", - %% "salvage", "verify", "write"]} - ], - case wterl_conn:open(DataRoot, ConnectionOpts) of - {ok, ConnRef} -> - Table = "lsm:wt" ++ integer_to_list(Partition), - SessionOpenOpts = [{isolation, "snapshot"}], - {ok, SRef} = wterl:session_open(ConnRef, wterl:config_to_bin(SessionOpenOpts)), - SessionOpts = [%TODO {block_compressor, "snappy"}, - {internal_page_max, "128K"}, - {leaf_page_max, "256K"}, - {lsm_chunk_size, "256MB"}, - {lsm_bloom_config, [{leaf_page_max, "16MB"}]} ], - ok = wterl:session_create(SRef, Table, wterl:config_to_bin(SessionOpts)), - {ok, #state{conn=ConnRef, - table=Table, - session=SRef, - partition=Partition}}; - {error, ConnReason}=ConnError -> - lager:error("Failed to start wterl backend: ~p\n", [ConnReason]), - ConnError + case wterl_conn:open(DataRoot, + [Config, + {create, true}, + {sync, false}, + {logging, true}, + {transactional, true}, + {session_max, SessionMax}, + {cache_size, size_cache(Config)}, + {checkpoint, [{wait, 1}]}, % sec + {statistics_log, [{wait, 30}]} % sec + ]) of + {ok, _Connection} -> + {ok, #state{partition=Partition, table=Table}}; + {error, Reason2} -> + lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), + {error, Reason2} end; - Error -> - Error + true -> + {ok, #state{partition=Partition, table=Table}}; + {error, Reason3} -> + {error, Reason3} end end. %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{conn=ConnRef, session=SRef, cursor=undefined}) -> - ok = wterl:session_close(SRef), - wterl_conn:close(ConnRef); +stop(#state{session=undefined, cursor=undefined}) -> + ok; +stop(#state{session=Session, cursor=undefined}) -> + ok = wterl:session_close(Session); stop(#state{cursor=Cursor}=State) -> ok = wterl:cursor_close(Cursor), stop(State#state{cursor=undefined}). @@ -161,16 +148,17 @@ stop(#state{cursor=Cursor}=State) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{session=SRef, table=Table}=State) -> +get(Bucket, Key, #state{session=undefined, cursor=undefined}=State) -> + get(Bucket, Key, establish_connection(State)); +get(Bucket, Key, #state{cursor=Cursor}=State) -> WTKey = to_object_key(Bucket, Key), - Cursor = shared_cursor(SRef, Table, State), case wterl:cursor_search(Cursor, WTKey) of {ok, Value} -> - {ok, Value, State#state{cursor=Cursor}}; + {ok, Value, State}; not_found -> - {error, not_found, State#state{cursor=Cursor}}; + {error, not_found, State}; {error, Reason} -> - {error, Reason, State#state{cursor=Cursor}} + {error, Reason, State} end. %% @doc Insert an object into the wterl backend. @@ -181,14 +169,15 @@ get(Bucket, Key, #state{session=SRef, table=Table}=State) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> +put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined, cursor=undefined}=State) -> + put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State)); +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{cursor=Cursor}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - Cursor = shared_cursor(SRef, Table, State), case wterl:cursor_insert(Cursor, WTKey, Val) of ok -> - {ok, State#state{cursor=Cursor}}; + {ok, State}; {error, Reason} -> - {error, Reason, State#state{cursor=Cursor}} + {error, Reason, State} end. %% @doc Delete an object from the wterl backend @@ -198,14 +187,15 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> +delete(Bucket, Key, IndexSpecs, #state{session=undefined, cursor=undefined}=State) -> + delete(Bucket, Key, IndexSpecs, establish_connection(State)); +delete(Bucket, Key, _IndexSpecs, #state{cursor=Cursor}=State) -> WTKey = to_object_key(Bucket, Key), - Cursor = shared_cursor(SRef, Table, State), case wterl:cursor_remove(Cursor, WTKey) of ok -> - {ok, State#state{cursor=Cursor}}; + {ok, State}; {error, Reason} -> - {error, Reason, State#state{cursor=Cursor}} + {error, Reason, State} end. %% @doc Fold over all the buckets @@ -213,22 +203,27 @@ delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> any(), [], state()) -> {ok, any()} | {async, fun()}. -fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> +fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), FoldFun = fold_buckets_fun(FoldBucketsFun), BucketFolder = fun() -> - {ok, SRef} = wterl:session_open(ConnRef), - {ok, Cursor} = wterl:cursor_open(SRef, Table), - try - {FoldResult, _} = - wterl:fold_keys(Cursor, FoldFun, {Acc, []}), - FoldResult - catch - {break, AccFinal} -> - AccFinal - after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(SRef) + {ok, Session} = wterl:session_open(Connection), + case wterl:cursor_open(Session, Table) of + {error, "No such file or directory"} -> + Acc; + {ok, Cursor} -> + try + {FoldResult, _} = + wterl:fold_keys(Cursor, FoldFun, {Acc, []}), + FoldResult + catch + {break, AccFinal} -> + AccFinal + after + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) + end end end, case lists:member(async_fold, Opts) of @@ -243,7 +238,9 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, term()} | {async, fun()}. -fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> +fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), + %% Figure out how we should limit the fold: by bucket, by %% secondary index, or neither (fold across everything.) Bucket = lists:keyfind(bucket, 1, Opts), @@ -260,16 +257,20 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> FoldFun = fold_keys_fun(FoldKeysFun, Limiter), KeyFolder = fun() -> - {ok, SRef} = wterl:session_open(ConnRef), - {ok, Cursor} = wterl:cursor_open(SRef, Table), - try - wterl:fold_keys(Cursor, FoldFun, Acc) - catch - {break, AccFinal} -> - AccFinal - after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(SRef) + {ok, Session} = wterl:session_open(Connection), + case wterl:cursor_open(Session, Table) of + {error, "No such file or directory"} -> + Acc; + {ok, Cursor} -> + try + wterl:fold_keys(Cursor, FoldFun, Acc) + catch + {break, AccFinal} -> + AccFinal + after + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) + end end end, case lists:member(async_fold, Opts) of @@ -284,21 +285,26 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, any()} | {async, fun()}. -fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> +fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), Bucket = proplists:get_value(bucket, Opts), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), ObjectFolder = fun() -> - {ok, SRef} = wterl:session_open(ConnRef), - {ok, Cursor} = wterl:cursor_open(SRef, Table), - try - wterl:fold(Cursor, FoldFun, Acc) - catch - {break, AccFinal} -> - AccFinal - after - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(SRef) + {ok, Session} = wterl:session_open(Connection), + case wterl:cursor_open(Session, Table) of + {error, "No such file or directory"} -> + Acc; + {ok, Cursor} -> + try + wterl:fold(Cursor, FoldFun, Acc) + catch + {break, AccFinal} -> + AccFinal + after + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) + end end end, case lists:member(async_fold, Opts) of @@ -310,8 +316,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. -drop(#state{table=Table, session=SRef}=State) -> - case wterl:session_truncate(SRef, Table) of +drop(#state{session=undefined, cursor=undefined}=State) -> + drop(establish_connection(State)); +drop(#state{table=Table, session=Session}=State) -> + case wterl:session_truncate(Session, Table) of ok -> {ok, State}; Error -> @@ -321,8 +329,10 @@ drop(#state{table=Table, session=SRef}=State) -> %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{table=Table, session=SRef}) -> - {ok, Cursor} = wterl:cursor_open(SRef, Table), +is_empty(#state{session=undefined, cursor=undefined}=State) -> + is_empty(establish_connection(State)); +is_empty(#state{table=Table, session=Session}) -> + {ok, Cursor} = wterl:cursor_open(Session, Table), try not_found =:= wterl:cursor_next(Cursor) after @@ -331,8 +341,10 @@ is_empty(#state{table=Table, session=SRef}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(#state{table=Table, session=SRef}) -> - {ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table), +status(#state{session=undefined, cursor=undefined}=State) -> + status(establish_connection(State)); +status(#state{table=Table, session=Session}) -> + {ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table), try Stats = fetch_status(Cursor), [{stats, Stats}] @@ -350,18 +362,35 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== -shared_cursor(SRef, Table, #state{cursor=undefined}=State) -> - case wterl:cursor_open(SRef, Table) of - {ok, Cursor} -> - Cursor; +establish_connection(#state{table=Table, session=undefined, cursor=undefined}=State) -> + {ok, Connection} = wterl_conn:get(), + case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of + {ok, Session} -> + SessionOpts = + [%TODO {block_compressor, "snappy"}, + {internal_page_max, "128K"}, + {leaf_page_max, "256K"}, + {lsm_chunk_size, "256MB"}, + {lsm_bloom_config, [{leaf_page_max, "16MB"}]} ], + case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of + ok -> + case wterl:cursor_open(Session, Table) of + {ok, Cursor} -> + State#state{session=Session, cursor=Cursor}; + {error, Reason} -> + lager:error("Failed to open WiredTiger cursor on ~p because: ~p", [Table, Reason]), + State + end; + {error, Reason} -> + lager:error("Failed to start wterl backend: ~p\n", [Reason]), + State + end; {error, Reason} -> - lager:error("Failed to open WT cursor on ~p because: ~p", [Table, Reason]), - undefined + lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), + State end; - -shared_cursor(_SRef, _Table, #state{cursor=Cursor}) -> - Cursor. - +establish_connection(State) -> + State. %% @private %% Return a function to fold over the buckets on this backend @@ -508,7 +537,7 @@ size_cache(Config) -> "1GB" end, application:set_env(wterl, cache_size, FinalGuess), - lager:warning("Using best-guess cache size of ~p for WiredTiger storage backend.", [FinalGuess]), + lager:warning("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), FinalGuess; Value -> Value diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index 7815a2d..f0f91f7 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -36,9 +36,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, { - conn :: wterl:connection() - }). +-record(state, { conn :: wterl:connection() }). -type config_list() :: [{atom(), any()}].