Implement hibernation for readers too

This enables all open files in a merge worker
to be closed while it is waiting for work to do.
This commit is contained in:
Kresten Krab Thorup 2012-05-01 02:12:02 +02:00
parent c998e8ca31
commit 01ea88b67c
4 changed files with 90 additions and 31 deletions

View file

@ -723,7 +723,7 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
proc_lib:spawn( fun() -> proc_lib:spawn( fun() ->
?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]), ?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
erlang:link(WorkerPID), erlang:link(WorkerPID),
{ok, File} = hanoi_reader:open(FileName, [sequential|State#state.opts]), {ok, File} = hanoi_reader:open(FileName, [folding|State#state.opts]),
do_range_fold(File, WorkerPID, self(), Range), do_range_fold(File, WorkerPID, self(), Range),
erlang:unlink(WorkerPID), erlang:unlink(WorkerPID),
hanoi_reader:close(File), hanoi_reader:close(File),

View file

@ -33,6 +33,13 @@
-include("hanoi.hrl"). -include("hanoi.hrl").
%% A merger which is inactive for this long will sleep
%% which means that it will close open files, and compress
%% current ebloom.
%%
-define(HIBERNATE_TIMEOUT, 5000).
%% %%
%% Most likely, there will be plenty of I/O being generated by %% Most likely, there will be plenty of I/O being generated by
%% concurrent merges, so we default to running the entire merge %% concurrent merges, so we default to running the entire merge
@ -77,7 +84,10 @@ hibernate_scan(Keep) ->
receive receive
{step, From, HowMany} -> {step, From, HowMany} ->
{BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ), {BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
scan(BT1, BT2, hanoi_writer:deserialize(OutBin), IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From}) scan(hanoi_reader:deserialize(BT1),
hanoi_reader:deserialize(BT2),
hanoi_writer:deserialize(OutBin),
IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
end. end.
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] -> scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
@ -91,10 +101,12 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AK
receive receive
{step, From, HowMany} -> {step, From, HowMany} ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From}) scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
after 5000 -> after ?HIBERNATE_TIMEOUT ->
case ?LOCAL_WRITER of case ?LOCAL_WRITER of
true -> true ->
Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N}, Args = {hanoi_reader:serialize(BT1),
hanoi_reader:serialize(BT2),
hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N},
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ), Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
hibernate_scan(Keep); hibernate_scan(Keep);
false -> false ->
@ -161,7 +173,9 @@ hibernate_scan_only(Keep) ->
receive receive
{step, From, HowMany} -> {step, From, HowMany} ->
{BT, OutBin, IsLastLevel, KVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ), {BT, OutBin, IsLastLevel, KVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
scan_only(BT, hanoi_writer:deserialize(OutBin), IsLastLevel, KVs, Count, {N+HowMany, From}) scan_only(hanoi_reader:deserialize(BT),
hanoi_writer:deserialize(OutBin),
IsLastLevel, KVs, Count, {N+HowMany, From})
end. end.
@ -176,8 +190,9 @@ scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= []
receive receive
{step, From, HowMany} -> {step, From, HowMany} ->
scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From}) scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From})
after 5000 -> after ?HIBERNATE_TIMEOUT ->
Args = {BT, hanoi_writer:serialize(Out), IsLastLevel, KVs, Count, N}, Args = {hanoi_reader:serialize(BT),
hanoi_writer:serialize(Out), IsLastLevel, KVs, Count, N},
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ), Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
hibernate_scan_only(Keep) hibernate_scan_only(Keep)
end; end;

View file

@ -32,9 +32,10 @@
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4]). -export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4]).
-export([first_node/1,next_node/1]). -export([first_node/1,next_node/1]).
-export([serialize/1, deserialize/1]).
-record(node, { level, members=[] }). -record(node, { level, members=[] }).
-record(index, {file, root, bloom}). -record(index, {file, root, bloom, name, config=[]}).
-type read_file() :: #index{}. -type read_file() :: #index{}.
@ -42,7 +43,7 @@
open(Name) -> open(Name) ->
open(Name, [random]). open(Name, [random]).
-type config() :: [sequential | random | {atom(), term()}]. -type config() :: [sequential | folding | random | {atom(), term()}].
-spec open(Name::string(), config()) -> read_file(). -spec open(Name::string(), config()) -> read_file().
@ -50,24 +51,43 @@ open(Name, Config) ->
case proplists:get_bool(sequential, Config) of case proplists:get_bool(sequential, Config) of
true -> true ->
ReadBufferSize = hanoi:get_opt(read_buffer_size, Config, 512 * 1024), ReadBufferSize = hanoi:get_opt(read_buffer_size, Config, 512 * 1024),
{ok, File} = file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]); {ok, File} = file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]),
{ok, #index{file=File, name=Name, config=Config}};
false -> false ->
{ok, File} = file:open(Name, [read,binary]) case proplists:get_bool(folding, Config) of
end, true ->
ReadBufferSize = hanoi:get_opt(read_buffer_size, Config, 512 * 1024),
{ok, File} = file:open(Name, [read,{read_ahead, ReadBufferSize},binary]);
false ->
{ok, File} = file:open(Name, [read,binary])
end,
{ok, FileInfo} = file:read_file_info(Name), {ok, FileInfo} = file:read_file_info(Name),
%% read root position %% read root position
{ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8), {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8),
{ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4), {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4),
{ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize ,BloomSize), {ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize ,BloomSize),
{ok, Bloom} = ebloom:deserialize(zlib:unzip(BloomData)), {ok, Bloom} = ebloom:deserialize(zlib:unzip(BloomData)),
%% suck in the root %% suck in the root
{ok, Root} = read_node(File, RootPos), {ok, Root} = read_node(File, RootPos),
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
end.
serialize(#index{file=File, bloom=undefined }=Index) ->
{ok, Position} = file:position(File, cur),
ok = file:close(File),
{seq_read_file, Index, Position}.
deserialize({seq_read_file, Index, Position}) ->
{ok, #index{file=File}=Index2} = open(Index#index.name, Index#index.config),
{ok, Position} = file:position(File, {bof, Position}),
Index2.
{ok, #index{file=File, root=Root, bloom=Bloom}}.
fold(Fun, Acc0, #index{file=File}) -> fold(Fun, Acc0, #index{file=File}) ->
@ -222,7 +242,13 @@ lookup_in_node(File,#node{members=Members},Key) ->
plain_rpc:send_reply(From, Result) plain_rpc:send_reply(From, Result)
end end
end), end),
plain_rpc:call(PID, read); try plain_rpc:call(PID, read)
catch
Class:Ex ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
not_found
end;
not_found -> not_found ->
not_found not_found
end. end.

View file

@ -55,7 +55,9 @@
bloom, bloom,
block_size = ?NODE_SIZE, block_size = ?NODE_SIZE,
compress = none :: none | snappy | gzip compress = none :: none | snappy | gzip,
opts = []
}). }).
@ -80,19 +82,18 @@ close(Ref) ->
init([Name,Options]) -> init([Name,Options]) ->
Size = proplists:get_value(size, Options, 2048), Size = proplists:get_value(size, Options, 2048),
WriteBufferSize = hanoi:get_opt(write_buffer_size, Options, 512 * 1024),
% io:format("got name: ~p~n", [Name]), % io:format("got name: ~p~n", [Name]),
BlockSize = hanoi:get_opt(block_size, Options, ?NODE_SIZE), case do_open(Name, Options, [exclusive]) of
case file:open( hanoi_util:index_file_name(Name),
[raw, exclusive, write, {delayed_write, WriteBufferSize, 2000}]) of
{ok, IdxFile} -> {ok, IdxFile} ->
{ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123), {ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123),
BlockSize = hanoi:get_opt(block_size, Options, ?NODE_SIZE),
{ok, #state{ name=Name, {ok, #state{ name=Name,
index_file_pos=0, index_file=IdxFile, index_file_pos=0, index_file=IdxFile,
bloom = BloomFilter, bloom = BloomFilter,
block_size = BlockSize, block_size = BlockSize,
compress = hanoi:get_opt(compress, Options, none) compress = hanoi:get_opt(compress, Options, none),
opts = Options
}}; }};
{error, _}=Error -> {error, _}=Error ->
error_logger:error_msg("hanoi_writer cannot open ~p: ~p~n", [Name, Error]), error_logger:error_msg("hanoi_writer cannot open ~p: ~p~n", [Name, Error]),
@ -127,13 +128,30 @@ code_change(_OldVsn, State, _Extra) ->
%%%%% INTERNAL FUNCTIONS %%%%% INTERNAL FUNCTIONS
serialize(#state{ bloom=Bloom }=State) -> serialize(#state{ bloom=Bloom, index_file=File }=State) ->
erlang:term_to_binary( { State, ebloom:serialize(Bloom) } ). % io:format("serializing ~p @ ~p~n", [State#state.name,
% State#state.index_file_pos]),
%% assert that we're on track
Position = State#state.index_file_pos,
{ok, Position} = file:position(File, cur),
ok = file:close(File),
erlang:term_to_binary( { State#state{ index_file=closed }, ebloom:serialize(Bloom) } ).
deserialize(Binary) -> deserialize(Binary) ->
{ State, BinBloom } = erlang:binary_to_term( Binary ), { State, BinBloom } = erlang:binary_to_term( Binary ),
% io:format("deserializing ~p @ ~p~n", [State#state.name,
% State#state.index_file_pos]),
{ok, Bloom } = ebloom:deserialize(BinBloom), {ok, Bloom } = ebloom:deserialize(BinBloom),
State#state{ bloom = Bloom }. {ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
State#state{ bloom = Bloom, index_file=IdxFile }.
do_open(Name, Options, OpenOpts) ->
WriteBufferSize = hanoi:get_opt(write_buffer_size, Options, 512 * 1024),
file:open( hanoi_util:index_file_name(Name),
[raw, append, {delayed_write, WriteBufferSize, 2000} | OpenOpts]).
% @doc flush pending nodes and write trailer % @doc flush pending nodes and write trailer