diff --git a/src/hanoidb.erl b/src/hanoidb.erl
index e7d081e..aa1f1e2 100644
--- a/src/hanoidb.erl
+++ b/src/hanoidb.erl
@@ -66,23 +66,22 @@
     | {expiry_secs, non_neg_integer()}
     .
 
-% @doc
-% Create or open a hanoidb store.  Argument `Dir' names a
-% directory in which to keep the data files.  By convention, we
-% name hanoidb data directories with extension ".hanoidb".
+%% @doc
+%% Create or open a hanoidb store.  Argument `Dir' names a
+%%% directory in which to keep the data files.  By convention, we
+%% name hanoidb data directories with extension ".hanoidb".
 - spec open(Dir::string()) -> hanoidb().
 open(Dir) ->
     open(Dir, []).
 
-% @doc Create or open a hanoidb store.
+%% @doc Create or open a hanoidb store.
 - spec open(Dir::string(), Opts::[config_option()]) -> hanoidb().
 open(Dir, Opts) ->
     ok = start_app(),
     gen_server:start(?MODULE, [Dir, Opts], []).
 
-% @doc
-% Close a Hanoi data store.
-% @spec close(Ref::pid()) -> ok
+%% @doc
+%% Close a Hanoi data store.
 - spec close(Ref::pid()) -> ok.
 close(Ref) ->
     try
@@ -276,44 +275,36 @@ open_levels(Dir, Options) ->
     {ok, Files} = file:list_dir(Dir),
 
     %% parse file names and find max level
-    {MinLevel, MaxLevel, NumLevels} =
-        lists:foldl(fun(FileName, {MinLevel, MaxLevel, NumLevels}) ->
+    {MinLevel, MaxLevel} =
+        lists:foldl(fun(FileName, {MinLevel, MaxLevel}) ->
                             case parse_level(FileName) of
                                 {ok, Level} ->
                                     {erlang:min(MinLevel, Level),
-                                     erlang:max(MaxLevel, Level),
-                                     NumLevels + 1};
-                                nomatch ->
-                                    {MinLevel, MaxLevel, NumLevels}
+                                     erlang:max(MaxLevel, Level)};
+                                _ ->
+                                    {MinLevel, MaxLevel}
                             end
                     end,
-                    {?TOP_LEVEL, ?TOP_LEVEL, 0},
+                    {?TOP_LEVEL, ?TOP_LEVEL},
                     Files),
 
     %% remove old nursery data file
     NurseryFileName = filename:join(Dir, "nursery.data"),
     file:delete(NurseryFileName),
 
-    TopLevel1 =
-        case NumLevels > 0 of
-            true ->
-                %% Do enough incremental merge to be sure we won't deadlock in insert
-                {TopLevel, MaxMerge} =
-                    lists:foldl(fun(LevelNo, {NextLevel, MergeWork0}) ->
-                                        {ok, Level} = hanoidb_level:open(Dir, LevelNo, NextLevel, Options, self()),
-                                        MergeWork = MergeWork0 + hanoidb_level:unmerged_count(Level),
-                                        {Level, MergeWork}
-                                end,
-                                {undefined, 0},
-                                lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
-                WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
-                error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
-                do_merge(TopLevel, WorkPerIter, MaxMerge),
-                TopLevel;
-            false ->
-                ?TOP_LEVEL
-        end,
-    {ok, TopLevel1, MaxLevel}.
+    %% Do enough incremental merge to be sure we won't deadlock in insert
+    {TopLevel, MaxMerge} =
+        lists:foldl(fun(LevelNo, {NextLevel, MergeWork0}) ->
+                            {ok, Level} = hanoidb_level:open(Dir, LevelNo, NextLevel, Options, self()),
+                            MergeWork = MergeWork0 + hanoidb_level:unmerged_count(Level),
+                            {Level, MergeWork}
+                    end,
+                    {undefined, 0},
+                    lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
+    WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
+%    error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
+    do_merge(TopLevel, WorkPerIter, MaxMerge),
+    {ok, TopLevel, MaxLevel}.
 
 do_merge(TopLevel, _Inc, N) when N =< 0 ->
     ok = hanoidb_level:await_incremental_merge(TopLevel);
@@ -324,7 +315,7 @@ do_merge(TopLevel, Inc, N) ->
 
 parse_level(FileName) ->
-    case re:run(FileName, "^[^\\d]+-(\\d+)\\.data\$", [{capture,all_but_first,list}]) of
+    case re:run(FileName, "^[^\\d]+-(\\d+)\\.data$", [{capture,all_but_first,list}]) of
         {match,[StringVal]} ->
             {ok, list_to_integer(StringVal)};
         _ ->
@@ -355,6 +346,7 @@ terminate(normal, _State) ->
     ok;
 terminate(_Reason, _State) ->
     error_logger:info_msg("got terminate(~p, ~p)~n", [_Reason, _State]),
+    % flush_nursery(State),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -425,8 +417,9 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
     {ok, Nursery2} = hanoidb_nursery:transact(TransactionSpec, Nursery, Top),
     {ok, State#state{ nursery=Nursery2 }}.
 
-flush_nursery(State=#state{ nursery=Nursery, top=Top }) ->
-    {ok, Nursery2} = hanoidb_nursery:flush(Nursery, Top),
+flush_nursery(State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
+    ok = hanoidb_nursery:finish(Nursery, Top),
+    {ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
     {ok, State#state{ nursery=Nursery2 }}.
 
 start_app() ->
diff --git a/src/hanoidb_level.erl b/src/hanoidb_level.erl
index 0587cdc..28f755a 100644
--- a/src/hanoidb_level.erl
+++ b/src/hanoidb_level.erl
@@ -435,7 +435,7 @@ main_loop(State = #state{ next=Next }) ->
 
     ?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
-        ?log("init_snapshot_range_fold ~p -> ~p", [Range, WorkerPID]),
+        ?log("init_range_fold ~p -> ~p", [Range, WorkerPID]),
 
         case {State#state.a, State#state.b, State#state.c} of
             {undefined, undefined, undefined} ->
@@ -488,8 +488,6 @@ main_loop(State = #state{ next=Next }) ->
 
     ?CALL(From, {init_blocking_range_fold, WorkerPID, Range, List}) ->
-        ?log("init_blocking_range_fold ~p -> ~p", [Range, WorkerPID]),
-
         case {State#state.a, State#state.b, State#state.c} of
             {undefined, undefined, undefined} ->
                 RefList = List;
@@ -805,67 +803,62 @@ filename(PFX, State) ->
 
 start_range_fold(FileName, WorkerPID, Range, State) ->
     Owner = self(),
-    PID =
-        proc_lib:spawn(fun() ->
-            try
-                ?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
-                erlang:link(WorkerPID),
-                {ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
-                do_range_fold2(File, WorkerPID, self(), Range),
-                erlang:unlink(WorkerPID),
-                hanoidb_reader:close(File),
+    PID = proc_lib:spawn( fun() ->
+              try
+                  ?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
+                  erlang:link(WorkerPID),
+                  {ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
+                  do_range_fold2(File, WorkerPID, self(), Range),
+                  erlang:unlink(WorkerPID),
+                  hanoidb_reader:close(File),
 
-                %% this will release the pinning of the fold file
-                Owner ! {range_fold_done, self(), FileName},
-                ok
-            catch
-                Class:Ex ->
-                    io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
-            end
-        end),
+                  %% this will release the pinning of the fold file
+                  Owner ! {range_fold_done, self(), FileName},
+                  ok
+              catch
+                  Class:Ex ->
+                      io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
+              end
+          end ),
     {ok, PID}.
 
--define(FOLD_CHUNK_SIZE, 100).
-
 -spec do_range_fold(BT        :: hanoidb_reader:read_file(),
                     WorkerPID :: pid(),
                     SelfOrRef :: pid() | reference(),
                     Range     :: #key_range{} ) -> ok.
 do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
-    try case hanoidb_reader:range_fold(fun(Key, Value, 0) ->
-                                               WorkerPID ! {level_result, SelfOrRef, Key, Value},
-                                               {?FOLD_CHUNK_SIZE-1, []};
-                                          (Key, Value, {N, KVs}) ->
-                                               {N-1,[{Key,Value}|KVs]}
-                                       end,
-                                       {?FOLD_CHUNK_SIZE-1,[]},
-                                       BT,
-                                       Range) of
-            {limit, _, LastKey} ->
-                WorkerPID ! {level_limit, SelfOrRef, LastKey};
-            {done, _} ->
-                %% tell fold merge worker we're done
-                WorkerPID ! {level_done, SelfOrRef}
-        end
-    catch
-        exit:worker_died -> ok
-    end,
+    case hanoidb_reader:range_fold(fun(Key,Value,_) ->
+                                           WorkerPID ! {level_result, SelfOrRef, Key, Value},
+                                           ok
+                                   end,
+                                   ok,
+                                   BT,
+                                   Range) of
+        {limit, _, LastKey} ->
+            WorkerPID ! {level_limit, SelfOrRef, LastKey};
+        {done, _} ->
+            %% tell fold merge worker we're done
+            WorkerPID ! {level_done, SelfOrRef}
+    end,
     ok.
 
+-define(FOLD_CHUNK_SIZE, 100).
+
 -spec do_range_fold2(BT        :: hanoidb_reader:read_file(),
-                    WorkerPID :: pid(),
-                    SelfOrRef :: pid() | reference(),
-                    Range     :: #key_range{} ) -> ok.
+                     WorkerPID :: pid(),
+                     SelfOrRef :: pid() | reference(),
+                     Range     :: #key_range{} ) -> ok.
 do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
     try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) ->
-                                         send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
-                                         {?FOLD_CHUNK_SIZE-1, []};
-                                     (Key,Value,{N,KVs}) ->
-                                         {N-1,[{Key,Value}|KVs]}
-                                  end,
-                                  {?FOLD_CHUNK_SIZE-1,[]},
-                                  BT,
-                                  Range) of
+                                          send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
+                                          {?FOLD_CHUNK_SIZE-1, []};
+                                      (Key,Value,{N,KVs}) ->
+                                          {N-1,[{Key,Value}|KVs]}
+                                   end,
+                                   {?FOLD_CHUNK_SIZE-1,[]},
+                                   BT,
+                                   Range) of
         {limit, {_,KVs}, LastKey} ->
             send(WorkerPID, SelfOrRef, KVs),
             WorkerPID ! {level_limit, SelfOrRef, LastKey};
diff --git a/src/hanoidb_nursery.erl b/src/hanoidb_nursery.erl
index 669737c..a7eec4f 100644
--- a/src/hanoidb_nursery.erl
+++ b/src/hanoidb_nursery.erl
@@ -97,15 +97,8 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
         end,
     {ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
 
-nursery_full(#nursery{count=Count}=Nursery)
-  when Count + 1 > ?BTREE_SIZE(?TOP_LEVEL) ->
-    {full, Nursery};
-nursery_full(Nursery) ->
-    {ok, Nursery}.
-
-% @doc
-% Add a Key/Value to the nursery
-% @end
+%% @doc Add a Key/Value to the nursery
+%% @end
 -spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, pos_integer() | infinity, pid()) -> {ok, #nursery{}}.
 do_add(Nursery, Key, Value, infinity, Top) ->
     do_add(Nursery, Key, Value, 0, Top);
@@ -141,7 +134,12 @@ do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=
 
     {ok, Nursery2} = do_inc_merge(Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count+1 }, 1, Top),
 
-    nursery_full(Nursery2).
+
+    if Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
+            {full, Nursery2};
+       true ->
+            {ok, Nursery2}
+    end.
 
 do_sync(File, Nursery) ->
     LastSync =
@@ -198,14 +196,13 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
         N when N > 0 ->
             %% next, flush cache to a new BTree
             BTreeFileName = filename:join(Dir, "nursery.data"),
-            {ok, BT} = hanoidb_writer:open(BTreeFileName,
-                                           [{size,?BTREE_SIZE(?TOP_LEVEL)},
-                                            {compress, none} | Config]),
+            {ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
+                                                           {compress, none} | Config]),
             try
-                gb_trees_ext:fold(fun(Key, Value, Acc) ->
-                                          ok = hanoidb_writer:add(BT, Key, Value),
-                                          Acc
-                                  end, [], Cache)
+                [] = gb_trees_ext:fold(fun(Key, Value, Acc) ->
+                                               ok = hanoidb_writer:add(BT, Key, Value),
+                                               Acc
+                                       end, [], Cache)
             after
                 ok = hanoidb_writer:close(BT)
             end,
@@ -293,37 +290,39 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi
                 length(Spec), Top).
 
 do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
-    if Step+N >= ?INC_MERGE_STEP ->
+    case Step+N >= ?INC_MERGE_STEP of
+        true ->
+            io:format("do_inc_merge: true ~p ~p ~p~n", [Step, N, ?INC_MERGE_STEP]),
             hanoidb_level:begin_incremental_merge(TopLevel, Step+N),
             {ok, Nursery#nursery{ step=0, merge_done=Done+Step+N }};
-       true ->
+        false ->
+            io:format("do_inc_merge: false ~p ~p ~p~n", [Step, N, ?INC_MERGE_STEP]),
             {ok, Nursery#nursery{ step=Step+N }}
     end.
 
 do_level_fold(#nursery{cache=Cache}, FoldWorkerPID, KeyRange) ->
     Ref = erlang:make_ref(),
     FoldWorkerPID ! {prefix, [Ref]},
-    case lists:foldl(fun(_,{LastKey,limit}) ->
-                             {LastKey,limit};
-                        ({Key,Value}, {LastKey,Count}) ->
-                             IsExpired = is_expired(Value),
-
-                             case ?KEY_IN_RANGE(Key,KeyRange) andalso (not IsExpired) of
-                                 true ->
-                                     BinOrTombstone = get_value(Value),
-                                     FoldWorkerPID ! {level_result, Ref, Key, BinOrTombstone},
-                                     case BinOrTombstone of
-                                         ?TOMBSTONE ->
-                                             {Key, Count};
-                                         _ ->
-                                             {Key, decrement(Count)}
-                                     end;
-                                 false ->
-                                     {LastKey, Count}
-                             end
-                     end,
-                     {undefined, KeyRange#key_range.limit},
-                     gb_trees:to_list(Cache))
+    case gb_trees_ext:fold(
+           fun(_, _, {LastKey, limit}) ->
+                   {LastKey, limit};
+              (Key, Value, {LastKey, Count}) ->
+                   case ?KEY_IN_RANGE(Key, KeyRange) andalso (not is_expired(Value)) of
+                       true ->
+                           BinOrTombstone = get_value(Value),
+                           FoldWorkerPID ! {level_result, Ref, Key, BinOrTombstone},
+                           case BinOrTombstone of
+                               ?TOMBSTONE ->
+                                   {Key, Count};
+                               _ ->
+                                   {Key, decrement(Count)}
+                           end;
+                       false ->
+                           {LastKey, Count}
+                   end
+           end,
+           {undefined, KeyRange#key_range.limit},
+           Cache) of
         {LastKey, limit} when LastKey =/= undefined ->
            FoldWorkerPID ! {level_limit, Ref, LastKey};
diff --git a/src/hanoidb_util.erl b/src/hanoidb_util.erl
index 970c8ba..5123994 100644
--- a/src/hanoidb_util.erl
+++ b/src/hanoidb_util.erl
@@ -164,10 +164,10 @@ decode_kv_list(<>) ->
 decode_crc_data(<<>>, [], Acc) ->
     {ok, lists:reverse(Acc)};
 decode_crc_data(<<>>, _BrokenData, Acc) ->
-    {error, data_corruption};
+%    {error, data_corruption};
 % TODO: we *could* simply return the good parts of the data...
 % would that be so wrong?
-%    {ok, lists:reverse(Acc)};
+    {ok, lists:reverse(Acc)};
 decode_crc_data(<< BinSize:32/unsigned, CRC:32/unsigned, Bin:BinSize/binary, ?TAG_END, Rest/binary >>, Broken, Acc) ->
     CRCTest = erlang:crc32( Bin ),
     if CRC == CRCTest ->
diff --git a/src/hanoidb_writer.erl b/src/hanoidb_writer.erl
index 5c38ec0..2bbcd84 100644
--- a/src/hanoidb_writer.erl
+++ b/src/hanoidb_writer.erl
@@ -39,7 +39,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3, serialize/1, deserialize/1]).
 
--export([open/2, add/3, count/1, close/1]).
+-export([open/1, open/2, add/3, count/1, close/1]).
 
 -record(node, {level      :: integer(),
                members=[] :: [ {binary(), binary()} ],
@@ -67,9 +67,12 @@
 
 %%% PUBLIC API
 
-open(Name, Options) ->  %% TODO: should this be called start_link?
+open(Name,Options) ->
     hanoidb_util:ensure_expiry(Options),
-    gen_server:start_link(?MODULE, ?MODULE, [Name, Options], []).
+    gen_server:start_link(?MODULE, [Name, Options], []).
+
+open(Name) ->
+    gen_server:start_link(?MODULE, [Name,[{expiry_secs,0}]], []).
 
 add(Ref, Key, Value) ->
     gen_server:cast(Ref, {add, Key, Value}).
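
For context, here is a minimal usage sketch of the public API whose code paths this patch touches (open/2 with the expiry_secs option, put/get, a transact batch, and close/1). It is not part of the patch: the directory name and values are made up, and the calls assume hanoidb's documented public API.

    %% Hedged example session; "example.hanoidb" is an illustrative directory name.
    {ok, Ref} = hanoidb:open("example.hanoidb", [{expiry_secs, 0}]),
    ok = hanoidb:put(Ref, <<"key">>, <<"value">>),
    {ok, <<"value">>} = hanoidb:get(Ref, <<"key">>),
    %% a transaction spec is a list of {put, Key, Value} | {delete, Key} operations
    ok = hanoidb:transact(Ref, [{put, <<"a">>, <<"1">>},
                                {delete, <<"key">>}]),
    not_found = hanoidb:get(Ref, <<"key">>),
    ok = hanoidb:close(Ref).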