Some naming changes and more specific types in read/write path.

Gregory Burd 2012-06-25 14:27:42 +05:30
parent 3140f9f281
commit 0f3c649f80
2 changed files with 32 additions and 31 deletions
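A note on the type changes below: in Erlang bit syntax an integer segment is unsigned and big-endian unless qualified otherwise, so a pattern such as <<Len:32>> already matches exactly the same bytes as <<Len:32/unsigned>>; the commit only spells the signedness out. A quick shell check (illustrative, not part of the commit):

1> Header = <<16:32, 1:16>>.
<<0,0,0,16,0,1>>
2> <<LenA:32, LevelA:16>> = Header, {LenA, LevelA}.
{16,1}
3> <<LenB:32/unsigned, LevelB:16/unsigned>> = Header, {LenB, LevelB}.
{16,1}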

View file

@@ -38,6 +38,7 @@
 -record(node, {level :: non_neg_integer(),
                members=[] :: list(any()) }).

 -record(index, {file :: port(),
                 root :: #node{},
                 bloom :: term(),
@@ -122,7 +123,7 @@ fold(Fun, Acc0, #index{file=File}) ->
     {ok, Node} = read_node(File,?FIRST_BLOCK_POS),
     fold0(File,fun({K,V},Acc) -> Fun(K,V,Acc) end,Node,Acc0).

-fold0(File,Fun,#node{level=0,members=List},Acc0) ->
+fold0(File,Fun,#node{level=0, members=List},Acc0) ->
     Acc1 = lists:foldl(Fun,Acc0,List),
     fold1(File,Fun,Acc1);
 fold0(File,Fun,_InnerNode,Acc0) ->
@@ -371,7 +372,7 @@ find_start(K, KVs) ->
 read_node(File, {Pos, Size}) ->
 %    error_logger:info_msg("read_node ~p ~p ~p~n", [File, Pos, Size]),
-    {ok, <<_:32, Level:16/unsigned, Data/binary>>} = file:pread(File, Pos, Size),
+    {ok, <<_:32/unsigned, Level:16/unsigned, Data/binary>>} = file:pread(File, Pos, Size),
     hanoidb_util:decode_index_node(Level, Data);

 read_node(File, Pos) ->
@@ -383,7 +384,7 @@ read_node(File, Pos) ->
 read_node(File) ->
 %    error_logger:info_msg("read_node ~p~n", [File]),
-    {ok, <<Len:32, Level:16/unsigned>>} = file:read(File, 6),
+    {ok, <<Len:32/unsigned, Level:16/unsigned>>} = file:read(File, 6),
 %    error_logger:info_msg("decoded ~p ~p~n", [Len, Level]),
     case Len of
         0 ->
@@ -399,12 +400,12 @@ next_leaf_node(File) ->
         eof ->
             %% premature end-of-file
             eof;
-        {ok, <<0:32, _:16>>} ->
+        {ok, <<0:32/unsigned, _:16/unsigned>>} ->
             eof;
-        {ok, <<Len:32, 0:16>>} ->
+        {ok, <<Len:32/unsigned, 0:16/unsigned>>} ->
             {ok, Data} = file:read(File, Len-2),
             hanoidb_util:decode_index_node(0, Data);
-        {ok, <<Len:32, _:16>>} ->
+        {ok, <<Len:32/unsigned, _:16/unsigned>>} ->
             {ok, _} = file:position(File, {cur,Len-2}),
             next_leaf_node(File)
     end.
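The reads above pin down the block framing this file relies on: a 4-byte unsigned byte count followed by a 2-byte unsigned level, where the count covers the level word plus the encoded node (hence the Len-2 reads and skips), and a zero count marks the end of the data blocks. A minimal sketch of that rule over a plain binary rather than a file handle (the module and function names here are hypothetical, not from the repository):

-module(node_header_sketch).
-export([parse/1]).

%% Split one block off the front of a binary, following the 6-byte header
%% convention used by read_node/1 and next_leaf_node/1 above.
parse(<<0:32/unsigned, _:16/unsigned, _/binary>>) ->
    eof;                                    % zero count: no more blocks
parse(<<Len:32/unsigned, Level:16/unsigned, Rest/binary>>) when Len >= 2 ->
    DataLen = Len - 2,                      % the count includes the level word
    <<NodeData:DataLen/binary, Tail/binary>> = Rest,
    {Level, NodeData, Tail}.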

View file

@@ -116,13 +116,13 @@ handle_cast({add, Key, {?TOMBSTONE, TStamp}}, State)
             true ->
                 State;
             false ->
-                {ok, State2} = add_record(0, Key, {?TOMBSTONE, TStamp}, State),
+                {ok, State2} = append_node(0, Key, {?TOMBSTONE, TStamp}, State),
                 State2
         end,
     {noreply, NewState};
 handle_cast({add, Key, ?TOMBSTONE}, State)
   when is_binary(Key) ->
-    {ok, NewState} = add_record(0, Key, ?TOMBSTONE, State),
+    {ok, NewState} = append_node(0, Key, ?TOMBSTONE, State),
     {noreply, NewState};
 handle_cast({add, Key, {Value, TStamp}}, State)
   when is_binary(Key), is_binary(Value) ->
@@ -131,19 +131,19 @@ handle_cast({add, Key, {Value, TStamp}}, State)
             true ->
                 State;
             false ->
-                {ok, State2} = add_record(0, Key, {Value, TStamp}, State),
+                {ok, State2} = append_node(0, Key, {Value, TStamp}, State),
                 State2
         end,
     {noreply, NewState};
 handle_cast({add, Key, Value}, State)
   when is_binary(Key), is_binary(Value) ->
-    {ok, State2} = add_record(0, Key, Value, State),
+    {ok, State2} = append_node(0, Key, Value, State),
     {noreply, State2}.

 handle_call(count, _From, State = #state{ value_count=VC, tombstone_count=TC }) ->
     {ok, VC+TC, State};
 handle_call(close, _From, State) ->
-    {ok, State2} = flush_nodes(State),
+    {ok, State2} = archive_nodes(State),
     {stop, normal, ok, State2}.

 handle_info(Info, State) ->
@@ -186,13 +186,13 @@ do_open(Name, Options, OpenOpts) ->

 %% @doc flush pending nodes and write trailer
-flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) ->
+archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) ->
     {BloomBin, BloomSize, RootPos} =
         case LastNodePos of
             undefined ->
                 %% store contains no entries
-                ok = file:write(IdxFile, <<0:32,0:16>>),
+                ok = file:write(IdxFile, <<0:32/unsigned, 0:16/unsigned>>),
                 FilterSize = bloom:filter_size(Bloom),
                 {<<FilterSize:32/unsigned>>, 0, ?FIRST_BLOCK_POS};
             _ ->
@@ -200,29 +200,29 @@ flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNod
                 {EncodedBloom, byte_size(EncodedBloom), LastNodePos}
         end,
-    Trailer = << 0:32, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>,
+    Trailer = << 0:32/unsigned, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>,
     ok = file:write(IdxFile, Trailer),
     ok = file:datasync(IdxFile),
     ok = file:close(IdxFile),
     {ok, State#state{ index_file=undefined, index_file_pos=undefined }};

-flush_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos })
+archive_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos })
   when N > 0 ->
     %% Ignore this node, its stack consists of one node with one {pos,len} member
-    flush_nodes(State#state{ nodes=[] });
+    archive_nodes(State#state{ nodes=[] });

-flush_nodes(State) ->
-    {ok, State2} = close_node(State),
-    flush_nodes(State2).
+archive_nodes(State) ->
+    {ok, State2} = flush_node_buffer(State),
+    archive_nodes(State2).

-add_record(Level, Key, Value, State=#state{ nodes=[] }) ->
-    add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=Level } ] });
-add_record(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
+append_node(Level, Key, Value, State=#state{ nodes=[] }) ->
+    append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=Level } ] });
+append_node(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
   when Level < Level2 ->
-    add_record(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
-add_record(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) ->
+append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
+append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) ->
     %% The top-of-stack node is at the level we wish to insert at.
     %% Assert that keys are increasing:
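As the clauses above show, append_node/4 keeps a stack of in-progress nodes ordered by level: an empty stack gets a buffer at the requested level, and while the requested level lies below the level at the top of the stack, empty buffers for the intermediate levels are pushed until the top matches. A toy version of that push-down rule on plain {Level, Members} tuples (illustrative only, not repository code):

-module(level_stack_sketch).
-export([push_to_level/2]).

%% Push empty buffers until the head of the stack is at WantedLevel,
%% mirroring the `when Level < Level2` clause above.
push_to_level(WantedLevel, []) ->
    [{WantedLevel, []}];
push_to_level(WantedLevel, [{TopLevel, _} | _] = Stack) when WantedLevel < TopLevel ->
    push_to_level(WantedLevel, [{TopLevel - 1, []} | Stack]);
push_to_level(_WantedLevel, Stack) ->
    Stack.

For example, push_to_level(0, [{2, Members}]) returns [{0, []}, {1, []}, {2, Members}].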
@@ -263,12 +263,12 @@ add_record(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, s
     case NewSize >= State#state.block_size of
         true ->
-            close_node(State2);
+            flush_node_buffer(State2);
         false ->
             {ok, State2}
     end.

-close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) ->
+flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) ->
     OrderedMembers = lists:reverse(NodeMembers),
     {ok, BlockData} = hanoidb_util:encode_index_node(OrderedMembers, Compress),
@@ -279,8 +279,8 @@ close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], c
     ok = file:write(State#state.index_file, Data),
     {FirstKey, _} = hd(OrderedMembers),
-    add_record(Level + 1, FirstKey, {NodePos, DataSize},
-               State#state{ nodes = RestNodes,
-                            index_file_pos = NodePos + DataSize,
-                            last_node_pos = NodePos,
-                            last_node_size = DataSize }).
+    append_node(Level + 1, FirstKey, {NodePos, DataSize},
+                State#state{ nodes = RestNodes,
+                             index_file_pos = NodePos + DataSize,
+                             last_node_pos = NodePos,
+                             last_node_size = DataSize }).
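The Trailer expression in archive_nodes/1 fixes the end-of-file layout: a 4-byte zero marker, the serialized bloom filter, its 4-byte size, and the 8-byte position of the root node, so the last twelve bytes of the file are enough to recover the root position and the size and location of the filter. A hypothetical decoder over the whole index file held as a binary (a sketch, not code from the repository):

-module(trailer_sketch).
-export([decode_trailer/1]).

%% Recover the bloom filter bytes and the root node position from the
%% trailer layout written by archive_nodes/1 above.
decode_trailer(FileBin) when is_binary(FileBin), byte_size(FileBin) >= 16 ->
    FileSize = byte_size(FileBin),
    <<BloomSize:32/unsigned, RootPos:64/unsigned>> =
        binary:part(FileBin, FileSize - 12, 12),
    BloomBin = binary:part(FileBin, FileSize - 12 - BloomSize, BloomSize),
    {ok, BloomBin, RootPos}.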