Refactor the compression code into shared helpers and prepare for adding the LZ4 algorithm once it is ready.
This commit is contained in:
parent
db41532258
commit
75df2b86bf
4 changed files with 46 additions and 42 deletions
|
@ -57,7 +57,7 @@
|
|||
|
||||
-type hanoidb() :: pid().
|
||||
-type key_range() :: #key_range{}.
|
||||
-type config_option() :: {compress, none | gzip | snappy}
|
||||
-type config_option() :: {compress, none | gzip | snappy} %lz4
|
||||
| {page_size, pos_integer()}
|
||||
| {read_buffer_size, pos_integer()}
|
||||
| {write_buffer_size, pos_integer()}
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
%%
|
||||
%% Receive timeout in milliseconds, used as `after ?HIBERNATE_TIMEOUT' while a
%% scan waits for its next message; when it expires the scan state is
%% serialized and compressed before waiting further.
-define(HIBERNATE_TIMEOUT, 5000).
|
||||
|
||||
%% Compression method passed to hanoidb_util:compress/2 when the serialized
%% scan state is stashed away after a ?HIBERNATE_TIMEOUT.
-define(COMPRESSION_METHOD, gzip).
|
||||
|
||||
%%
|
||||
%% Most likely, there will be plenty of I/O being generated by
|
||||
|
@ -85,7 +86,7 @@ hibernate_scan(Keep) ->
|
|||
erlang:garbage_collect(),
|
||||
receive
|
||||
{step, From, HowMany} ->
|
||||
{BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
|
||||
{BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(hanoidb_util:uncompress(Keep)),
|
||||
scan(hanoidb_reader:deserialize(BT1),
|
||||
hanoidb_reader:deserialize(BT2),
|
||||
hanoidb_writer:deserialize(OutBin),
|
||||
|
@ -109,7 +110,7 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/=
|
|||
Args = {hanoidb_reader:serialize(BT1),
|
||||
hanoidb_reader:serialize(BT2),
|
||||
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
|
||||
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
|
||||
Keep = hanoidb_util:compress(?COMPRESSION_METHOD, erlang:term_to_binary(Args)),
|
||||
hibernate_scan(Keep);
|
||||
false ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, {0, none})
|
||||
|
@ -168,7 +169,7 @@ hibernate_scan_only(Keep) ->
|
|||
erlang:garbage_collect(),
|
||||
receive
|
||||
{step, From, HowMany} ->
|
||||
{BT, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
|
||||
{BT, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(hanoidb_util:uncompress(Keep)),
|
||||
scan_only(hanoidb_reader:deserialize(BT),
|
||||
hanoidb_writer:deserialize(OutBin),
|
||||
IsLastLevel, KVs, {N+HowMany, From})
|
||||
|
@ -189,7 +190,7 @@ scan_only(BT, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
|
|||
after ?HIBERNATE_TIMEOUT ->
|
||||
Args = {hanoidb_reader:serialize(BT),
|
||||
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
|
||||
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
|
||||
Keep = hanoidb_util:compress(?COMPRESSION_METHOD, erlang:term_to_binary(Args)),
|
||||
hibernate_scan_only(Keep)
|
||||
end;
|
||||
|
||||
|
|
|
@ -76,50 +76,53 @@ estimate_node_size_increment(_KVList,Key,Value) ->
|
|||
%% Tag value marking data that is stored as-is (not compressed).
-define(NO_COMPRESSION, 0).
|
||||
%% Tag value marking data that was compressed with snappy.
-define(SNAPPY_COMPRESSION, 1).
|
||||
%% Tag value marking data that was compressed with zlib:gzip/1.
-define(GZIP_COMPRESSION, 2).
|
||||
%-define(LZ4_COMPRESSION, 3).
|
||||
|
||||
encode_index_node(KVList, Compress) ->
|
||||
%% @doc Compress `Bin' (a binary or iolist) using `Method'.
%%
%% Returns `{Tag, Data}' where `Tag' is one of the ?*_COMPRESSION markers.
%% The compressed form is only kept when it is strictly smaller than the
%% input; otherwise the untouched input is returned tagged ?NO_COMPRESSION.
compress(Method, Bin) ->
    case do_compression(Method, Bin) of
        {?NO_COMPRESSION, _Unchanged} ->
            %% Nothing was compressed, so there is nothing to compare.
            {?NO_COMPRESSION, Bin};
        {Tag, Packed} ->
            PackedSize = byte_size(Packed),
            RawSize = erlang:iolist_size(Bin),
            if
                PackedSize < RawSize ->
                    {Tag, Packed};
                true ->
                    %% Compression did not pay off; keep the original data.
                    {?NO_COMPRESSION, Bin}
            end
    end.
|
||||
|
||||
%% Run the compressor selected by the first argument and return
%% `{Tag, Output}'. Any method other than `gzip' or `snappy' leaves the
%% payload untouched and tags it ?NO_COMPRESSION.
do_compression(gzip, Payload) ->
    {?GZIP_COMPRESSION, zlib:gzip(Payload)};
do_compression(snappy, Payload) ->
    %% Crash (badmatch) if snappy reports anything but success.
    {ok, Packed} = snappy:compress(Payload),
    {?SNAPPY_COMPRESSION, Packed};
%% Placeholder for LZ4 support, disabled until it is ready:
%do_compression(lz4, Bin) ->
%    {?LZ4_COMPRESSION, lz4:compress(Bin)};
do_compression(_Other, Payload) ->
    {?NO_COMPRESSION, Payload}.
|
||||
|
||||
%% @doc Decode a tagged binary: the first byte is a ?*_COMPRESSION marker
%% and the remainder is the payload in that format. Returns the plain
%% payload. There is no clause for unknown tags, so such input fails with
%% `function_clause'.
decompress(<<?GZIP_COMPRESSION, Packed/binary>>) ->
    zlib:gunzip(Packed);
decompress(<<?SNAPPY_COMPRESSION, Packed/binary>>) ->
    %% Crash (badmatch) if snappy reports anything but success.
    {ok, Plain} = snappy:decompress(Packed),
    Plain;
%% Placeholder for LZ4 support, disabled until it is ready:
%decompress(<<?LZ4_COMPRESSION, Data/binary>>) ->
%    lz4:uncompress(Data);
decompress(<<?NO_COMPRESSION, Plain/binary>>) ->
    Plain.
|
||||
|
||||
%% @doc Encode a list of `{Key, Value}' pairs as an index node.
%%
%% Every pair is wrapped by crc_encapsulate_kv_entry/2, the result is
%% prefixed with ?TAG_END and handed to compress/2 with `Method'.
%% Returns `{ok, IoData}' whose first element is the compression tag
%% chosen by compress/2, followed by the (possibly compressed) data.
encode_index_node(KVList, Method) ->
    %% The fun only accepts 2-tuples, so a malformed entry crashes here.
    Encapsulate = fun({Key, Value}) ->
                          crc_encapsulate_kv_entry(Key, Value)
                  end,
    Entries = [?TAG_END | lists:map(Encapsulate, KVList)],
    {Tag, Body} = compress(Method, Entries),
    {ok, [Tag | Body]}.
|
||||
|
||||
case Compress of
|
||||
snappy ->
|
||||
DataSize = erlang:iolist_size(TermData),
|
||||
{ok, Snappied} = snappy:compress(TermData),
|
||||
if byte_size(Snappied) > DataSize ->
|
||||
OutData = [?NO_COMPRESSION|TermData];
|
||||
true ->
|
||||
OutData = [?SNAPPY_COMPRESSION|Snappied]
|
||||
end;
|
||||
gzip ->
|
||||
DataSize = erlang:iolist_size(TermData),
|
||||
GZipData = zlib:gzip(TermData),
|
||||
if byte_size(GZipData) > DataSize ->
|
||||
OutData = [?NO_COMPRESSION|TermData];
|
||||
true ->
|
||||
OutData = [?GZIP_COMPRESSION|GZipData]
|
||||
end;
|
||||
_ ->
|
||||
OutData = [?NO_COMPRESSION|TermData]
|
||||
end,
|
||||
|
||||
{ok, OutData}.
|
||||
|
||||
|
||||
decode_index_node(Level, <<Tag, Data/binary>>) ->
|
||||
|
||||
case Tag of
|
||||
?NO_COMPRESSION ->
|
||||
TermData = Data;
|
||||
?SNAPPY_COMPRESSION ->
|
||||
{ok, TermData} = snappy:decompress(Data);
|
||||
?GZIP_COMPRESSION ->
|
||||
TermData = zlib:gunzip(Data)
|
||||
end,
|
||||
|
||||
%% @doc Decode an index node from its tagged binary form.
%%
%% `Data' is first passed through decompress/1 and the result is parsed by
%% decode_kv_list/1. Returns `{ok, {node, Level, KVList}}'; any other
%% result from decode_kv_list/1 crashes with `badmatch'.
decode_index_node(Level, Data) ->
    {ok, KVList} = decode_kv_list(decompress(Data)),
    {ok, {node, Level, KVList}}.
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
|
||||
bloom,
|
||||
block_size = ?NODE_SIZE,
|
||||
compress = none :: none | snappy | gzip,
|
||||
compress = none :: none | snappy | gzip, % | lz4,
|
||||
opts = [],
|
||||
|
||||
value_count = 0,
|
||||
|
|
Loading…
Reference in a new issue