From 4bcd4b5de19404de27afcf3aae343f8aec2770c9 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Sat, 23 Jun 2012 15:56:53 +0100 Subject: [PATCH] WIP: more bugs fixed, closer to working again... --- src/hanoidb.erl | 43 ++++++++++++++++------------- src/hanoidb_reader.erl | 12 ++++----- src/hanoidb_util.erl | 9 +++++++ src/hanoidb_writer.erl | 61 ++++++++++++++++++------------------------ 4 files changed, 66 insertions(+), 59 deletions(-) diff --git a/src/hanoidb.erl b/src/hanoidb.erl index e13a4e7..1c7fee1 100644 --- a/src/hanoidb.erl +++ b/src/hanoidb.erl @@ -41,7 +41,11 @@ -include_lib("include/hanoidb.hrl"). -include_lib("include/plain_rpc.hrl"). --record(state, { top, nursery, dir, opt, max_level }). +-record(state, { top :: pos_integer(), + nursery :: term(), + dir :: string(), + opt :: term(), + max_level :: pos_integer()}). %% 0 means never expire -define(DEFAULT_EXPIRY_SECS, 0). @@ -249,25 +253,28 @@ drain_worker_and_return(MRef, PID, Value) -> init([Dir, Opts0]) -> %% ensure expory_secs option is set in config - case get_opt(expiry_secs, Opts0) of - undefined -> - Opts = [{expiry_secs, ?DEFAULT_EXPIRY_SECS}|Opts0]; - N when is_integer(N), N >= 0 -> - Opts = [{expiry_secs, N}|Opts0] - end, - + Opts = + case get_opt(expiry_secs, Opts0) of + undefined -> + [{expiry_secs, ?DEFAULT_EXPIRY_SECS}|Opts0]; + N when is_integer(N), N >= 0 -> + [{expiry_secs, N}|Opts0] + end, hanoidb_util:ensure_expiry(Opts), - case file:read_file_info(Dir) of - {ok, #file_info{ type=directory }} -> - {ok, TopLevel, MaxLevel} = open_levels(Dir, Opts), - {ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MaxLevel, Opts); - {error, E} when E =:= enoent -> - ok = file:make_dir(Dir), - {ok, TopLevel} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()), - MaxLevel = ?TOP_LEVEL, - {ok, Nursery} = hanoidb_nursery:new(Dir, MaxLevel, Opts) - end, + {Nursery, MaxLevel, TopLevel} = + case file:read_file_info(Dir) of + {ok, #file_info{ type=directory }} -> + {ok, TL, ML} = 
open_levels(Dir, Opts), + {ok, N0} = hanoidb_nursery:recover(Dir, TL, ML, Opts), + {N0, ML, TL}; + {error, E} when E =:= enoent -> + ok = file:make_dir(Dir), + {ok, TL} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()), + ML = ?TOP_LEVEL, + {ok, N0} = hanoidb_nursery:new(Dir, ML, Opts), + {N0, ML, TL} + end, {ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}. diff --git a/src/hanoidb_reader.erl b/src/hanoidb_reader.erl index 9aec261..d8600a0 100644 --- a/src/hanoidb_reader.erl +++ b/src/hanoidb_reader.erl @@ -66,9 +66,9 @@ open(Name, Config) -> case proplists:get_bool(folding, Config) of true -> ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024), - file:open(Name, [read,{read_ahead, ReadBufferSize},binary]); + file:open(Name, [read, {read_ahead, ReadBufferSize}, binary]); false -> - file:open(Name, [read,binary]) + file:open(Name, [read, binary]) end, {ok, FileInfo} = file:read_file_info(Name), @@ -77,11 +77,11 @@ open(Name, Config) -> {ok, ?FILE_FORMAT} = file:pread(File, 0, 4), %% read root position - {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8), - {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4), - {ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize ,BloomSize), + {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8), + {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4), + {ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize), - {ok, Bloom} = binary_to_term(BloomData), + {ok, Bloom} = hanoidb_util:decode_bloom(BloomData), %% suck in the root {ok, Root} = read_node(File, RootPos), diff --git a/src/hanoidb_util.erl b/src/hanoidb_util.erl index 3a53ff3..63051f4 100644 --- a/src/hanoidb_util.erl +++ b/src/hanoidb_util.erl @@ -205,6 +205,15 @@ decode_kv_data(<>) -> {ok, TX} = decode_crc_data(Rest, [], []), TX. 
+encode_bloom(Bloom) -> + case bloom:is_bloom(Bloom) of + true -> zlib:gzip(term_to_binary(Bloom)); + false -> <<>> + end. + +decode_bloom(Bin) -> + binary_to_term(zlib:gunzip(Bin)). + %% @doc Return number of seconds since 1970 -spec tstamp() -> pos_integer(). tstamp() -> diff --git a/src/hanoidb_writer.erl b/src/hanoidb_writer.erl index a6d60f7..f1d0433 100644 --- a/src/hanoidb_writer.erl +++ b/src/hanoidb_writer.erl @@ -94,11 +94,11 @@ init([Name, Options]) -> case do_open(Name, Options, [exclusive]) of {ok, IdxFile} -> file:write(IdxFile, ?FILE_FORMAT), - BloomFilter = bloom:new(erlang:min(Size, 16#ffffffff), 0.01), + Bloom = bloom:new(erlang:min(Size, 16#ffffffff), 0.01), BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE), {ok, #state{ name=Name, index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile, - bloom = BloomFilter, + bloom = Bloom, block_size = BlockSize, compress = hanoidb:get_opt(compress, Options, none), opts = Options @@ -152,23 +152,20 @@ code_change(_OldVsn, State, _Extra) -> % io:format("serializing ~p @ ~p~n", [State#state.name, % State#state.index_file_pos]), serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State) -> - - %% assert that we're on track case file:position(File, {eof, 0}) of {ok, Position} -> ok; {ok, WrongPosition} -> exit({bad_position, Position, WrongPosition}) end, - ok = file:close(File), - erlang:term_to_binary( { State#state{ index_file=closed }, term_to_binary(Bloom) } ). + erlang:term_to_binary( { State#state{ index_file=closed }, hanoidb_util:encode_bloom(Bloom) } ). deserialize(Binary) -> - {State, BinBloom} = erlang:binary_to_term(Binary), - {ok, Bloom} = term_to_binary(BinBloom), + {State, Bin} = erlang:binary_to_term(Binary), + Bloom = hanoidb_util:decode_bloom(Bin), {ok, IdxFile} = do_open(State#state.name, State#state.opts, []), - State#state{ bloom = Bloom, index_file=IdxFile }. + State#state{ bloom=Bloom, index_file=IdxFile }. 
do_open(Name, Options, OpenOpts) -> @@ -178,10 +175,10 @@ do_open(Name, Options, OpenOpts) -> %% @doc flush pending nodes and write trailer +flush_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos }) when N>0 -> + %% stack consists of one node with one {pos,len} member. Just ignore this node. + flush_nodes(State#state{ nodes=[] }); flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom }=State) -> - BloomBin = term_to_binary(Bloom, [compressed]), - BloomSize = byte_size(BloomBin), - IdxFile = State#state.index_file, RootPos = @@ -194,6 +191,8 @@ flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNod ?FIRST_BLOCK_POS end, + BloomBin = hanoidb_util:encode_bloom(Bloom), + BloomSize = byte_size(BloomBin), Trailer = << 0:32, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>, ok = file:write(IdxFile, Trailer), @@ -201,18 +200,17 @@ flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNod ok = file:close(IdxFile), {ok, State#state{ index_file=undefined, index_file_pos=undefined }}; - -%% stack consists of one node with one {pos,len} member. Just ignore this node. -flush_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos }) when N>0 -> - flush_nodes(State#state{ nodes=[] }); - flush_nodes(State) -> {ok, State2} = close_node(State), flush_nodes(State2). 
-add_record(Level, Key, Value, - #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], - value_count=VC, tombstone_count=TC }=State) -> + +add_record(Level, Key, Value, State=#state{ nodes=[] }) -> + add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=Level } ] }); +add_record(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack }) + when Level < Level2 -> + add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] }); +add_record(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC }=State) -> %% The top-of-stack node is at the level we wish to insert at. %% Assert that keys are increasing: @@ -236,10 +234,8 @@ add_record(Level, Key, Value, end, {TC1, VC1} = - case Level == 0 of - true -> - {TC, VC}; - false -> + case Level of + 0 -> case Value of ?TOMBSTONE -> {TC+1, VC}; @@ -247,7 +243,9 @@ add_record(Level, Key, Value, {TC+1, VC}; _ -> {TC, VC+1} - end + end; + _ -> + {TC, VC} end, NodeMembers = [{Key, Value} | List], @@ -256,17 +254,10 @@ add_record(Level, Key, Value, case NewSize >= State#state.block_size of true -> - {ok, State2}; + close_node(State2); false -> - close_node(State2) - end; - -add_record(Level, Key, Value, State=#state{ nodes=[] }) -> - add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=Level } ] }); - -add_record(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack }) when Level < Level2 -> - add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2-1) } | Stack] }). - + {ok, State2} + end. close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress} = State) -> OrderedMembers = lists:reverse(NodeMembers),