WIP: more bugs fixed, closer to working again...

Gregory Burd 2012-06-23 15:56:53 +01:00
parent 102c518269
commit 4bcd4b5de1
4 changed files with 66 additions and 59 deletions

@@ -41,7 +41,11 @@
 -include_lib("include/hanoidb.hrl").
 -include_lib("include/plain_rpc.hrl").
 
--record(state, { top, nursery, dir, opt, max_level }).
+-record(state, { top :: pos_integer(),
+                 nursery :: term(),
+                 dir :: string(),
+                 opt :: term(),
+                 max_level :: pos_integer()}).
 
 %% 0 means never expire
 -define(DEFAULT_EXPIRY_SECS, 0).
@@ -249,25 +253,28 @@ drain_worker_and_return(MRef, PID, Value) ->
 
 init([Dir, Opts0]) ->
     %% ensure expiry_secs option is set in config
-    case get_opt(expiry_secs, Opts0) of
-        undefined ->
-            Opts = [{expiry_secs, ?DEFAULT_EXPIRY_SECS}|Opts0];
-        N when is_integer(N), N >= 0 ->
-            Opts = [{expiry_secs, N}|Opts0]
-    end,
+    Opts =
+        case get_opt(expiry_secs, Opts0) of
+            undefined ->
+                [{expiry_secs, ?DEFAULT_EXPIRY_SECS}|Opts0];
+            N when is_integer(N), N >= 0 ->
+                [{expiry_secs, N}|Opts0]
+        end,
     hanoidb_util:ensure_expiry(Opts),
 
-    case file:read_file_info(Dir) of
-        {ok, #file_info{ type=directory }} ->
-            {ok, TopLevel, MaxLevel} = open_levels(Dir, Opts),
-            {ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MaxLevel, Opts);
-        {error, E} when E =:= enoent ->
-            ok = file:make_dir(Dir),
-            {ok, TopLevel} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
-            MaxLevel = ?TOP_LEVEL,
-            {ok, Nursery} = hanoidb_nursery:new(Dir, MaxLevel, Opts)
-    end,
+    {Nursery, MaxLevel, TopLevel} =
+        case file:read_file_info(Dir) of
+            {ok, #file_info{ type=directory }} ->
+                {ok, TL, ML} = open_levels(Dir, Opts),
+                {ok, N0} = hanoidb_nursery:recover(Dir, TL, ML, Opts),
+                {N0, ML, TL};
+            {error, E} when E =:= enoent ->
+                ok = file:make_dir(Dir),
+                {ok, TL} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
+                ML = ?TOP_LEVEL,
+                {ok, N0} = hanoidb_nursery:new(Dir, ML, Opts),
+                {N0, ML, TL}
+        end,
     {ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
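
Note on the init/1 change: instead of assigning Opts (and later Nursery, MaxLevel, TopLevel) inside each case branch and relying on the bindings being exported from the case, the result of each case expression is now bound once, so every branch has to return a value of the same shape. A minimal standalone sketch of the same pattern, with hypothetical names:

    %% Hypothetical example, not part of hanoidb.
    -module(case_binding).
    -export([read_buffer/1]).

    %% Bind the case result once; each branch returns the value directly
    %% instead of assigning to a shared variable inside the branch.
    read_buffer(Config) ->
        Size =
            case proplists:get_value(read_buffer_size, Config) of
                undefined -> 512 * 1024;
                N when is_integer(N), N > 0 -> N
            end,
        {ok, Size}.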

@@ -66,9 +66,9 @@ open(Name, Config) ->
         case proplists:get_bool(folding, Config) of
             true ->
                 ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
-                file:open(Name, [read,{read_ahead, ReadBufferSize},binary]);
+                file:open(Name, [read, {read_ahead, ReadBufferSize}, binary]);
             false ->
-                file:open(Name, [read,binary])
+                file:open(Name, [read, binary])
         end,
 
     {ok, FileInfo} = file:read_file_info(Name),
@@ -77,11 +77,11 @@ open(Name, Config) ->
     {ok, ?FILE_FORMAT} = file:pread(File, 0, 4),
 
     %% read root position
-    {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8),
-    {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4),
-    {ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize ,BloomSize),
+    {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8),
+    {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4),
+    {ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize),
 
-    {ok, Bloom} = binary_to_term(BloomData),
+    {ok, Bloom} = hanoidb_util:decode_bloom(BloomData),
 
     %% suck in the root
     {ok, Root} = read_node(File, RootPos),
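
Note on the pread offsets: they follow the on-disk trailer written by flush_nodes later in this commit (<< 0:32, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>): the last 8 bytes of the file hold the root position, the 4 bytes before them the size of the encoded bloom filter, and the bloom data sits immediately before the size field. The same arithmetic over an in-memory binary, as a hedged sketch:

    %% Hypothetical helper; Bin is assumed to end with
    %% << ..., BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>.
    read_trailer(Bin) when is_binary(Bin) ->
        Size = byte_size(Bin),
        <<RootPos:64/unsigned>>   = binary:part(Bin, Size - 8, 8),
        <<BloomSize:32/unsigned>> = binary:part(Bin, Size - 12, 4),
        BloomData = binary:part(Bin, Size - 12 - BloomSize, BloomSize),
        {RootPos, BloomData}.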

@@ -205,6 +205,15 @@ decode_kv_data(<<?TAG_TRANSACT, Rest/binary>>) ->
     {ok, TX} = decode_crc_data(Rest, [], []),
     TX.
 
+encode_bloom(Bloom) ->
+    case bloom:is_bloom(Bloom) of
+        true  -> zlib:gzip(term_to_binary(Bloom));
+        false -> <<>>
+    end.
+
+decode_bloom(Bin) ->
+    binary_to_term(zlib:gunzip(Bin)).
+
 %% @doc Return number of seconds since 1970
 -spec tstamp() -> pos_integer().
 tstamp() ->
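
encode_bloom/1 and decode_bloom/1 are meant to be inverses for an actual bloom filter; for anything that fails bloom:is_bloom/1, encode_bloom/1 returns an empty binary, which decode_bloom/1 does not accept. A round-trip sketch, written as a hypothetical helper (bloom:new/2 and bloom:is_bloom/1 are the calls already used elsewhere in this commit):

    %% Hypothetical usage sketch, not part of the module.
    bloom_round_trip() ->
        Bloom0 = bloom:new(1024, 0.01),
        Bin    = hanoidb_util:encode_bloom(Bloom0),   %% zlib:gzip(term_to_binary(Bloom0))
        Bloom1 = hanoidb_util:decode_bloom(Bin),      %% binary_to_term(zlib:gunzip(Bin))
        true   = bloom:is_bloom(Bloom1),
        ok.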

@@ -94,11 +94,11 @@ init([Name, Options]) ->
     case do_open(Name, Options, [exclusive]) of
         {ok, IdxFile} ->
             file:write(IdxFile, ?FILE_FORMAT),
-            BloomFilter = bloom:new(erlang:min(Size, 16#ffffffff), 0.01),
+            Bloom = bloom:new(erlang:min(Size, 16#ffffffff), 0.01),
             BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
             {ok, #state{ name=Name,
                          index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
-                         bloom = BloomFilter,
+                         bloom = Bloom,
                          block_size = BlockSize,
                          compress = hanoidb:get_opt(compress, Options, none),
                          opts = Options
@@ -152,23 +152,20 @@ code_change(_OldVsn, State, _Extra) ->
 
 %    io:format("serializing ~p @ ~p~n", [State#state.name,
 %               State#state.index_file_pos]),
 serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State) ->
     %% assert that we're on track
     case file:position(File, {eof, 0}) of
         {ok, Position} ->
             ok;
         {ok, WrongPosition} ->
             exit({bad_position, Position, WrongPosition})
     end,
     ok = file:close(File),
-    erlang:term_to_binary( { State#state{ index_file=closed }, term_to_binary(Bloom) } ).
+    erlang:term_to_binary( { State#state{ index_file=closed }, hanoidb_util:encode_bloom(Bloom) } ).
 
 deserialize(Binary) ->
-    {State, BinBloom} = erlang:binary_to_term(Binary),
-    {ok, Bloom} = term_to_binary(BinBloom),
+    {State, Bin} = erlang:binary_to_term(Binary),
+    Bloom = hanoidb_util:decode_bloom(Bin),
     {ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
-    State#state{ bloom = Bloom, index_file=IdxFile }.
+    State#state{ bloom=Bloom, index_file=IdxFile }.
 
 do_open(Name, Options, OpenOpts) ->
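
The removed deserialize line was one of the bugs: term_to_binary/1 returns a plain binary, so {ok, Bloom} = term_to_binary(BinBloom) could never match. Both directions now go through hanoidb_util, keeping serialize/1 and deserialize/1 symmetric. A round-trip sketch, written as if it were a helper inside this module:

    %% Hypothetical in-module test helper (WriterState is a live #state{}).
    writer_round_trip(WriterState) ->
        Bin = serialize(WriterState),     %% closes the index file
        _Reopened = deserialize(Bin),     %% decodes the bloom, reopens the file
        ok.
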
@@ -178,10 +175,10 @@ do_open(Name, Options, OpenOpts) ->
 
 %% @doc flush pending nodes and write trailer
+flush_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos }) when N>0 ->
+    %% stack consists of one node with one {pos,len} member. Just ignore this node.
+    flush_nodes(State#state{ nodes=[] });
+
 flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom }=State) ->
-    BloomBin = term_to_binary(Bloom, [compressed]),
-    BloomSize = byte_size(BloomBin),
     IdxFile = State#state.index_file,
 
     RootPos =
@@ -194,6 +191,8 @@ flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNod
             ?FIRST_BLOCK_POS
         end,
 
+    BloomBin = hanoidb_util:encode_bloom(Bloom),
+    BloomSize = byte_size(BloomBin),
     Trailer = << 0:32, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>,
 
     ok = file:write(IdxFile, Trailer),
@@ -201,18 +200,17 @@ flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNod
     ok = file:close(IdxFile),
     {ok, State#state{ index_file=undefined, index_file_pos=undefined }};
 
-%% stack consists of one node with one {pos,len} member. Just ignore this node.
-flush_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos }) when N>0 ->
-    flush_nodes(State#state{ nodes=[] });
-
 flush_nodes(State) ->
     {ok, State2} = close_node(State),
     flush_nodes(State2).
 
-add_record(Level, Key, Value,
-           #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ],
-                   value_count=VC, tombstone_count=TC }=State) ->
+add_record(Level, Key, Value, State=#state{ nodes=[] }) ->
+    add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=Level } ] });
+
+add_record(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
+  when Level < Level2 ->
+    add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
+
+add_record(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC }=State) ->
 
     %% The top-of-stack node is at the level we wish to insert at.
     %% Assert that keys are increasing:
@@ -236,10 +234,8 @@ add_record(Level, Key, Value,
     end,
 
     {TC1, VC1} =
-        case Level == 0 of
-            true ->
-                {TC, VC};
-            false ->
+        case Level of
+            0 ->
                 case Value of
                     ?TOMBSTONE ->
                         {TC+1, VC};
@@ -247,7 +243,9 @@ add_record(Level, Key, Value,
                         {TC+1, VC};
                     _ ->
                         {TC, VC+1}
-                end
+                end;
+            _ ->
+                {TC, VC}
         end,
 
     NodeMembers = [{Key, Value} | List],
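
With this fix the tombstone and value counters are only updated for level-0 entries; records added at higher levels (the {Key, {Pos, Len}} references to already-closed nodes) leave the counts untouched. The corrected rule as a standalone sketch, with ?TOMBSTONE standing for the module's tombstone marker:

    %% Hypothetical helper expressing the same counting rule.
    count_entry(0, Value, {TC, VC}) when Value =:= ?TOMBSTONE -> {TC + 1, VC};
    count_entry(0, _Value, {TC, VC})                          -> {TC, VC + 1};
    count_entry(_InnerLevel, _Value, Counts)                  -> Counts.
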
@@ -256,17 +254,10 @@ add_record(Level, Key, Value,
     case NewSize >= State#state.block_size of
         true ->
-            {ok, State2};
+            close_node(State2);
         false ->
-            close_node(State2)
-    end;
-
-add_record(Level, Key, Value, State=#state{ nodes=[] }) ->
-    add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=Level } ] });
-
-add_record(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack }) when Level < Level2 ->
-    add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2-1) } | Stack] }).
+            {ok, State2}
+    end.
 
 close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress} = State) ->
     OrderedMembers = lists:reverse(NodeMembers),
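
The final hunk fixes an inverted size check: the current node must be closed (written out) once its accumulated size reaches block_size, and kept open otherwise; the previous code did the opposite. The corrected decision as a tiny guard-based sketch with hypothetical names:

    %% Hypothetical helper mirroring the corrected check.
    maybe_close_node(NewSize, BlockSize, State2) when NewSize >= BlockSize ->
        close_node(State2);
    maybe_close_node(_NewSize, _BlockSize, State2) ->
        {ok, State2}.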