Implement compression + block size

option {compression, none|gzip|snappy}

... except right now using snappy is broken,
it seems that it causes bloom filters to
crash. Needs investigation.

option {block_size, 32768} 

... writes data to disk in chunks of ~32k.
This commit is contained in:
Kresten Krab Thorup 2012-04-23 03:49:08 +02:00
parent 14ef03e06a
commit d37b227936
8 changed files with 87 additions and 45 deletions

9
TODO
View file

@ -1,10 +1,9 @@
* lsm_btree
* hanoi (in order of priority)
* [2i] secondary index support
* atomic multi-commit/recovery
* add checkpoint/1 and sync/1 - flush pending writes to stable storage
(nursery:finish() and finish/flush any merges)
* [config] add config parameters on open
* {sync, boolean()} fdsync or not on write
* {cache, bytes(), name} share max(bytes) cache named 'name' via etc
* [stats] statistics
* For each level {#merges, {merge-time-min, max, average}}
@ -28,10 +27,10 @@
PHASE 2:
* lsm_btree
* hanoi
* Define a standard struct which is the metadata added at the end of the
file, e.g. [btree-nodes] [meta-data] [offset of meta-data]. This is written
in lsm_btree_writer:flush_nodes, and read in lsm_btree_reader:open2.
in hanoi_writer:flush_nodes, and read in hanoi_reader:open2.
* [feature] compression, encryption on disk
@ -42,7 +41,7 @@ REVIEW LITERATURE AND OTHER SIMILAR IMPLEMENTATAIONS:
* http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.44.2782&rep=rep1&type=pdf
1: make the "first level" have more thatn 2^5 entries (controlled by the constant TOP_LEVEL in lsm_btree.hrl); this means a new set of files is opened/closed/merged for every 32 insert/updates/deletes. Setting this higher will just make the nursery correspondingly larger, which should be absolutely fine.
1: make the "first level" have more thatn 2^5 entries (controlled by the constant TOP_LEVEL in hanoi.hrl); this means a new set of files is opened/closed/merged for every 32 insert/updates/deletes. Setting this higher will just make the nursery correspondingly larger, which should be absolutely fine.
2: Right now, the streaming btree writer emits a btree page based on number of elements. This could be changed to be based on the size of the node (say, some block-size boudary) and then add padding at the end so that each node read becomes a clean block transfer. Right now, we're probably taking way to many reads.

View file

@ -5,6 +5,7 @@
{erl_opts, [debug_info,{d,'TRIQ',true}]}.
{deps, [
{snappy, "1.0.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}},
{plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}},
{ebloom, "1.1.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}},
{basho_bench, ".*", {git, "git://github.com/basho/basho_bench.git", {branch, "master"}}},

View file

@ -31,10 +31,12 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([open/1, close/1, get/2, lookup/2, delete/2, put/3,
-export([open/1, open/2, close/1, get/2, lookup/2, delete/2, put/3,
async_fold/3, async_fold_range/4,
fold/3, fold_range/4]).
-export([get_opt/2, get_opt/3]).
-include("hanoi.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("include/hanoi.hrl").
@ -194,12 +196,12 @@ async_receive_fold_range(PID,Fun,Acc0,Ref,Range) ->
init([Dir, Opts]) ->
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel} = open_levels(Dir),
{ok, TopLevel} = open_levels(Dir,Opts),
{ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel);
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
{ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined),
{ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined, Opts),
{ok, Nursery} = hanoi_nursery:new(Dir)
end,
@ -207,7 +209,7 @@ init([Dir, Opts]) ->
open_levels(Dir) ->
open_levels(Dir,Options) ->
{ok, Files} = file:list_dir(Dir),
%% parse file names and find max level
@ -231,7 +233,7 @@ open_levels(Dir) ->
TopLevel =
lists:foldl( fun(LevelNo, Prev) ->
{ok, Level} = hanoi_level:open(Dir,LevelNo,Prev),
{ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options),
Level
end,
undefined,
@ -334,11 +336,14 @@ start_app() ->
end.
get_opt(Key, Opts) ->
get_opt(Key, Opts, undefined).
get_opt(Key, Opts, Default) ->
case proplists:get_value(Key, Opts) of
undefined ->
case application:get_env(?MODULE, Key) of
{ok, Value} -> Value;
undefined -> undefined
undefined -> Default
end;
Value ->
Value

View file

@ -42,22 +42,23 @@
-behavior(plain_fsm).
-export([data_vsn/0, code_change/3]).
-export([open/3, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]).
-export([open/4, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]).
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
step_next_ref, step_caller, step_merge_ref
step_next_ref, step_caller, step_merge_ref,
opts = []
}).
%%%%% PUBLIC OPERATIONS
open(Dir,Level,Next) when Level>0 ->
open(Dir,Level,Next,Opts) when Level>0 ->
PID = plain_fsm:spawn_link(?MODULE,
fun() ->
process_flag(trap_exit,true),
initialize(#state{dir=Dir,level=Level,next=Next})
initialize(#state{dir=Dir,level=Level,next=Next,opts=Opts})
end),
{ok, PID}.
@ -465,7 +466,8 @@ main_loop(State = #state{ next=Next }) ->
{merge_done, _, OutFileName} ->
State1 =
if Next =:= undefined ->
{ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined),
{ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined,
State#state.opts ),
State#state{ next=PID };
true ->
State
@ -567,12 +569,20 @@ begin_merge(State) ->
file:delete(XFileName),
MergePID = proc_lib:spawn_link(fun() ->
try
{ok, OutCount} = hanoi_merger:merge(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined),
State#state.next =:= undefined,
State#state.opts ),
% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
Owner ! {merge_done, OutCount, XFileName}
catch
C:E ->
error_logger:error_msg("merge failed ~p:~p ~p~n",
[C,E,erlang:get_stacktrace()]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end),
{ok, MergePID}.

View file

@ -29,7 +29,7 @@
%% Merging two BTrees
%%
-export([merge/5]).
-export([merge/6]).
-include("hanoi.hrl").
@ -40,14 +40,14 @@
%%
-define(LOCAL_WRITER, true).
merge(A,B,C, Size, IsLastLevel) ->
merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, BT1} = hanoi_reader:open(A, sequential),
{ok, BT2} = hanoi_reader:open(B, sequential),
case ?LOCAL_WRITER of
true ->
{ok, Out} = hanoi_writer:init([C, Size]);
{ok, Out} = hanoi_writer:init([C, [{size,Size} | Options]]);
false ->
{ok, Out} = hanoi_writer:open(C, Size)
{ok, Out} = hanoi_writer:open(C, [{size,Size} | Options])
end,
{node, AKVs} = hanoi_reader:first_node(BT1),
@ -79,11 +79,8 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) ->
PID ! {Ref, step_done}
end,
% error_logger:info_msg("waiting for step in ~p~n", [self()]),
receive
{step, From, HowMany} ->
% error_logger:info_msg("got step ~p,~p in ~p~n", [From,HowMany, self()]),
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From})
end;

View file

@ -161,7 +161,8 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize,
N when N>0 ->
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoi_writer:open(BTreeFileName, ?BTREE_SIZE(?TOP_LEVEL)),
{ok, BT} = hanoi_writer:open(BTreeFileName, [{size,?BTREE_SIZE(?TOP_LEVEL)},
{compress, none}]),
try
lists:foreach( fun({Key,Value}) ->
ok = hanoi_writer:add(BT, Key, Value)

View file

@ -45,16 +45,40 @@ estimate_node_size_increment(_KVList,Key,Value) ->
13
end.
encode_index_node(Level, KVList) ->
Data = %zlib:zip(
erlang:term_to_binary(KVList)
% )
,
Size = byte_size(Data)+2,
{ok, Size+4, [ <<Size:32/unsigned, Level:16/unsigned>> | Data ] }.
-define(NO_COMPRESSION, 0).
-define(SNAPPY_COMPRESSION, 1).
-define(GZIP_COMPRESSION, 2).
decode_index_node(Level, <<Data/binary>>) ->
KVList = erlang:binary_to_term(Data), %zlib:unzip(Data)),
encode_index_node(Level, KVList, Compress) ->
TermData = erlang:term_to_binary(KVList),
case Compress of
snappy ->
{ok, Snappied} = snappy:compress(TermData),
CompressedData = [?SNAPPY_COMPRESSION|Snappied];
gzip ->
CompressedData = [?GZIP_COMPRESSION|zlib:gzip(TermData)];
_ ->
CompressedData = [?NO_COMPRESSION|TermData]
end,
Size = erlang:iolist_size(CompressedData),
{ok, Size+6, [ <<(Size+2):32/unsigned, Level:16/unsigned>> | CompressedData ] }.
decode_index_node(Level, <<Tag, Data/binary>>) ->
case Tag of
?NO_COMPRESSION ->
TermData = Data;
?SNAPPY_COMPRESSION ->
{ok, TermData} = snappy:decompress(Data);
?GZIP_COMPRESSION ->
TermData = zlib:gunzip(Data)
end,
KVList = erlang:binary_to_term(TermData),
{ok, {node, Level, KVList}}.

View file

@ -53,18 +53,19 @@
name :: string(),
bloom
bloom,
block_size = ?NODE_SIZE,
compress = none :: none | snappy | gzip
}).
%%% PUBLIC API
open(Name,Size) ->
gen_server:start_link(?MODULE, [Name,Size], []).
open(Name,Options) ->
gen_server:start_link(?MODULE, [Name,Options], []).
open(Name) ->
gen_server:start_link(?MODULE, [Name,2048], []).
gen_server:start_link(?MODULE, [Name,[]], []).
add(Ref,Key,Data) ->
@ -76,7 +77,9 @@ close(Ref) ->
%%%
init([Name,Size]) ->
init([Name,Options]) ->
Size = proplists:get_value(size, Options, 2048),
% io:format("got name: ~p~n", [Name]),
@ -86,7 +89,9 @@ init([Name,Size]) ->
{ok, BloomFilter} = ebloom:new(erlang:min(Size,16#ffffffff), 0.01, 123),
{ok, #state{ name=Name,
index_file_pos=0, index_file=IdxFile,
bloom = BloomFilter
bloom = BloomFilter,
block_size = hanoi:get_opt(block_size, Options, ?NODE_SIZE),
compress = hanoi:get_opt(compress, Options, none)
}};
{error, _}=Error ->
error_logger:error_msg("hanoi_writer cannot open ~p: ~p~n", [Name, Error]),
@ -164,7 +169,7 @@ add_record(Level, Key, Value,
NewSize = NodeSize + hanoi_util:estimate_node_size_increment(List, Key, Value),
ebloom:insert( State#state.bloom, Key ),
ok = ebloom:insert( State#state.bloom, Key ),
NodeMembers = [{Key,Value} | List],
if
@ -180,9 +185,9 @@ add_record(Level, Key, Value, #state{ nodes=Nodes }=State) ->
close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes]} = State) ->
close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress} = State) ->
OrderedMembers = lists:reverse(NodeMembers),
{ok, DataSize, Data} = hanoi_util:encode_index_node(Level, OrderedMembers),
{ok, DataSize, Data} = hanoi_util:encode_index_node(Level, OrderedMembers, Compress),
NodePos = State#state.index_file_pos,
ok = file:write(State#state.index_file, Data),