WIP: still chasing the bug...
This commit is contained in:
parent
a79dde264f
commit
102c518269
4 changed files with 51 additions and 57 deletions
|
@ -346,7 +346,6 @@ terminate(normal, _State) ->
|
||||||
ok;
|
ok;
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
error_logger:info_msg("got terminate(~p, ~p)~n", [_Reason, _State]),
|
error_logger:info_msg("got terminate(~p, ~p)~n", [_Reason, _State]),
|
||||||
% flush_nursery(State),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -386,11 +385,11 @@ handle_call({get, Key}, From, State=#state{ top=Top, nursery=Nursery } ) when is
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(close, _From, State=#state{top=Top}) ->
|
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
|
||||||
try
|
try
|
||||||
{ok, State2} = flush_nursery(State),
|
ok = hanoidb_nursery:finish(Nursery, Top),
|
||||||
ok = hanoidb_level:close(Top),
|
ok = hanoidb_level:close(Top),
|
||||||
{stop, normal, ok, State2}
|
{stop, normal, ok, State#state{ nursery=hanoidb_nursery:new(Dir, MaxLevel, Config)}}
|
||||||
catch
|
catch
|
||||||
E:R ->
|
E:R ->
|
||||||
error_logger:info_msg("exception from close ~p:~p~n", [E,R]),
|
error_logger:info_msg("exception from close ~p:~p~n", [E,R]),
|
||||||
|
@ -417,11 +416,6 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
|
||||||
{ok, Nursery2} = hanoidb_nursery:transact(TransactionSpec, Nursery, Top),
|
{ok, Nursery2} = hanoidb_nursery:transact(TransactionSpec, Nursery, Top),
|
||||||
{ok, State#state{ nursery=Nursery2 }}.
|
{ok, State#state{ nursery=Nursery2 }}.
|
||||||
|
|
||||||
flush_nursery(State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
|
|
||||||
ok = hanoidb_nursery:finish(Nursery, Top),
|
|
||||||
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
|
|
||||||
{ok, State#state{ nursery=Nursery2 }}.
|
|
||||||
|
|
||||||
start_app() ->
|
start_app() ->
|
||||||
case application:start(?MODULE) of
|
case application:start(?MODULE) of
|
||||||
ok ->
|
ok ->
|
||||||
|
|
|
@ -131,9 +131,9 @@ do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=
|
||||||
ok = file:write(File, Data),
|
ok = file:write(File, Data),
|
||||||
Nursery1 = do_sync(File, Nursery),
|
Nursery1 = do_sync(File, Nursery),
|
||||||
|
|
||||||
{ok, Nursery2} =
|
{ok, Nursery2} = do_inc_merge(Nursery1#nursery{ cache=Cache2,
|
||||||
do_inc_merge(Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data),
|
total_size=TotalSize + erlang:iolist_size(Data),
|
||||||
count=Count+1 }, 1, Top),
|
count=Count + 1 }, 1, Top),
|
||||||
|
|
||||||
if Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
|
if Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
|
||||||
{full, Nursery2};
|
{full, Nursery2};
|
||||||
|
@ -199,10 +199,10 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
|
||||||
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
|
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
|
||||||
{compress, none} | Config]),
|
{compress, none} | Config]),
|
||||||
try
|
try
|
||||||
[] = gb_trees_ext:fold(fun(Key, Value, Acc) ->
|
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
|
||||||
ok = hanoidb_writer:add(BT, Key, Value),
|
ok = hanoidb_writer:add(BT, Key, Value),
|
||||||
Acc
|
Acc
|
||||||
end, [], Cache)
|
end, ok, Cache)
|
||||||
after
|
after
|
||||||
ok = hanoidb_writer:close(BT)
|
ok = hanoidb_writer:close(BT)
|
||||||
end,
|
end,
|
||||||
|
@ -286,18 +286,15 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi
|
||||||
|
|
||||||
Count = gb_trees:size(Cache2),
|
Count = gb_trees:size(Cache2),
|
||||||
|
|
||||||
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count },
|
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count }, length(Spec), Top).
|
||||||
length(Spec), Top).
|
|
||||||
|
|
||||||
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
|
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
|
||||||
case Step+N >= ?INC_MERGE_STEP of
|
case Step+N >= ?INC_MERGE_STEP of
|
||||||
true ->
|
true ->
|
||||||
io:format("do_inc_merge: true ~p ~p ~p~n", [Step, N, ?INC_MERGE_STEP]),
|
hanoidb_level:begin_incremental_merge(TopLevel, Step + N),
|
||||||
hanoidb_level:begin_incremental_merge(TopLevel, Step+N),
|
{ok, Nursery#nursery{ step=0, merge_done=Done + Step + N }};
|
||||||
{ok, Nursery#nursery{ step=0, merge_done=Done+Step+N }};
|
|
||||||
false ->
|
false ->
|
||||||
io:format("do_inc_merge: false ~p ~p ~p~n", [Step, N, ?INC_MERGE_STEP]),
|
{ok, Nursery#nursery{ step=Step + N }}
|
||||||
{ok, Nursery#nursery{ step=Step+N }}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_level_fold(#nursery{cache=Cache}, FoldWorkerPID, KeyRange) ->
|
do_level_fold(#nursery{cache=Cache}, FoldWorkerPID, KeyRange) ->
|
||||||
|
|
|
@ -40,9 +40,7 @@
|
||||||
-define(TAG_DELETED2, 16#85).
|
-define(TAG_DELETED2, 16#85).
|
||||||
-define(TAG_END, 16#FF).
|
-define(TAG_END, 16#FF).
|
||||||
|
|
||||||
-compile({inline, [
|
-compile({inline, [crc_encapsulate/1, crc_encapsulate_kv_entry/2 ]}).
|
||||||
crc_encapsulate/1, crc_encapsulate_kv_entry/2
|
|
||||||
]}).
|
|
||||||
|
|
||||||
|
|
||||||
index_file_name(Name) ->
|
index_file_name(Name) ->
|
||||||
|
@ -56,22 +54,22 @@ file_exists(FileName) ->
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
estimate_node_size_increment(_KVList, Key, {Value, _TStamp})
|
||||||
|
when is_integer(Value) -> byte_size(Key) + 5 + 4;
|
||||||
|
estimate_node_size_increment(_KVList, Key, {Value, _TStamp})
|
||||||
estimate_node_size_increment(_KVList,Key,Value) ->
|
when is_binary(Value) -> byte_size(Key) + 5 + 4 + byte_size(Value);
|
||||||
byte_size(Key)
|
estimate_node_size_increment(_KVList, Key, {Value, _TStamp})
|
||||||
+ 10
|
when is_atom(Value) -> byte_size(Key) + 8 + 4;
|
||||||
+ if
|
estimate_node_size_increment(_KVList, Key, {Value, _TStamp})
|
||||||
is_integer(Value) ->
|
when is_tuple(Value) -> byte_size(Key) + 13 + 4;
|
||||||
5;
|
estimate_node_size_increment(_KVList, Key, Value)
|
||||||
is_binary(Value) ->
|
when is_integer(Value) -> byte_size(Key) + 5 + 4;
|
||||||
5 + byte_size(Value);
|
estimate_node_size_increment(_KVList, Key, Value)
|
||||||
is_atom(Value) ->
|
when is_binary(Value) -> byte_size(Key) + 5 + 4 + byte_size(Value);
|
||||||
8;
|
estimate_node_size_increment(_KVList, Key, Value)
|
||||||
is_tuple(Value) ->
|
when is_atom(Value) -> byte_size(Key) + 8 + 4;
|
||||||
13
|
estimate_node_size_increment(_KVList, Key, Value)
|
||||||
end.
|
when is_tuple(Value) -> byte_size(Key) + 13 + 4.
|
||||||
|
|
||||||
-define(NO_COMPRESSION, 0).
|
-define(NO_COMPRESSION, 0).
|
||||||
-define(SNAPPY_COMPRESSION, 1).
|
-define(SNAPPY_COMPRESSION, 1).
|
||||||
|
|
|
@ -81,12 +81,12 @@ add(Ref, Key, Value) ->
|
||||||
count(Ref) ->
|
count(Ref) ->
|
||||||
gen_server:call(Ref, count, infinity).
|
gen_server:call(Ref, count, infinity).
|
||||||
|
|
||||||
|
%% @doc Close the btree index file
|
||||||
close(Ref) ->
|
close(Ref) ->
|
||||||
gen_server:call(Ref, close, infinity).
|
gen_server:call(Ref, close, infinity).
|
||||||
|
|
||||||
%%%
|
%%%
|
||||||
|
|
||||||
|
|
||||||
init([Name, Options]) ->
|
init([Name, Options]) ->
|
||||||
hanoidb_util:ensure_expiry(Options),
|
hanoidb_util:ensure_expiry(Options),
|
||||||
Size = proplists:get_value(size, Options, 2048),
|
Size = proplists:get_value(size, Options, 2048),
|
||||||
|
@ -94,7 +94,7 @@ init([Name, Options]) ->
|
||||||
case do_open(Name, Options, [exclusive]) of
|
case do_open(Name, Options, [exclusive]) of
|
||||||
{ok, IdxFile} ->
|
{ok, IdxFile} ->
|
||||||
file:write(IdxFile, ?FILE_FORMAT),
|
file:write(IdxFile, ?FILE_FORMAT),
|
||||||
BloomFilter = bloom:new(erlang:min(Size, 16#ffffffff), 0.001),
|
BloomFilter = bloom:new(erlang:min(Size, 16#ffffffff), 0.01),
|
||||||
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
|
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
|
||||||
{ok, #state{ name=Name,
|
{ok, #state{ name=Name,
|
||||||
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
|
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
|
||||||
|
@ -109,34 +109,39 @@ init([Name, Options]) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
handle_cast({add, Key, Value}, State)
|
handle_cast({add, Key, {?TOMBSTONE, TStamp}}, State)
|
||||||
when is_binary(Key), (is_binary(Value) orelse Value == ?TOMBSTONE)->
|
when is_binary(Key) ->
|
||||||
{ok, State2} = add_record(0, Key, Value, State),
|
{ok, State2} = add_record(0, Key, {?TOMBSTONE, TStamp}, State),
|
||||||
|
{noreply, State2};
|
||||||
|
handle_cast({add, Key, ?TOMBSTONE}, State)
|
||||||
|
when is_binary(Key) ->
|
||||||
|
{ok, State2} = add_record(0, Key, ?TOMBSTONE, State),
|
||||||
{noreply, State2};
|
{noreply, State2};
|
||||||
handle_cast({add, Key, {Value, TStamp}}, State)
|
handle_cast({add, Key, {Value, TStamp}}, State)
|
||||||
when is_binary(Key), (is_binary(Value) orelse Value == ?TOMBSTONE)->
|
when is_binary(Key), is_binary(Value) ->
|
||||||
{ok, State2} = add_record(0, Key, {Value, TStamp}, State),
|
{ok, State2} = add_record(0, Key, {Value, TStamp}, State),
|
||||||
|
{noreply, State2};
|
||||||
|
handle_cast({add, Key, Value}, State)
|
||||||
|
when is_binary(Key), is_binary(Value) ->
|
||||||
|
{ok, State2} = add_record(0, Key, Value, State),
|
||||||
{noreply, State2}.
|
{noreply, State2}.
|
||||||
|
|
||||||
handle_call(count, _From, State = #state{ value_count=VC, tombstone_count=TC }) ->
|
handle_call(count, _From, State = #state{ value_count=VC, tombstone_count=TC }) ->
|
||||||
{ok, VC+TC, State};
|
{ok, VC+TC, State};
|
||||||
|
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
{ok, State2} = flush_nodes(State),
|
{ok, State2} = flush_nodes(State),
|
||||||
{stop,normal,ok,State2}.
|
{stop, normal, ok, State2}.
|
||||||
|
|
||||||
handle_info(Info,State) ->
|
handle_info(Info, State) ->
|
||||||
error_logger:error_msg("Unknown info ~p~n", [Info]),
|
error_logger:error_msg("Unknown info ~p~n", [Info]),
|
||||||
{stop,bad_msg,State}.
|
{stop, bad_msg, State}.
|
||||||
|
|
||||||
|
|
||||||
terminate(normal,_State) ->
|
terminate(normal,_State) ->
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
%% premature delete -> cleanup
|
|
||||||
terminate(_Reason, State) ->
|
terminate(_Reason, State) ->
|
||||||
file:close( State#state.index_file ),
|
%% premature delete -> cleanup
|
||||||
file:delete( hanoidb_util:index_file_name(State#state.name) ).
|
file:close(State#state.index_file),
|
||||||
|
file:delete(hanoidb_util:index_file_name(State#state.name)).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -174,7 +179,6 @@ do_open(Name, Options, OpenOpts) ->
|
||||||
|
|
||||||
%% @doc flush pending nodes and write trailer
|
%% @doc flush pending nodes and write trailer
|
||||||
flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom }=State) ->
|
flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom }=State) ->
|
||||||
|
|
||||||
BloomBin = term_to_binary(Bloom, [compressed]),
|
BloomBin = term_to_binary(Bloom, [compressed]),
|
||||||
BloomSize = byte_size(BloomBin),
|
BloomSize = byte_size(BloomBin),
|
||||||
|
|
||||||
|
@ -213,7 +217,8 @@ add_record(Level, Key, Value,
|
||||||
|
|
||||||
%% Assert that keys are increasing:
|
%% Assert that keys are increasing:
|
||||||
case List of
|
case List of
|
||||||
[] -> ok;
|
[] ->
|
||||||
|
ok;
|
||||||
[{PrevKey,_}|_] ->
|
[{PrevKey,_}|_] ->
|
||||||
if
|
if
|
||||||
(Key >= PrevKey) -> ok;
|
(Key >= PrevKey) -> ok;
|
||||||
|
|
Loading…
Reference in a new issue