Gregory Burd 2012-06-22 16:39:10 +01:00
parent d0245c85c0
commit a79dde264f
5 changed files with 121 additions and 133 deletions


@@ -66,23 +66,22 @@
| {expiry_secs, non_neg_integer()}
.
% @doc
% Create or open a hanoidb store. Argument `Dir' names a
% directory in which to keep the data files. By convention, we
% name hanoidb data directories with extension ".hanoidb".
%% @doc
%% Create or open a hanoidb store. Argument `Dir' names a
%% directory in which to keep the data files. By convention, we
%% name hanoidb data directories with extension ".hanoidb".
-spec open(Dir::string()) -> hanoidb().
open(Dir) ->
open(Dir, []).
% @doc Create or open a hanoidb store.
%% @doc Create or open a hanoidb store.
-spec open(Dir::string(), Opts::[config_option()]) -> hanoidb().
open(Dir, Opts) ->
ok = start_app(),
gen_server:start(?MODULE, [Dir, Opts], []).
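%% Illustrative use of open/2 documented above and close/1 defined just
%% below (a sketch; the directory name and option list are hypothetical):
%%
%%     {ok, Ref} = hanoidb:open("example.hanoidb", [{expiry_secs, 0}]),
%%     %% ... read and write through Ref ...
%%     ok = hanoidb:close(Ref).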
% @doc
% Close a Hanoi data store.
% @spec close(Ref::pid()) -> ok
%% @doc
%% Close a Hanoi data store.
-spec close(Ref::pid()) -> ok.
close(Ref) ->
try
@@ -276,44 +275,36 @@ open_levels(Dir, Options) ->
{ok, Files} = file:list_dir(Dir),
%% parse file names and find max level
{MinLevel, MaxLevel, NumLevels} =
lists:foldl(fun(FileName, {MinLevel, MaxLevel, NumLevels}) ->
{MinLevel, MaxLevel} =
lists:foldl(fun(FileName, {MinLevel, MaxLevel}) ->
case parse_level(FileName) of
{ok, Level} ->
{erlang:min(MinLevel, Level),
erlang:max(MaxLevel, Level),
NumLevels + 1};
nomatch ->
{MinLevel, MaxLevel, NumLevels}
erlang:max(MaxLevel, Level)};
_ ->
{MinLevel, MaxLevel}
end
end,
{?TOP_LEVEL, ?TOP_LEVEL, 0},
{?TOP_LEVEL, ?TOP_LEVEL},
Files),
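%% Worked example of the scan above (the file names are hypothetical): with
%% Files = ["A-8.data", "B-9.data", "nursery.log"], parse_level/1 matches the
%% first two names, so starting from {?TOP_LEVEL, ?TOP_LEVEL} the fold yields
%% MinLevel = min(?TOP_LEVEL, 8) and MaxLevel = max(?TOP_LEVEL, 9); names that
%% do not match fall through unchanged.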
%% remove old nursery data file
NurseryFileName = filename:join(Dir, "nursery.data"),
file:delete(NurseryFileName),
TopLevel1 =
case NumLevels > 0 of
true ->
%% Do enough incremental merge to be sure we won't deadlock in insert
{TopLevel, MaxMerge} =
lists:foldl(fun(LevelNo, {NextLevel, MergeWork0}) ->
{ok, Level} = hanoidb_level:open(Dir, LevelNo, NextLevel, Options, self()),
MergeWork = MergeWork0 + hanoidb_level:unmerged_count(Level),
{Level, MergeWork}
end,
{undefined, 0},
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
do_merge(TopLevel, WorkPerIter, MaxMerge),
TopLevel;
false ->
?TOP_LEVEL
end,
{ok, TopLevel1, MaxLevel}.
%% Do enough incremental merge to be sure we won't deadlock in insert
{TopLevel, MaxMerge} =
lists:foldl(fun(LevelNo, {NextLevel, MergeWork0}) ->
{ok, Level} = hanoidb_level:open(Dir, LevelNo, NextLevel, Options, self()),
MergeWork = MergeWork0 + hanoidb_level:unmerged_count(Level),
{Level, MergeWork}
end,
{undefined, 0},
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
% error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
do_merge(TopLevel, WorkPerIter, MaxMerge),
{ok, TopLevel, MaxLevel}.
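%% Worked example of the merge bound above, under assumed macro values
%% (illustrative only; the real definitions live in the hanoidb headers): if
%% ?TOP_LEVEL = 8, ?BTREE_SIZE(?TOP_LEVEL) = 256, MinLevel = 8 and
%% MaxLevel = 10, then WorkPerIter = (10 - 8 + 1) * 256 = 768, so each
%% do_merge/3 round requests 768 units of merge work until the MaxMerge total
%% accumulated from unmerged_count/1 has been worked off.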
do_merge(TopLevel, _Inc, N) when N =< 0 ->
ok = hanoidb_level:await_incremental_merge(TopLevel);
@@ -324,7 +315,7 @@ do_merge(TopLevel, Inc, N) ->
parse_level(FileName) ->
case re:run(FileName, "^[^\\d]+-(\\d+)\\.data\$", [{capture,all_but_first,list}]) of
case re:run(FileName, "^[^\\d]+-(\\d+)\\.data$", [{capture,all_but_first,list}]) of
{match,[StringVal]} ->
{ok, list_to_integer(StringVal)};
_ ->
@@ -355,6 +346,7 @@ terminate(normal, _State) ->
ok;
terminate(_Reason, _State) ->
error_logger:info_msg("got terminate(~p, ~p)~n", [_Reason, _State]),
% flush_nursery(State),
ok.
code_change(_OldVsn, State, _Extra) ->
@@ -425,8 +417,9 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoidb_nursery:transact(TransactionSpec, Nursery, Top),
{ok, State#state{ nursery=Nursery2 }}.
flush_nursery(State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoidb_nursery:flush(Nursery, Top),
flush_nursery(State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
ok = hanoidb_nursery:finish(Nursery, Top),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
{ok, State#state{ nursery=Nursery2 }}.
start_app() ->


@@ -435,7 +435,7 @@ main_loop(State = #state{ next=Next }) ->
?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
?log("init_snapshot_range_fold ~p -> ~p", [Range, WorkerPID]),
?log("init_range_fold ~p -> ~p", [Range, WorkerPID]),
case {State#state.a, State#state.b, State#state.c} of
{undefined, undefined, undefined} ->
@@ -488,8 +488,6 @@ main_loop(State = #state{ next=Next }) ->
?CALL(From, {init_blocking_range_fold, WorkerPID, Range, List}) ->
?log("init_blocking_range_fold ~p -> ~p", [Range, WorkerPID]),
case {State#state.a, State#state.b, State#state.c} of
{undefined, undefined, undefined} ->
RefList = List;
@@ -805,67 +803,62 @@ filename(PFX, State) ->
start_range_fold(FileName, WorkerPID, Range, State) ->
Owner = self(),
PID =
proc_lib:spawn(fun() ->
try
?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
erlang:link(WorkerPID),
{ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
do_range_fold2(File, WorkerPID, self(), Range),
erlang:unlink(WorkerPID),
hanoidb_reader:close(File),
%% this will release the pinning of the fold file
Owner ! {range_fold_done, self(), FileName},
ok
catch
Class:Ex ->
io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
end
end),
{ok, PID}.
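%% Owner-side sketch of consuming the {range_fold_done, _, _} message sent
%% above (an assumption about the surrounding main loop, which is not shown
%% in this hunk): receiving it lets the level drop FileName from its set of
%% pinned fold files, presumably so a finished merge may delete the file.
%%
%%     receive
%%         {range_fold_done, _FoldPID, DoneFileName} ->
%%             %% forget DoneFileName; it is no longer pinned by a fold
%%             ok
%%     end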
-define(FOLD_CHUNK_SIZE, 100).
-spec do_range_fold(BT :: hanoidb_reader:read_file(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #key_range{} ) -> ok.
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
try case hanoidb_reader:range_fold(fun(Key, Value, 0) ->
WorkerPID ! {level_result, SelfOrRef, Key, Value},
{?FOLD_CHUNK_SIZE-1, []};
(Key, Value, {N, KVs}) ->
{N-1,[{Key,Value}|KVs]}
end,
{?FOLD_CHUNK_SIZE-1,[]},
BT,
Range) of
{limit, _, LastKey} ->
WorkerPID ! {level_limit, SelfOrRef, LastKey};
{done, _} ->
%% tell fold merge worker we're done
WorkerPID ! {level_done, SelfOrRef}
end
catch
exit:worker_died -> ok
case hanoidb_reader:range_fold(fun(Key,Value,_) ->
WorkerPID ! {level_result, SelfOrRef, Key, Value},
ok
end,
ok,
BT,
Range) of
{limit, _, LastKey} ->
WorkerPID ! {level_limit, SelfOrRef, LastKey};
{done, _} ->
%% tell fold merge worker we're done
WorkerPID ! {level_done, SelfOrRef}
end,
ok.
-define(FOLD_CHUNK_SIZE, 100).
-spec do_range_fold2(BT :: hanoidb_reader:read_file(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #key_range{} ) -> ok.
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) ->
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
{?FOLD_CHUNK_SIZE-1, []};
(Key,Value,{N,KVs}) ->
{N-1,[{Key,Value}|KVs]}
end,
{?FOLD_CHUNK_SIZE-1,[]},
BT,
Range) of
{limit, {_,KVs}, LastKey} ->
send(WorkerPID, SelfOrRef, KVs),
WorkerPID ! {level_limit, SelfOrRef, LastKey};
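%% Shape of the hanoidb_reader:range_fold/4 contract that both fold variants
%% above rely on (types inferred from this diff, so treat them as
%% assumptions): the callback threads an accumulator over each key in Range,
%% and the call returns {done, Acc} when the range is exhausted or
%% {limit, Acc, LastKey} when the range's limit is hit first. For example,
%% counting the keys in a range:
%%
%%     count_in_range(File, Range) ->
%%         case hanoidb_reader:range_fold(fun(_K, _V, N) -> N + 1 end,
%%                                        0, File, Range) of
%%             {done, N}          -> N;
%%             {limit, N, _LastK} -> N
%%         end.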


@@ -97,15 +97,8 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
end,
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
nursery_full(#nursery{count=Count}=Nursery)
when Count + 1 > ?BTREE_SIZE(?TOP_LEVEL) ->
{full, Nursery};
nursery_full(Nursery) ->
{ok, Nursery}.
% @doc
% Add a Key/Value to the nursery
% @end
%% @doc Add a Key/Value to the nursery
%% @end
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, pos_integer() | infinity, pid()) -> {ok, #nursery{}}.
do_add(Nursery, Key, Value, infinity, Top) ->
do_add(Nursery, Key, Value, 0, Top);
@@ -141,7 +134,12 @@ do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=
{ok, Nursery2} =
do_inc_merge(Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data),
count=Count+1 }, 1, Top),
nursery_full(Nursery2).
if Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
{full, Nursery2};
true ->
{ok, Nursery2}
end.
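%% Sketch of how a caller might act on the {ok, _} | {full, _} result above
%% (illustrative; the actual call sites are elsewhere in hanoidb): `full'
%% means the nursery has reached ?BTREE_SIZE(?TOP_LEVEL) entries and should
%% be flushed before further writes, e.g. via finish/2 and new/3 as
%% hanoidb:flush_nursery/1 now does.
%%
%%     case do_add(Nursery, Key, Value, infinity, Top) of
%%         {ok, Nursery1} ->
%%             {ok, Nursery1};
%%         {full, Nursery1} ->
%%             ok = finish(Nursery1, Top),
%%             new(Dir, MaxLevel, Config)   %% Dir/MaxLevel/Config: the store's settings
%%     end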
do_sync(File, Nursery) ->
LastSync =
@@ -198,14 +196,13 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
N when N > 0 ->
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoidb_writer:open(BTreeFileName,
[{size,?BTREE_SIZE(?TOP_LEVEL)},
{compress, none} | Config]),
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
{compress, none} | Config]),
try
gb_trees_ext:fold(fun(Key, Value, Acc) ->
ok = hanoidb_writer:add(BT, Key, Value),
Acc
end, [], Cache)
[] = gb_trees_ext:fold(fun(Key, Value, Acc) ->
ok = hanoidb_writer:add(BT, Key, Value),
Acc
end, [], Cache)
after
ok = hanoidb_writer:close(BT)
end,
@@ -293,37 +290,39 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi
length(Spec), Top).
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
if Step+N >= ?INC_MERGE_STEP ->
case Step+N >= ?INC_MERGE_STEP of
true ->
io:format("do_inc_merge: true ~p ~p ~p~n", [Step, N, ?INC_MERGE_STEP]),
hanoidb_level:begin_incremental_merge(TopLevel, Step+N),
{ok, Nursery#nursery{ step=0, merge_done=Done+Step+N }};
true ->
false ->
io:format("do_inc_merge: false ~p ~p ~p~n", [Step, N, ?INC_MERGE_STEP]),
{ok, Nursery#nursery{ step=Step+N }}
end.
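%% Worked example of the step accounting above (?INC_MERGE_STEP's real value
%% is defined elsewhere; 32 is used here only for illustration): with
%% Step = 30 and N = 1 nothing is scheduled and the nursery carries step = 31
%% forward; on the next add Step + N = 32 >= 32, so
%% begin_incremental_merge(TopLevel, 32) runs, step resets to 0, and
%% merge_done grows by 32.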
do_level_fold(#nursery{cache=Cache}, FoldWorkerPID, KeyRange) ->
Ref = erlang:make_ref(),
FoldWorkerPID ! {prefix, [Ref]},
case lists:foldl(fun(_,{LastKey,limit}) ->
{LastKey,limit};
({Key,Value}, {LastKey,Count}) ->
IsExpired = is_expired(Value),
case ?KEY_IN_RANGE(Key,KeyRange) andalso (not IsExpired) of
true ->
BinOrTombstone = get_value(Value),
FoldWorkerPID ! {level_result, Ref, Key, BinOrTombstone},
case BinOrTombstone of
?TOMBSTONE ->
{Key, Count};
_ ->
{Key, decrement(Count)}
end;
false ->
{LastKey, Count}
end
end,
{undefined, KeyRange#key_range.limit},
gb_trees:to_list(Cache))
case gb_trees_ext:fold(
fun(_, _, {LastKey, limit}) ->
{LastKey, limit};
(Key, Value, {LastKey, Count}) ->
case ?KEY_IN_RANGE(Key, KeyRange) andalso (not is_expired(Value)) of
true ->
BinOrTombstone = get_value(Value),
FoldWorkerPID ! {level_result, Ref, Key, BinOrTombstone},
case BinOrTombstone of
?TOMBSTONE ->
{Key, Count};
_ ->
{Key, decrement(Count)}
end;
false ->
{LastKey, Count}
end
end,
{undefined, KeyRange#key_range.limit},
Cache)
of
{LastKey, limit} when LastKey =/= undefined ->
FoldWorkerPID ! {level_limit, Ref, LastKey};


@@ -164,10 +164,10 @@ decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
decode_crc_data(<<>>, [], Acc) ->
{ok, lists:reverse(Acc)};
decode_crc_data(<<>>, _BrokenData, Acc) ->
{error, data_corruption};
% {error, data_corruption};
% TODO: we *could* simply return the good parts of the data...
% would that be so wrong?
% {ok, lists:reverse(Acc)};
{ok, lists:reverse(Acc)};
decode_crc_data(<< BinSize:32/unsigned, CRC:32/unsigned, Bin:BinSize/binary, ?TAG_END, Rest/binary >>, Broken, Acc) ->
CRCTest = erlang:crc32( Bin ),
if CRC == CRCTest ->
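%% Encoder-side sketch of the frame that decode_crc_data/3 matches on above
%% (illustrative; frame/1 is a hypothetical helper, and ?TAG_END is this
%% module's end-of-record marker, used here exactly as in the decode pattern):
%%
%%     frame(Bin) when is_binary(Bin) ->
%%         << (byte_size(Bin)):32/unsigned, (erlang:crc32(Bin)):32/unsigned,
%%            Bin/binary, ?TAG_END >>.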


@@ -39,7 +39,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, serialize/1, deserialize/1]).
-export([open/2, add/3, count/1, close/1]).
-export([open/1, open/2, add/3, count/1, close/1]).
-record(node, {level :: integer(),
members=[] :: [ {binary(), binary()} ],
@@ -67,9 +67,12 @@
%%% PUBLIC API
open(Name, Options) -> %% TODO: should this be called start_link?
open(Name,Options) ->
hanoidb_util:ensure_expiry(Options),
gen_server:start_link(?MODULE, ?MODULE, [Name, Options], []).
gen_server:start_link(?MODULE, [Name, Options], []).
open(Name) ->
gen_server:start_link(?MODULE, [Name,[{expiry_secs,0}]], []).
add(Ref, Key, Value) ->
gen_server:cast(Ref, {add, Key, Value}).
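%% Illustrative round trip through this writer, mirroring how
%% hanoidb_nursery:finish/2 drives it (the file name and size option are
%% hypothetical):
%%
%%     {ok, BT} = hanoidb_writer:open("example.data",
%%                                    [{size, 1024}, {compress, none}]),
%%     ok = hanoidb_writer:add(BT, <<"key">>, <<"value">>),
%%     ok = hanoidb_writer:close(BT).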