diff --git a/TODO b/TODO index df9b0a3..9dd8112 100644 --- a/TODO +++ b/TODO @@ -1,10 +1,9 @@ -* lsm_btree +* hanoi (in order of priority) * [2i] secondary index support * atomic multi-commit/recovery * add checkpoint/1 and sync/1 - flush pending writes to stable storage (nursery:finish() and finish/flush any merges) * [config] add config parameters on open - * {sync, boolean()} fdsync or not on write * {cache, bytes(), name} share max(bytes) cache named 'name' via etc * [stats] statistics * For each level {#merges, {merge-time-min, max, average}} @@ -28,10 +27,10 @@ PHASE 2: -* lsm_btree +* hanoi * Define a standard struct which is the metadata added at the end of the file, e.g. [btree-nodes] [meta-data] [offset of meta-data]. This is written - in lsm_btree_writer:flush_nodes, and read in lsm_btree_reader:open2. + in hanoi_writer:flush_nodes, and read in hanoi_reader:open2. * [feature] compression, encryption on disk @@ -42,7 +41,7 @@ REVIEW LITERATURE AND OTHER SIMILAR IMPLEMENTATAIONS: * http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.44.2782&rep=rep1&type=pdf -1: make the "first level" have more thatn 2^5 entries (controlled by the constant TOP_LEVEL in lsm_btree.hrl); this means a new set of files is opened/closed/merged for every 32 insert/updates/deletes. Setting this higher will just make the nursery correspondingly larger, which should be absolutely fine. +1: make the "first level" have more thatn 2^5 entries (controlled by the constant TOP_LEVEL in hanoi.hrl); this means a new set of files is opened/closed/merged for every 32 insert/updates/deletes. Setting this higher will just make the nursery correspondingly larger, which should be absolutely fine. 2: Right now, the streaming btree writer emits a btree page based on number of elements. This could be changed to be based on the size of the node (say, some block-size boudary) and then add padding at the end so that each node read becomes a clean block transfer. Right now, we're probably taking way to many reads. diff --git a/rebar.config b/rebar.config index 5b1a0ef..effb652 100644 --- a/rebar.config +++ b/rebar.config @@ -5,6 +5,7 @@ {erl_opts, [debug_info,{d,'TRIQ',true}]}. {deps, [ + {snappy, "1.0.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}, {plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}}, {ebloom, "1.1.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}}, {basho_bench, ".*", {git, "git://github.com/basho/basho_bench.git", {branch, "master"}}}, diff --git a/src/hanoi.erl b/src/hanoi.erl index 72c1e43..8758049 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -31,10 +31,12 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([open/1, close/1, get/2, lookup/2, delete/2, put/3, +-export([open/1, open/2, close/1, get/2, lookup/2, delete/2, put/3, async_fold/3, async_fold_range/4, fold/3, fold_range/4]). +-export([get_opt/2, get_opt/3]). + -include("hanoi.hrl"). -include_lib("kernel/include/file.hrl"). -include_lib("include/hanoi.hrl"). @@ -194,12 +196,12 @@ async_receive_fold_range(PID,Fun,Acc0,Ref,Range) -> init([Dir, Opts]) -> case file:read_file_info(Dir) of {ok, #file_info{ type=directory }} -> - {ok, TopLevel} = open_levels(Dir), + {ok, TopLevel} = open_levels(Dir,Opts), {ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel); {error, E} when E =:= enoent -> ok = file:make_dir(Dir), - {ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined), + {ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined, Opts), {ok, Nursery} = hanoi_nursery:new(Dir) end, @@ -207,7 +209,7 @@ init([Dir, Opts]) -> -open_levels(Dir) -> +open_levels(Dir,Options) -> {ok, Files} = file:list_dir(Dir), %% parse file names and find max level @@ -231,7 +233,7 @@ open_levels(Dir) -> TopLevel = lists:foldl( fun(LevelNo, Prev) -> - {ok, Level} = hanoi_level:open(Dir,LevelNo,Prev), + {ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options), Level end, undefined, @@ -334,11 +336,14 @@ start_app() -> end. get_opt(Key, Opts) -> + get_opt(Key, Opts, undefined). + +get_opt(Key, Opts, Default) -> case proplists:get_value(Key, Opts) of undefined -> case application:get_env(?MODULE, Key) of {ok, Value} -> Value; - undefined -> undefined + undefined -> Default end; Value -> Value diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 448e645..1774b67 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -42,22 +42,23 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]). +-export([open/4, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]). -include_lib("kernel/include/file.hrl"). -record(state, { a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], - step_next_ref, step_caller, step_merge_ref + step_next_ref, step_caller, step_merge_ref, + opts = [] }). %%%%% PUBLIC OPERATIONS -open(Dir,Level,Next) when Level>0 -> +open(Dir,Level,Next,Opts) when Level>0 -> PID = plain_fsm:spawn_link(?MODULE, fun() -> process_flag(trap_exit,true), - initialize(#state{dir=Dir,level=Level,next=Next}) + initialize(#state{dir=Dir,level=Level,next=Next,opts=Opts}) end), {ok, PID}. @@ -465,7 +466,8 @@ main_loop(State = #state{ next=Next }) -> {merge_done, _, OutFileName} -> State1 = if Next =:= undefined -> - {ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined), + {ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined, + State#state.opts ), State#state{ next=PID }; true -> State @@ -567,12 +569,20 @@ begin_merge(State) -> file:delete(XFileName), MergePID = proc_lib:spawn_link(fun() -> + try {ok, OutCount} = hanoi_merger:merge(AFileName, BFileName, XFileName, - ?BTREE_SIZE(State#state.level + 1), - State#state.next =:= undefined), + ?BTREE_SIZE(State#state.level + 1), + State#state.next =:= undefined, + State#state.opts ), % error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), Owner ! {merge_done, OutCount, XFileName} + catch + C:E -> + error_logger:error_msg("merge failed ~p:~p ~p~n", + [C,E,erlang:get_stacktrace()]), + erlang:raise(C,E,erlang:get_stacktrace()) + end end), {ok, MergePID}. diff --git a/src/hanoi_merger.erl b/src/hanoi_merger.erl index 1f9055a..f1cfd02 100644 --- a/src/hanoi_merger.erl +++ b/src/hanoi_merger.erl @@ -29,7 +29,7 @@ %% Merging two BTrees %% --export([merge/5]). +-export([merge/6]). -include("hanoi.hrl"). @@ -40,14 +40,14 @@ %% -define(LOCAL_WRITER, true). -merge(A,B,C, Size, IsLastLevel) -> +merge(A,B,C, Size, IsLastLevel, Options) -> {ok, BT1} = hanoi_reader:open(A, sequential), {ok, BT2} = hanoi_reader:open(B, sequential), case ?LOCAL_WRITER of true -> - {ok, Out} = hanoi_writer:init([C, Size]); + {ok, Out} = hanoi_writer:init([C, [{size,Size} | Options]]); false -> - {ok, Out} = hanoi_writer:open(C, Size) + {ok, Out} = hanoi_writer:open(C, [{size,Size} | Options]) end, {node, AKVs} = hanoi_reader:first_node(BT1), @@ -79,11 +79,8 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> PID ! {Ref, step_done} end, -% error_logger:info_msg("waiting for step in ~p~n", [self()]), - receive {step, From, HowMany} -> -% error_logger:info_msg("got step ~p,~p in ~p~n", [From,HowMany, self()]), scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) end; diff --git a/src/hanoi_nursery.erl b/src/hanoi_nursery.erl index 5a4505f..87479a1 100644 --- a/src/hanoi_nursery.erl +++ b/src/hanoi_nursery.erl @@ -161,7 +161,8 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, N when N>0 -> %% next, flush cache to a new BTree BTreeFileName = filename:join(Dir, "nursery.data"), - {ok, BT} = hanoi_writer:open(BTreeFileName, ?BTREE_SIZE(?TOP_LEVEL)), + {ok, BT} = hanoi_writer:open(BTreeFileName, [{size,?BTREE_SIZE(?TOP_LEVEL)}, + {compress, none}]), try lists:foreach( fun({Key,Value}) -> ok = hanoi_writer:add(BT, Key, Value) diff --git a/src/hanoi_util.erl b/src/hanoi_util.erl index 7ae2bfe..5bcbe95 100644 --- a/src/hanoi_util.erl +++ b/src/hanoi_util.erl @@ -45,16 +45,40 @@ estimate_node_size_increment(_KVList,Key,Value) -> 13 end. -encode_index_node(Level, KVList) -> - Data = %zlib:zip( - erlang:term_to_binary(KVList) - % ) - , - Size = byte_size(Data)+2, - {ok, Size+4, [ <> | Data ] }. +-define(NO_COMPRESSION, 0). +-define(SNAPPY_COMPRESSION, 1). +-define(GZIP_COMPRESSION, 2). -decode_index_node(Level, <>) -> - KVList = erlang:binary_to_term(Data), %zlib:unzip(Data)), +encode_index_node(Level, KVList, Compress) -> + + TermData = erlang:term_to_binary(KVList), + + case Compress of + snappy -> + {ok, Snappied} = snappy:compress(TermData), + CompressedData = [?SNAPPY_COMPRESSION|Snappied]; + gzip -> + CompressedData = [?GZIP_COMPRESSION|zlib:gzip(TermData)]; + _ -> + CompressedData = [?NO_COMPRESSION|TermData] + end, + + Size = erlang:iolist_size(CompressedData), + + {ok, Size+6, [ <<(Size+2):32/unsigned, Level:16/unsigned>> | CompressedData ] }. + +decode_index_node(Level, <>) -> + + case Tag of + ?NO_COMPRESSION -> + TermData = Data; + ?SNAPPY_COMPRESSION -> + {ok, TermData} = snappy:decompress(Data); + ?GZIP_COMPRESSION -> + TermData = zlib:gunzip(Data) + end, + + KVList = erlang:binary_to_term(TermData), {ok, {node, Level, KVList}}. diff --git a/src/hanoi_writer.erl b/src/hanoi_writer.erl index b6592b5..131a146 100644 --- a/src/hanoi_writer.erl +++ b/src/hanoi_writer.erl @@ -53,18 +53,19 @@ name :: string(), - bloom + bloom, + block_size = ?NODE_SIZE, + compress = none :: none | snappy | gzip }). %%% PUBLIC API -open(Name,Size) -> - gen_server:start_link(?MODULE, [Name,Size], []). - +open(Name,Options) -> + gen_server:start_link(?MODULE, [Name,Options], []). open(Name) -> - gen_server:start_link(?MODULE, [Name,2048], []). + gen_server:start_link(?MODULE, [Name,[]], []). add(Ref,Key,Data) -> @@ -76,7 +77,9 @@ close(Ref) -> %%% -init([Name,Size]) -> +init([Name,Options]) -> + + Size = proplists:get_value(size, Options, 2048), % io:format("got name: ~p~n", [Name]), @@ -86,7 +89,9 @@ init([Name,Size]) -> {ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123), {ok, #state{ name=Name, index_file_pos=0, index_file=IdxFile, - bloom = BloomFilter + bloom = BloomFilter, + block_size = hanoi:get_opt(block_size, Options, ?NODE_SIZE), + compress = hanoi:get_opt(compress, Options, none) }}; {error, _}=Error -> error_logger:error_msg("hanoi_writer cannot open ~p: ~p~n", [Name, Error]), @@ -164,7 +169,7 @@ add_record(Level, Key, Value, NewSize = NodeSize + hanoi_util:estimate_node_size_increment(List, Key, Value), - ebloom:insert( State#state.bloom, Key ), + ok = ebloom:insert( State#state.bloom, Key ), NodeMembers = [{Key,Value} | List], if @@ -180,9 +185,9 @@ add_record(Level, Key, Value, #state{ nodes=Nodes }=State) -> -close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes]} = State) -> +close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress} = State) -> OrderedMembers = lists:reverse(NodeMembers), - {ok, DataSize, Data} = hanoi_util:encode_index_node(Level, OrderedMembers), + {ok, DataSize, Data} = hanoi_util:encode_index_node(Level, OrderedMembers, Compress), NodePos = State#state.index_file_pos, ok = file:write(State#state.index_file, Data),