New config: {read|write}_buffer_size
These two parameters (each defaulting to 512kB) control the amount of Erlang file buffer space to allocate for delayed_write and read_ahead when merging. This config parameter applies *per merge task*, of which there can be many for each open HanoiDB, and that is again multiplied by the number of active vnodes in Riak. As such, this config parameter is significant for the memory usage of a Riak node running Hanoi, but setting it too low will kill performance.
This commit is contained in:
parent
a6952cdb77
commit
18c197d959
6 changed files with 27 additions and 24 deletions
|
@ -39,6 +39,8 @@ Put these values in your `app.config` in the `hanoi` section
|
||||||
{compress, none | snappy | gzip},
|
{compress, none | snappy | gzip},
|
||||||
{sync_strategy, none | sync | {seconds, N}},
|
{sync_strategy, none | sync | {seconds, N}},
|
||||||
{page_size, 8192}
|
{page_size, 8192}
|
||||||
|
{write_buffer_size, 524288} % 512kB
|
||||||
|
{read_buffer_size, 524288} % 512kB
|
||||||
]},
|
]},
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -167,12 +167,12 @@ initialize2(State) ->
|
||||||
file:delete(BFileName),
|
file:delete(BFileName),
|
||||||
ok = file:rename(MFileName, AFileName),
|
ok = file:rename(MFileName, AFileName),
|
||||||
|
|
||||||
{ok, BTA} = hanoi_reader:open(AFileName, random),
|
{ok, BTA} = hanoi_reader:open(AFileName, [random|State#state.opts]),
|
||||||
|
|
||||||
case file:read_file_info(CFileName) of
|
case file:read_file_info(CFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
file:rename(CFileName, BFileName),
|
file:rename(CFileName, BFileName),
|
||||||
{ok, BTB} = hanoi_reader:open(BFileName, random),
|
{ok, BTB} = hanoi_reader:open(BFileName, [random|State#state.opts]),
|
||||||
check_begin_merge_then_loop(init_state(State#state{ a= BTA, b=BTB }));
|
check_begin_merge_then_loop(init_state(State#state{ a= BTA, b=BTB }));
|
||||||
|
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
|
@ -182,12 +182,12 @@ initialize2(State) ->
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
case file:read_file_info(BFileName) of
|
case file:read_file_info(BFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, BTA} = hanoi_reader:open(AFileName, random),
|
{ok, BTA} = hanoi_reader:open(AFileName, [random|State#state.opts]),
|
||||||
{ok, BTB} = hanoi_reader:open(BFileName, random),
|
{ok, BTB} = hanoi_reader:open(BFileName, [random|State#state.opts]),
|
||||||
|
|
||||||
case file:read_file_info(CFileName) of
|
case file:read_file_info(CFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, BTC} = hanoi_reader:open(CFileName, random);
|
{ok, BTC} = hanoi_reader:open(CFileName, [random|State#state.opts]);
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
BTC = undefined
|
BTC = undefined
|
||||||
end,
|
end,
|
||||||
|
@ -201,7 +201,7 @@ initialize2(State) ->
|
||||||
|
|
||||||
case file:read_file_info(AFileName) of
|
case file:read_file_info(AFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, BTA} = hanoi_reader:open(AFileName, random),
|
{ok, BTA} = hanoi_reader:open(AFileName, [random|State#state.opts]),
|
||||||
main_loop(init_state(State#state{ a=BTA }));
|
main_loop(init_state(State#state{ a=BTA }));
|
||||||
|
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
|
@ -259,7 +259,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
plain_rpc:send_reply(From, ok),
|
plain_rpc:send_reply(From, ok),
|
||||||
|
|
||||||
case hanoi_reader:open(ToFileName, random) of
|
case hanoi_reader:open(ToFileName, [random|State#state.opts]) of
|
||||||
{ok, BT} ->
|
{ok, BT} ->
|
||||||
if SetPos == #state.b ->
|
if SetPos == #state.b ->
|
||||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||||
|
@ -471,7 +471,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
% then, rename M to A, and open it
|
% then, rename M to A, and open it
|
||||||
AFileName = filename("A",State2),
|
AFileName = filename("A",State2),
|
||||||
ok = file:rename(MFileName, AFileName),
|
ok = file:rename(MFileName, AFileName),
|
||||||
{ok, BT} = hanoi_reader:open(AFileName, random),
|
{ok, BT} = hanoi_reader:open(AFileName, [random|State#state.opts]),
|
||||||
|
|
||||||
% iff there is a C file, then move it to B position
|
% iff there is a C file, then move it to B position
|
||||||
% TODO: consider recovery for this
|
% TODO: consider recovery for this
|
||||||
|
@ -703,7 +703,7 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
|
||||||
proc_lib:spawn( fun() ->
|
proc_lib:spawn( fun() ->
|
||||||
?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
|
?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
|
||||||
erlang:link(WorkerPID),
|
erlang:link(WorkerPID),
|
||||||
{ok, File} = hanoi_reader:open(FileName, sequential),
|
{ok, File} = hanoi_reader:open(FileName, [sequential|State#state.opts]),
|
||||||
do_range_fold(File, WorkerPID, self(), Range),
|
do_range_fold(File, WorkerPID, self(), Range),
|
||||||
erlang:unlink(WorkerPID),
|
erlang:unlink(WorkerPID),
|
||||||
hanoi_reader:close(File),
|
hanoi_reader:close(File),
|
||||||
|
|
|
@ -41,8 +41,8 @@
|
||||||
-define(LOCAL_WRITER, true).
|
-define(LOCAL_WRITER, true).
|
||||||
|
|
||||||
merge(A,B,C, Size, IsLastLevel, Options) ->
|
merge(A,B,C, Size, IsLastLevel, Options) ->
|
||||||
{ok, BT1} = hanoi_reader:open(A, sequential),
|
{ok, BT1} = hanoi_reader:open(A, [sequential|Options]),
|
||||||
{ok, BT2} = hanoi_reader:open(B, sequential),
|
{ok, BT2} = hanoi_reader:open(B, [sequential|Options]),
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
true ->
|
true ->
|
||||||
{ok, Out} = hanoi_writer:init([C, [{size,Size} | Options]]);
|
{ok, Out} = hanoi_writer:init([C, [{size,Size} | Options]]);
|
||||||
|
|
|
@ -40,21 +40,21 @@
|
||||||
|
|
||||||
-spec open(Name::string()) -> read_file().
|
-spec open(Name::string()) -> read_file().
|
||||||
open(Name) ->
|
open(Name) ->
|
||||||
open(Name, random).
|
open(Name, [random]).
|
||||||
|
|
||||||
-spec open(Name::string(), sequential|random) -> read_file().
|
-type config() :: [sequential | random | {atom(), term()}].
|
||||||
|
|
||||||
%% this is how to open a btree for sequential scanning (merge, fold)
|
-spec open(Name::string(), config()) -> read_file().
|
||||||
open(Name, sequential) ->
|
|
||||||
{ok, File} = file:open(Name, [raw,read,{read_ahead, 1024 * 32},binary]),
|
|
||||||
open2(Name, File);
|
|
||||||
|
|
||||||
%% this is how to open a btree for random access
|
open(Name, Config) ->
|
||||||
open(Name, random) ->
|
case proplists:get_bool(sequential, Config) of
|
||||||
{ok, File} = file:open(Name, [read,binary]),
|
true ->
|
||||||
open2(Name, File).
|
ReadBufferSize = hanoi:get_opt(read_buffer_size, Config, 512 * 1024),
|
||||||
|
{ok, File} = file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]);
|
||||||
|
false ->
|
||||||
|
{ok, File} = file:open(Name, [read,binary])
|
||||||
|
end,
|
||||||
|
|
||||||
open2(Name, File) ->
|
|
||||||
{ok, FileInfo} = file:read_file_info(Name),
|
{ok, FileInfo} = file:read_file_info(Name),
|
||||||
|
|
||||||
%% read root position
|
%% read root position
|
||||||
|
|
|
@ -80,11 +80,12 @@ close(Ref) ->
|
||||||
init([Name,Options]) ->
|
init([Name,Options]) ->
|
||||||
|
|
||||||
Size = proplists:get_value(size, Options, 2048),
|
Size = proplists:get_value(size, Options, 2048),
|
||||||
|
WriteBufferSize = hanoi:get_opt(write_buffer_size, Options, 512 * 1024),
|
||||||
|
|
||||||
% io:format("got name: ~p~n", [Name]),
|
% io:format("got name: ~p~n", [Name]),
|
||||||
BlockSize = hanoi:get_opt(block_size, Options, ?NODE_SIZE),
|
BlockSize = hanoi:get_opt(block_size, Options, ?NODE_SIZE),
|
||||||
case file:open( hanoi_util:index_file_name(Name),
|
case file:open( hanoi_util:index_file_name(Name),
|
||||||
[raw, exclusive, write, {delayed_write, BlockSize * 4, 2000}]) of
|
[raw, exclusive, write, {delayed_write, WriteBufferSize, 2000}]) of
|
||||||
{ok, IdxFile} ->
|
{ok, IdxFile} ->
|
||||||
{ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123),
|
{ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123),
|
||||||
{ok, #state{ name=Name,
|
{ok, #state{ name=Name,
|
||||||
|
|
|
@ -103,7 +103,7 @@ start(Partition, Config) ->
|
||||||
ok ->
|
ok ->
|
||||||
case get_data_dir(DataRoot, integer_to_list(Partition)) of
|
case get_data_dir(DataRoot, integer_to_list(Partition)) of
|
||||||
{ok, DataDir} ->
|
{ok, DataDir} ->
|
||||||
case hanoi:open(DataDir) of
|
case hanoi:open(DataDir, Config) of
|
||||||
{ok, Tree} ->
|
{ok, Tree} ->
|
||||||
{ok, #state{tree=Tree, partition=Partition}};
|
{ok, #state{tree=Tree, partition=Partition}};
|
||||||
{error, OpenReason}=OpenError ->
|
{error, OpenReason}=OpenError ->
|
||||||
|
|
Loading…
Reference in a new issue