diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl
index d64afaa..35bab1d 100644
--- a/src/hanoi_level.erl
+++ b/src/hanoi_level.erl
@@ -723,7 +723,7 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
     proc_lib:spawn( fun() ->
           ?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
           erlang:link(WorkerPID),
-          {ok, File} = hanoi_reader:open(FileName, [sequential|State#state.opts]),
+          {ok, File} = hanoi_reader:open(FileName, [folding|State#state.opts]),
           do_range_fold(File, WorkerPID, self(), Range),
           erlang:unlink(WorkerPID),
           hanoi_reader:close(File),
diff --git a/src/hanoi_merger.erl b/src/hanoi_merger.erl
index 0a2487d..bd54ba2 100644
--- a/src/hanoi_merger.erl
+++ b/src/hanoi_merger.erl
@@ -33,6 +33,13 @@

 -include("hanoi.hrl").

+%% A merger which is inactive for this long will sleep
+%% which means that it will close open files, and compress
+%% current ebloom.
+%%
+-define(HIBERNATE_TIMEOUT, 5000).
+
+
 %%
 %% Most likely, there will be plenty of I/O being generated by
 %% concurrent merges, so we default to running the entire merge
@@ -77,7 +84,10 @@ hibernate_scan(Keep) ->
     receive
         {step, From, HowMany} ->
             {BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
-            scan(BT1, BT2, hanoi_writer:deserialize(OutBin), IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
+            scan(hanoi_reader:deserialize(BT1),
+                 hanoi_reader:deserialize(BT2),
+                 hanoi_writer:deserialize(OutBin),
+                 IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
     end.

 scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
@@ -91,10 +101,12 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AK
     receive
         {step, From, HowMany} ->
             scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
-    after 5000 ->
+    after ?HIBERNATE_TIMEOUT ->
             case ?LOCAL_WRITER of
                 true ->
-                    Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N},
+                    Args = {hanoi_reader:serialize(BT1),
+                            hanoi_reader:serialize(BT2),
+                            hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N},
                     Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
                     hibernate_scan(Keep);
                 false ->
@@ -161,7 +173,9 @@ hibernate_scan_only(Keep) ->
     receive
         {step, From, HowMany} ->
             {BT, OutBin, IsLastLevel, KVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
-            scan_only(BT, hanoi_writer:deserialize(OutBin), IsLastLevel, KVs, Count, {N+HowMany, From})
+            scan_only(hanoi_reader:deserialize(BT),
+                      hanoi_writer:deserialize(OutBin),
+                      IsLastLevel, KVs, Count, {N+HowMany, From})
     end.


@@ -176,8 +190,9 @@ scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= []
     receive
         {step, From, HowMany} ->
             scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From})
-    after 5000 ->
-            Args = {BT, hanoi_writer:serialize(Out), IsLastLevel, KVs, Count, N},
+    after ?HIBERNATE_TIMEOUT ->
+            Args = {hanoi_reader:serialize(BT),
+                    hanoi_writer:serialize(Out), IsLastLevel, KVs, Count, N},
             Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
             hibernate_scan_only(Keep)
     end;
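The hibernate_scan/hibernate_scan_only paths above follow one pattern: after ?HIBERNATE_TIMEOUT of inactivity the merge loop packs its state into a gzipped external-term binary and blocks in a bare receive; on the next {step, ...} message it unpacks the binary, re-inflates the reader and writer handles via the new deserialize calls, and re-enters scan/scan_only. The snippet below is only a minimal, self-contained illustration of that idle-packing pattern; the module, message and function names are made up for the example and are not part of hanoi, and the real merger additionally closes and reopens its file handles as shown in the diff.

-module(idle_pack_sketch).   %% illustrative name, not part of hanoi
-export([loop/1]).

-define(IDLE_TIMEOUT, 5000).

loop(State) ->
    receive
        {step, From, Item} ->
            From ! {ok, Item},
            loop([Item | State])
    after ?IDLE_TIMEOUT ->
            %% Idle: keep only a compressed copy of the loop state so the
            %% process heap can shrink while nothing is happening.
            idle(zlib:gzip(erlang:term_to_binary(State)))
    end.

idle(Packed) ->
    receive
        {step, From, Item} ->
            %% Wake up: restore the state and handle the request as usual.
            State = erlang:binary_to_term(zlib:gunzip(Packed)),
            From ! {ok, Item},
            loop([Item | State])
    end.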
diff --git a/src/hanoi_reader.erl b/src/hanoi_reader.erl
index 303b295..43b016c 100644
--- a/src/hanoi_reader.erl
+++ b/src/hanoi_reader.erl
@@ -32,9 +32,10 @@

 -export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4]).
 -export([first_node/1,next_node/1]).
+-export([serialize/1, deserialize/1]).

 -record(node, { level, members=[] }).
--record(index, {file, root, bloom}).
+-record(index, {file, root, bloom, name, config=[]}).

 -type read_file() :: #index{}.

@@ -42,7 +43,7 @@
 open(Name) ->
     open(Name, [random]).

--type config() :: [sequential | random | {atom(), term()}].
+-type config() :: [sequential | folding | random | {atom(), term()}].

 -spec open(Name::string(), config()) -> read_file().

@@ -50,24 +51,43 @@
 open(Name, Config) ->
     case proplists:get_bool(sequential, Config) of
         true ->
             ReadBufferSize = hanoi:get_opt(read_buffer_size, Config, 512 * 1024),
-            {ok, File} = file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]);
+            {ok, File} = file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]),
+            {ok, #index{file=File, name=Name, config=Config}};
+
         false ->
-            {ok, File} = file:open(Name, [read,binary])
-    end,
+            case proplists:get_bool(folding, Config) of
+                true ->
+                    ReadBufferSize = hanoi:get_opt(read_buffer_size, Config, 512 * 1024),
+                    {ok, File} = file:open(Name, [read,{read_ahead, ReadBufferSize},binary]);
+                false ->
+                    {ok, File} = file:open(Name, [read,binary])
+            end,

-    {ok, FileInfo} = file:read_file_info(Name),
+            {ok, FileInfo} = file:read_file_info(Name),

-    %% read root position
-    {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8),
-    {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4),
-    {ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize ,BloomSize),
+            %% read root position
+            {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8),
+            {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4),
+            {ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize ,BloomSize),

-    {ok, Bloom} = ebloom:deserialize(zlib:unzip(BloomData)),
+            {ok, Bloom} = ebloom:deserialize(zlib:unzip(BloomData)),

-    %% suck in the root
-    {ok, Root} = read_node(File, RootPos),
+            %% suck in the root
+            {ok, Root} = read_node(File, RootPos),
+
+            {ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
+    end.
+
+serialize(#index{file=File, bloom=undefined }=Index) ->
+    {ok, Position} = file:position(File, cur),
+    ok = file:close(File),
+    {seq_read_file, Index, Position}.
+
+deserialize({seq_read_file, Index, Position}) ->
+    {ok, #index{file=File}=Index2} = open(Index#index.name, Index#index.config),
+    {ok, Position} = file:position(File, {bof, Position}),
+    Index2.

-    {ok, #index{file=File, root=Root, bloom=Bloom}}.

 fold(Fun, Acc0, #index{file=File}) ->
@@ -222,7 +242,13 @@ lookup_in_node(File,#node{members=Members},Key) ->
                                   plain_rpc:send_reply(From, Result)
                           end
                   end),
-            plain_rpc:call(PID, read);
+            try plain_rpc:call(PID, read)
+            catch
+                Class:Ex ->
+                    error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
+                    not_found
+            end;
+
         not_found ->
             not_found
     end.
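The new serialize/1 and deserialize/1 pair is what lets a hibernating merge drop its open descriptors: serialize/1 matches only an #index{} whose bloom is undefined (i.e. one opened with the sequential option), records the current offset and closes the file, and deserialize/1 re-runs open/2 with the remembered name and config and seeks back to that offset. Stripped of the #index{} record, the idea is just the close-and-reopen cycle below; the module and function names are illustrative only, not part of hanoi.

-module(reader_suspend_sketch).   %% illustrative name, not part of hanoi
-export([suspend/2, resume/1]).

%% Remember the current offset, then give the descriptor back to the OS.
suspend(Fd, Path) ->
    {ok, Pos} = file:position(Fd, cur),
    ok = file:close(Fd),
    {suspended, Path, Pos}.

%% Reopen the same file and seek back to the remembered offset.
resume({suspended, Path, Pos}) ->
    {ok, Fd} = file:open(Path, [read, binary]),
    {ok, Pos} = file:position(Fd, {bof, Pos}),
    Fd.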
diff --git a/src/hanoi_writer.erl b/src/hanoi_writer.erl
index a7cd92e..7f0228a 100644
--- a/src/hanoi_writer.erl
+++ b/src/hanoi_writer.erl
@@ -55,7 +55,9 @@

           bloom,
           block_size = ?NODE_SIZE,

-          compress = none :: none | snappy | gzip
+          compress = none :: none | snappy | gzip,
+
+          opts = []
          }).

@@ -80,19 +82,18 @@ close(Ref) ->

 init([Name,Options]) ->
     Size = proplists:get_value(size, Options, 2048),

-    WriteBufferSize = hanoi:get_opt(write_buffer_size, Options, 512 * 1024),
 %    io:format("got name: ~p~n", [Name]),
-    BlockSize = hanoi:get_opt(block_size, Options, ?NODE_SIZE),
-    case file:open( hanoi_util:index_file_name(Name),
-                    [raw, exclusive, write, {delayed_write, WriteBufferSize, 2000}]) of
+    case do_open(Name, Options, [exclusive]) of
         {ok, IdxFile} ->
             {ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123),
+            BlockSize = hanoi:get_opt(block_size, Options, ?NODE_SIZE),
             {ok, #state{ name=Name,
                          index_file_pos=0, index_file=IdxFile,
                          bloom = BloomFilter,
                          block_size = BlockSize,
-                         compress = hanoi:get_opt(compress, Options, none)
+                         compress = hanoi:get_opt(compress, Options, none),
+                         opts = Options
                        }};
         {error, _}=Error ->
             error_logger:error_msg("hanoi_writer cannot open ~p: ~p~n", [Name, Error]),
@@ -127,13 +128,30 @@ code_change(_OldVsn, State, _Extra) ->

 %%%%% INTERNAL FUNCTIONS

-serialize(#state{ bloom=Bloom }=State) ->
-    erlang:term_to_binary( { State, ebloom:serialize(Bloom) } ).
+serialize(#state{ bloom=Bloom, index_file=File }=State) ->
+%    io:format("serializing ~p @ ~p~n", [State#state.name,
+%                                        State#state.index_file_pos]),
+
+    %% assert that we're on track
+    Position = State#state.index_file_pos,
+    {ok, Position} = file:position(File, cur),
+
+    ok = file:close(File),
+    erlang:term_to_binary( { State#state{ index_file=closed }, ebloom:serialize(Bloom) } ).

 deserialize(Binary) ->
     { State, BinBloom } = erlang:binary_to_term( Binary ),
+%    io:format("deserializing ~p @ ~p~n", [State#state.name,
+%                                          State#state.index_file_pos]),
     {ok, Bloom } = ebloom:deserialize(BinBloom),
-    State#state{ bloom = Bloom }.
+    {ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
+    State#state{ bloom = Bloom, index_file=IdxFile }.
+
+
+do_open(Name, Options, OpenOpts) ->
+    WriteBufferSize = hanoi:get_opt(write_buffer_size, Options, 512 * 1024),
+    file:open( hanoi_util:index_file_name(Name),
+               [raw, append, {delayed_write, WriteBufferSize, 2000} | OpenOpts]).

 % @doc flush pending nodes and write trailer
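On the writer side the same trick needs one extra precaution: the #state{} record tracks index_file_pos itself, so serialize/1 asserts that the tracked offset matches the real file position before closing, and deserialize/1 reopens via do_open/3 in append mode so the next node lands where the bookkeeping says it should. Below is a minimal sketch of that assert-close-reopen cycle under the same assumptions; the module name, the path argument and the hard-coded buffer size are illustrative, not hanoi's real options handling.

-module(writer_suspend_sketch).   %% illustrative name, not part of hanoi
-export([suspend/2, resume/2]).

%% Close the file, but only after checking that our own position
%% bookkeeping agrees with the descriptor; a mismatch fails the match
%% here rather than silently writing the next node at the wrong offset.
suspend(Fd, TrackedPos) ->
    {ok, TrackedPos} = file:position(Fd, cur),
    ok = file:close(Fd),
    {suspended_writer, TrackedPos}.

%% Reopen for appending: subsequent writes go to the end of the file,
%% so the tracked position stays valid across the round trip.
resume(Path, {suspended_writer, TrackedPos}) ->
    {ok, Fd} = file:open(Path, [raw, append, {delayed_write, 512 * 1024, 2000}]),
    {Fd, TrackedPos}.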