Make the top_level
parameter configurable.
This commit is contained in:
parent
4ba712512d
commit
f0d24894c5
5 changed files with 62 additions and 43 deletions
|
@ -82,7 +82,13 @@ Put these values in your `app.config` in the `hanoidb` section
|
||||||
%% Both have same log2(N) worst case, but `fast' is
|
%% Both have same log2(N) worst case, but `fast' is
|
||||||
%% sometimes faster; yielding latency fluctuations.
|
%% sometimes faster; yielding latency fluctuations.
|
||||||
%%
|
%%
|
||||||
{merge_strategy, fast | predictable}
|
{merge_strategy, fast | predictable},
|
||||||
|
|
||||||
|
%% Each "Level0" file has 2^N KVs in it, defaulting to 1024.
|
||||||
|
%% If the database is to contain very small KVs, this is
|
||||||
|
%% likely too small, and will result in many unnecessary
|
||||||
|
%% file operations. (Subsequent levels double in size).
|
||||||
|
{top_level, 10} % 1024 Key/Values
|
||||||
]},
|
]},
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,7 @@
|
||||||
| {sync_strategy, none | sync | {seconds, pos_integer()}}
|
| {sync_strategy, none | sync | {seconds, pos_integer()}}
|
||||||
| {expiry_secs, non_neg_integer()}
|
| {expiry_secs, non_neg_integer()}
|
||||||
| {spawn_opt, list()}
|
| {spawn_opt, list()}
|
||||||
|
| {top_level, pos_integer()}
|
||||||
.
|
.
|
||||||
|
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -278,24 +279,26 @@ init([Dir, Opts0]) ->
|
||||||
end,
|
end,
|
||||||
hanoidb_util:ensure_expiry(Opts),
|
hanoidb_util:ensure_expiry(Opts),
|
||||||
|
|
||||||
{Nursery, MaxLevel, TopLevel} =
|
{Top, Nur, Max} =
|
||||||
case file:read_file_info(Dir) of
|
case file:read_file_info(Dir) of
|
||||||
{ok, #file_info{ type=directory }} ->
|
{ok, #file_info{ type=directory }} ->
|
||||||
{ok, TL, ML} = open_levels(Dir, Opts),
|
{ok, TopLevel, MinLevel, MaxLevel} = open_levels(Dir, Opts),
|
||||||
{ok, N0} = hanoidb_nursery:recover(Dir, TL, ML, Opts),
|
{ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MinLevel, MaxLevel, Opts),
|
||||||
{N0, ML, TL};
|
{TopLevel, Nursery, MaxLevel};
|
||||||
{error, E} when E =:= enoent ->
|
{error, E} when E =:= enoent ->
|
||||||
ok = file:make_dir(Dir),
|
ok = file:make_dir(Dir),
|
||||||
{ok, TL} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
|
MinLevel = get_opt(top_level, Opts0, ?TOP_LEVEL),
|
||||||
ML = ?TOP_LEVEL,
|
{ok, TopLevel} = hanoidb_level:open(Dir, MinLevel, undefined, Opts, self()),
|
||||||
{ok, N0} = hanoidb_nursery:new(Dir, ML, Opts),
|
MaxLevel = MinLevel,
|
||||||
{N0, ML, TL}
|
{ok, Nursery} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Opts),
|
||||||
end,
|
{TopLevel, Nursery, MaxLevel}
|
||||||
{ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
|
end,
|
||||||
|
{ok, #state{ top=Top, dir=Dir, nursery=Nur, opt=Opts, max_level=Max }}.
|
||||||
|
|
||||||
|
|
||||||
open_levels(Dir, Options) ->
|
open_levels(Dir, Options) ->
|
||||||
{ok, Files} = file:list_dir(Dir),
|
{ok, Files} = file:list_dir(Dir),
|
||||||
|
TopLevel0 = get_opt(top_level, Options, ?TOP_LEVEL),
|
||||||
|
|
||||||
%% parse file names and find max level
|
%% parse file names and find max level
|
||||||
{MinLevel, MaxLevel} =
|
{MinLevel, MaxLevel} =
|
||||||
|
@ -308,7 +311,7 @@ open_levels(Dir, Options) ->
|
||||||
{MinLevel, MaxLevel}
|
{MinLevel, MaxLevel}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
{?TOP_LEVEL, ?TOP_LEVEL},
|
{TopLevel0, TopLevel0},
|
||||||
Files),
|
Files),
|
||||||
|
|
||||||
%% remove old nursery data file
|
%% remove old nursery data file
|
||||||
|
@ -323,17 +326,17 @@ open_levels(Dir, Options) ->
|
||||||
{Level, MergeWork}
|
{Level, MergeWork}
|
||||||
end,
|
end,
|
||||||
{undefined, 0},
|
{undefined, 0},
|
||||||
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
|
lists:seq(MaxLevel, MinLevel, -1)),
|
||||||
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
|
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(MinLevel),
|
||||||
% error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
|
% error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
|
||||||
do_merge(TopLevel, WorkPerIter, MaxMerge),
|
do_merge(TopLevel, WorkPerIter, MaxMerge, MinLevel),
|
||||||
{ok, TopLevel, MaxLevel}.
|
{ok, TopLevel, MinLevel, MaxLevel}.
|
||||||
|
|
||||||
do_merge(TopLevel, _Inc, N) when N =< 0 ->
|
do_merge(TopLevel, _Inc, N, _MinLevel) when N =< 0 ->
|
||||||
ok = hanoidb_level:await_incremental_merge(TopLevel);
|
ok = hanoidb_level:await_incremental_merge(TopLevel);
|
||||||
do_merge(TopLevel, Inc, N) ->
|
do_merge(TopLevel, Inc, N, MinLevel) ->
|
||||||
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
|
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel)),
|
||||||
do_merge(TopLevel, Inc, N-Inc).
|
do_merge(TopLevel, Inc, N-Inc, MinLevel).
|
||||||
|
|
||||||
|
|
||||||
parse_level(FileName) ->
|
parse_level(FileName) ->
|
||||||
|
@ -413,7 +416,8 @@ handle_call(close, _From, State=#state{ nursery=undefined }) ->
|
||||||
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
|
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
|
||||||
try
|
try
|
||||||
ok = hanoidb_nursery:finish(Nursery, Top),
|
ok = hanoidb_nursery:finish(Nursery, Top),
|
||||||
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
|
MinLevel = hanoidb_level:level(Top),
|
||||||
|
{ok, Nursery2} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config),
|
||||||
ok = hanoidb_level:close(Top),
|
ok = hanoidb_level:close(Top),
|
||||||
{stop, normal, ok, State#state{ nursery=Nursery2 }}
|
{stop, normal, ok, State#state{ nursery=Nursery2 }}
|
||||||
catch
|
catch
|
||||||
|
@ -423,9 +427,10 @@ handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_l
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
|
handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
|
||||||
|
TopLevelNumber = hanoidb_level:level(Top),
|
||||||
ok = hanoidb_nursery:destroy(Nursery),
|
ok = hanoidb_nursery:destroy(Nursery),
|
||||||
ok = hanoidb_level:destroy(Top),
|
ok = hanoidb_level:destroy(Top),
|
||||||
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
|
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=TopLevelNumber }}.
|
||||||
|
|
||||||
-spec do_put(key(), value(), expiry(), #state{}) -> {ok, #state{}}.
|
-spec do_put(key(), value(), expiry(), #state{}) -> {ok, #state{}}.
|
||||||
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) when Nursery =/= undefined ->
|
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) when Nursery =/= undefined ->
|
||||||
|
|
|
@ -23,8 +23,8 @@
|
||||||
%% ----------------------------------------------------------------------------
|
%% ----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
%% smallest levels are 256 entries
|
%% smallest levels are 1024 entries
|
||||||
-define(TOP_LEVEL, 8).
|
-define(TOP_LEVEL, 10).
|
||||||
-define(BTREE_SIZE(Level), (1 bsl (Level))).
|
-define(BTREE_SIZE(Level), (1 bsl (Level))).
|
||||||
-define(FILE_FORMAT, <<"HAN2">>).
|
-define(FILE_FORMAT, <<"HAN2">>).
|
||||||
-define(FIRST_BLOCK_POS, byte_size(?FILE_FORMAT)).
|
-define(FIRST_BLOCK_POS, byte_size(?FILE_FORMAT)).
|
||||||
|
@ -54,6 +54,7 @@
|
||||||
total_size=0 :: integer(),
|
total_size=0 :: integer(),
|
||||||
count=0 :: integer(),
|
count=0 :: integer(),
|
||||||
last_sync=now() :: erlang:timestamp(),
|
last_sync=now() :: erlang:timestamp(),
|
||||||
|
min_level :: integer(),
|
||||||
max_level :: integer(),
|
max_level :: integer(),
|
||||||
config=[] :: [{atom(), term()}],
|
config=[] :: [{atom(), term()}],
|
||||||
step=0 :: integer(),
|
step=0 :: integer(),
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
|
|
||||||
-export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3,
|
-export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3,
|
||||||
begin_incremental_merge/2, await_incremental_merge/1, set_max_level/2,
|
begin_incremental_merge/2, await_incremental_merge/1, set_max_level/2,
|
||||||
unmerged_count/1, destroy/1]).
|
unmerged_count/1, destroy/1, level/1]).
|
||||||
|
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
|
@ -90,6 +90,9 @@ open(Dir,Level,Next,Opts,Owner) when Level>0 ->
|
||||||
SpawnOpt),
|
SpawnOpt),
|
||||||
{ok, PID}.
|
{ok, PID}.
|
||||||
|
|
||||||
|
level(Ref) ->
|
||||||
|
plain_rpc:call(Ref, level).
|
||||||
|
|
||||||
lookup(Ref, Key) ->
|
lookup(Ref, Key) ->
|
||||||
plain_rpc:call(Ref, {lookup, Key}).
|
plain_rpc:call(Ref, {lookup, Key}).
|
||||||
|
|
||||||
|
@ -356,6 +359,11 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
->
|
->
|
||||||
do_step(StepFrom, DoneWork, StepSize, State);
|
do_step(StepFrom, DoneWork, StepSize, State);
|
||||||
|
|
||||||
|
%% simply replies with the level number
|
||||||
|
?CALL(From, level) ->
|
||||||
|
plain_rpc:send_reply(From, State#state.level),
|
||||||
|
main_loop(State);
|
||||||
|
|
||||||
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
||||||
demonitor(MRef, [flush]),
|
demonitor(MRef, [flush]),
|
||||||
|
|
||||||
|
|
|
@ -25,40 +25,39 @@
|
||||||
-module(hanoidb_nursery).
|
-module(hanoidb_nursery).
|
||||||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||||
|
|
||||||
-export([new/3, recover/4, finish/2, lookup/2, add/4, add/5]).
|
-export([new/4, recover/5, finish/2, lookup/2, add/4, add/5]).
|
||||||
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
|
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
|
||||||
|
|
||||||
-include("include/hanoidb.hrl").
|
-include("include/hanoidb.hrl").
|
||||||
-include("hanoidb.hrl").
|
-include("hanoidb.hrl").
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
|
-spec new(string(), integer(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
|
||||||
|
|
||||||
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
|
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
|
||||||
|
|
||||||
%% do incremental merge every this many inserts
|
%% do incremental merge every this many inserts
|
||||||
%% this value *must* be less than or equal to
|
%% this value *must* be less than or equal to
|
||||||
%% 2^TOP_LEVEL == ?BTREE_SIZE(?TOP_LEVEL)
|
%% 2^TOP_LEVEL == ?BTREE_SIZE(?TOP_LEVEL)
|
||||||
-define(INC_MERGE_STEP, ?BTREE_SIZE(?TOP_LEVEL)/2).
|
-define(INC_MERGE_STEP, ?BTREE_SIZE(MinLevel) div 2).
|
||||||
|
|
||||||
new(Directory, MaxLevel, Config) ->
|
new(Directory, MinLevel, MaxLevel, Config) ->
|
||||||
hanoidb_util:ensure_expiry(Config),
|
hanoidb_util:ensure_expiry(Config),
|
||||||
|
|
||||||
{ok, File} = file:open(?LOGFILENAME(Directory),
|
{ok, File} = file:open(?LOGFILENAME(Directory),
|
||||||
[raw, exclusive, write, delayed_write, append]),
|
[raw, exclusive, write, delayed_write, append]),
|
||||||
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
|
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
|
||||||
max_level=MaxLevel, config=Config }}.
|
min_level=MinLevel, max_level=MaxLevel, config=Config }}.
|
||||||
|
|
||||||
|
|
||||||
recover(Directory, TopLevel, MaxLevel, Config) ->
|
recover(Directory, TopLevel, MinLevel, MaxLevel, Config) ->
|
||||||
hanoidb_util:ensure_expiry(Config),
|
hanoidb_util:ensure_expiry(Config),
|
||||||
|
|
||||||
case file:read_file_info(?LOGFILENAME(Directory)) of
|
case file:read_file_info(?LOGFILENAME(Directory)) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
ok = do_recover(Directory, TopLevel, MaxLevel, Config),
|
ok = do_recover(Directory, TopLevel, MaxLevel, Config),
|
||||||
new(Directory, MaxLevel, Config);
|
new(Directory, MinLevel, MaxLevel, Config);
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
new(Directory, MaxLevel, Config)
|
new(Directory, MinLevel, MaxLevel, Config)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_recover(Directory, TopLevel, MaxLevel, Config) ->
|
do_recover(Directory, TopLevel, MaxLevel, Config) ->
|
||||||
|
@ -175,7 +174,7 @@ lookup(Key, #nursery{cache=Cache}) ->
|
||||||
%% @end
|
%% @end
|
||||||
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
|
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
|
||||||
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
|
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
|
||||||
count=Count, config=Config }, TopLevel) ->
|
count=Count, config=Config, min_level=MinLevel }, TopLevel) ->
|
||||||
|
|
||||||
hanoidb_util:ensure_expiry(Config),
|
hanoidb_util:ensure_expiry(Config),
|
||||||
|
|
||||||
|
@ -189,7 +188,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
|
||||||
N when N > 0 ->
|
N when N > 0 ->
|
||||||
%% next, flush cache to a new BTree
|
%% next, flush cache to a new BTree
|
||||||
BTreeFileName = filename:join(Dir, "nursery.data"),
|
BTreeFileName = filename:join(Dir, "nursery.data"),
|
||||||
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
|
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(MinLevel)},
|
||||||
{compress, none} | Config]),
|
{compress, none} | Config]),
|
||||||
try
|
try
|
||||||
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
|
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
|
||||||
|
@ -205,10 +204,10 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
|
||||||
|
|
||||||
%% Issue some work if this is a top-level inject (blocks until previous such
|
%% Issue some work if this is a top-level inject (blocks until previous such
|
||||||
%% incremental merge is finished).
|
%% incremental merge is finished).
|
||||||
if DoneMerge >= ?BTREE_SIZE(?TOP_LEVEL) ->
|
if DoneMerge >= ?BTREE_SIZE(MinLevel) ->
|
||||||
ok;
|
ok;
|
||||||
true ->
|
true ->
|
||||||
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL) - DoneMerge)
|
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel) - DoneMerge)
|
||||||
end;
|
end;
|
||||||
% {ok, _Nursery2} = do_inc_merge(Nursery, Count, TopLevel);
|
% {ok, _Nursery2} = do_inc_merge(Nursery, Count, TopLevel);
|
||||||
|
|
||||||
|
@ -247,13 +246,13 @@ add(Key, Value, Expiry, Nursery, Top) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec flush(#nursery{}, pid()) -> {ok, #nursery{}}.
|
-spec flush(#nursery{}, pid()) -> {ok, #nursery{}}.
|
||||||
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel, config=Config }, Top) ->
|
flush(Nursery=#nursery{ dir=Dir, min_level=MinLevel, max_level=MaxLevel, config=Config }, Top) ->
|
||||||
ok = finish(Nursery, Top),
|
ok = finish(Nursery, Top),
|
||||||
{error, enoent} = file:read_file_info(filename:join(Dir, "nursery.log")),
|
{error, enoent} = file:read_file_info(filename:join(Dir, "nursery.log")),
|
||||||
hanoidb_nursery:new(Dir, MaxLevel, Config).
|
hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config).
|
||||||
|
|
||||||
has_room(#nursery{ count=Count }, N) ->
|
has_room(#nursery{ count=Count, min_level=MinLevel }, N) ->
|
||||||
(Count + N + 1) < ?BTREE_SIZE(?TOP_LEVEL).
|
(Count + N + 1) < ?BTREE_SIZE(MinLevel).
|
||||||
|
|
||||||
ensure_space(Nursery, NeededRoom, Top) ->
|
ensure_space(Nursery, NeededRoom, Top) ->
|
||||||
case has_room(Nursery, NeededRoom) of
|
case has_room(Nursery, NeededRoom) of
|
||||||
|
@ -303,7 +302,7 @@ transact1(Spec, Nursery1=#nursery{ log_file=File, cache=Cache0, total_size=Total
|
||||||
|
|
||||||
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count }, length(Spec), Top).
|
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count }, length(Spec), Top).
|
||||||
|
|
||||||
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
|
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done, min_level=MinLevel }, N, TopLevel) ->
|
||||||
if Step+N >= ?INC_MERGE_STEP ->
|
if Step+N >= ?INC_MERGE_STEP ->
|
||||||
hanoidb_level:begin_incremental_merge(TopLevel, Step + N),
|
hanoidb_level:begin_incremental_merge(TopLevel, Step + N),
|
||||||
{ok, Nursery#nursery{ step=0, merge_done=Done + Step + N }};
|
{ok, Nursery#nursery{ step=0, merge_done=Done + Step + N }};
|
||||||
|
|
Loading…
Reference in a new issue