diff --git a/src/machi_merkle_tree_mgr.erl b/src/machi_merkle_tree_mgr.erl index 7a1e833..2e7a144 100644 --- a/src/machi_merkle_tree_mgr.erl +++ b/src/machi_merkle_tree_mgr.erl @@ -18,18 +18,10 @@ %% %% ------------------------------------------------------------------- -%% @doc This manager maintains a Merkle tree per file per FLU. The leaf -%% nodes are stored in the same manner as the checksum data files, except -%% they are represented as -%% -%% `<>' for unwritten bytes -%% `<>' for trimmed bytes -%% `<>' for written bytes -%% -%% In this case, the checksum tag is thrown away. The tree feeds these -%% leaf nodes into hashes representing 10 GB ranges, called Level 1. We aim for -%% around %% 10 hashes at level 2, and then 2 hashes level 3 and finally the -%% root. +%% @doc This manager maintains a Merkle tree per file per FLU as implemented +%% by the `merklet' library. Keys are encoded as `<>' +%% values encoded as `<>' *or* as <<0>> for unwritten +%% bytes, or <<1>> for trimmed bytes. -module(machi_merkle_tree_mgr). -behaviour(gen_server). @@ -41,8 +33,7 @@ start_link/3, initialize/2, update/5, - fetch/2, - fetch/3 + fetch/2 ]). %% gen_server callbacks @@ -63,24 +54,14 @@ -record(mt, { filename :: string(), - recalc = true :: boolean(), - root :: 'undefined' | binary(), - lvl1 = [] :: [ binary() ], - lvl2 = [] :: [ binary() ], - lvl3 = [] :: [ binary() ], - leaves = orddict:new() :: mt_entry() + tree :: merklet:tree() }). --type mt_entry() :: orddict:orddict(). - --define(WRITTEN(Offset, Size, Csum), <>). --define(TRIMMED(Offset, Size), <>). --define(UNWRITTEN(Offset, Size), <>). - --define(CHUNK_SIZE, (10*1024*1024)). --define(LEVEL_SIZE, 10). --define(H, sha). +-define(TRIMMED, <<1>>). +-define(UNWRITTEN, <<0>>). +-define(ENCODE(Offset, Size), <>). +-define(NEW_MERKLET, undefined). -define(TIMEOUT, (10*1000)). %% public API @@ -117,23 +98,11 @@ update(FluName, Filename, Offset, Length, Csum) -> {update, Filename, Offset, Length, Csum}). -spec fetch ( FluName :: atom(), - Filename :: string() ) -> {ok, [ Data :: binary() ]}. -%% @doc Returns the entire merkle tree for the given filename. -%% {@link fetch/3} + Filename :: string() ) -> {ok, 'undefined'|merklet:tree()}. +%% @doc Returns the merkle tree for the given filename. fetch(FluName, Filename) -> - fetch(FluName, Filename, all). - --spec fetch( FluName :: atom(), - Filename :: string(), - Level :: 'root' | 'all' - ) -> {ok, [ Data :: binary() ]}. -%% @doc Return the current merkle tree for the given filename. -%% If `root' is specified, returns a list with 1 element, the root -%% checksum of the tree. If `all' is specified, returns a list -%% with all levels. -fetch(FluName, Filename, Level) -> gen_server:call(make_merkle_tree_mgr_name(FluName), - {fetch, Filename, Level}, ?TIMEOUT). + {fetch, Filename}, ?TIMEOUT). %% gen_server callbacks init({FluName, DataDir, Options}) -> @@ -146,8 +115,8 @@ init({FluName, DataDir, Options}) -> end, {ok, #state{fluname=FluName, datadir=DataDir, tid = Tid}}. -handle_call({fetch, Filename, Level}, _From, S = #state{ tid = Tid }) -> - Res = handle_fetch(Tid, Filename, Level), +handle_call({fetch, Filename}, _From, S = #state{ tid = Tid }) -> + Res = handle_fetch(Tid, Filename), {reply, {ok, Res}, S}; handle_call(Req, _From, State) -> lager:warning("Unknown call: ~p", [Req]), @@ -193,151 +162,47 @@ get_files(DataDir) -> load_filename(Tid, DataDir, Filename) -> CsumFile = machi_util:make_checksum_filename(DataDir, Filename), - case file:read_file(CsumFile) of - {error, enoent} -> - insert(Tid, Filename, {0, ?MINIMUM_OFFSET, unwritten}); - {ok, Bin} -> - load_bin(Tid, Filename, Bin); - Error -> - throw(Error) - end. - -load_bin(Tid, Filename, Bin) -> - {CsumL, _} = machi_csum_table:split_checksum_list_blob_decode(Bin), - iter_csum_list(Tid, Filename, CsumL). - -iter_csum_list(Tid, Filename, []) -> - insert(Tid, Filename, {0, ?MINIMUM_OFFSET, unwritten}); -iter_csum_list(Tid, Filename, L = [ {Last, _, _} | _ ]) -> - make_insert(Tid, Filename, Last, L). - -make_insert(_Tid, _Filename, _Last, []) -> - ok; -%% case where Last offset matches Current, just insert current -make_insert(Tid, Filename, Last, [H={Last, Len, _Csum}|T]) -> - insert(Tid, Filename, H), - make_insert(Tid, Filename, Last+Len, T); -%% case where we have a hole -make_insert(Tid, Filename, Last, [H={Off, Len, _Csum}|T]) -> - Hole = Off - Last, - insert(Tid, Filename, {Last, Hole, unwritten}), - insert(Tid, Filename, H), - make_insert(Tid, Filename, Off+Len, T). - -insert(Tid, Filename, {Offset, Length, trimmed}) -> - do_insert(Tid, Filename, Offset, Length, ?TRIMMED(Offset, Length)); - -insert(Tid, Filename, {Offset, Length, unwritten}) -> - do_insert(Tid, Filename, Offset, Length, ?UNWRITTEN(Offset, Length)); - -insert(Tid, Filename, {Offset, Length, <<_Tag:8, Csum/binary>>}) -> - do_insert(Tid, Filename, Offset, Length, ?WRITTEN(Offset, Length, Csum)). - -do_insert(Tid, Filename, Offset, Length, Csum) -> - MT = case find(Tid, Filename) of - [] -> - #mt{ filename = Filename }; - V -> - V - end, - ok = maybe_update(Tid, Offset, Length, Csum, MT), - ok. - -maybe_update(Tid, Offset, Length, Csum, MT) -> - case orddict:find({Offset, Length}, MT#mt.leaves) of - error -> - %% range not found in our orddict, so fill it in - do_update(Tid, Offset, Length, Csum, MT); - {ok, Csum} -> - %% trying to insert a value we already have that - %% matches - ok; - {ok, ?UNWRITTEN(Offset, Length)} -> - %% old value was unwritten, now we are filling it in - %% so that's legit - do_update(Tid, Offset, Length, Csum, MT); - {ok, ?TRIMMED(Offset, Length)} -> - %% XXX FIXME - %% Scott - range was trimmed - do we fill it in with new stuff? - ok; - {ok, Other} -> - %% we found a checksum that is different - %% this shouldn't happen because once we write a range, it's written - %% TODO - file corruption? insanity? - lager:error("Tried to update merkle tree for file ~p at offset ~p, length ~p, got checksum ~p and tried to insert ~p", - [MT#mt.filename, Offset, Length, Other, Csum]), - throw({weird, Other}) - end. - -do_update(Tid, Offset, Length, Csum, MT) -> - D = orddict:store({Offset, Length}, Csum, MT#mt.leaves), - true = ets:insert(Tid, MT#mt{ recalc = true, leaves = D }), + {ok, T} = machi_csum_table:open(CsumFile, []), + %% docs say that the traversal order of ets:foldl is non-determinstic + %% but hopefully since csum_table uses an ordered set that's not true... + {_LastPosition, M} = machi_csum_table:foldl_chunks(fun insert_csum/2, + {?MINIMUM_OFFSET, ?NEW_MERKLET}, T), + true = ets:insert_new(Tid, #mt{ filename = Filename, tree = M}), + ok = machi_csum_table:close(T), ok. -handle_fetch(Tid, Filename, root) -> - case find(Tid, Filename) of - [] -> undefined; - #mt{ root = undefined } -> undefined; - #mt{ recalc = true } = MT -> hd(build_tree(Tid, MT)); - #mt{ root = R, recalc = false } -> R - end; +insert_csum({Last, Size, _Csum}=In, {Last, MT}) -> + %% no gap here, insert a record + {Last+Size, update_merkle_tree(In, MT)}; +insert_csum({Offset, Size, _Csum}=In, {Last, MT}) -> + %% gap here, insert unwritten record + %% *AND* insert written record + Hole = Offset - Last, + MT0 = update_merkle_tree({Last, Hole, unwritten}, MT), + {Offset+Size, update_merkle_tree(In, MT0)}. -handle_fetch(Tid, Filename, all) -> - case find(Tid, Filename) of - [] -> undefined; - #mt{ recalc = true } = MT -> build_tree(Tid, MT); - #mt{ recalc = false, - root = R, - lvl1 = L1, - lvl2 = L2, - lvl3 = L3 } -> [ R, L3, L2, L1 ] +insert(Tid, Filename, Term) -> + case ets:lookup(Tid, Filename) of + [] -> error(not_found); %% TODO: Something better? + [R] -> + NewMT = update_merkle_tree(Term, R#mt.tree), + %% we choose update_element because it + %% makes atomic changes so it is concurrent + %% safe. The regular 'insert' function + %% does not provide that guarantee. + true = ets:update_element(Tid, Filename, {#mt.tree, NewMT}), + ok end. -find(Tid, Filename) -> - case ets:lookup(Tid, Filename) of - [] -> []; - [R] -> R +handle_fetch(Tid, Filename) -> + case ets:lookup(Tid, Filename) of + [] -> undefined; + [R] -> R#mt.tree end. -build_tree(Tid, MT = #mt{ leaves = D }) -> - Leaves = lists:map(fun map_dict/1, orddict:to_list(D)), - io:format(user, "Leaves: ~p~n", [Leaves]), - Lvl1s = build_level_1(?CHUNK_SIZE, Leaves, 1, [ crypto:hash_init(?H) ]), - io:format(user, "Lvl1: ~p~n", [Lvl1s]), - Mod2 = length(Lvl1s) div ?LEVEL_SIZE, - Lvl2s = build_int_level(Mod2, Lvl1s, 1, [ crypto:hash_init(?H) ]), - io:format(user, "Lvl2: ~p~n", [Lvl2s]), - Mod3 = length(Lvl2s) div 2, - Lvl3s = build_int_level(Mod3, Lvl2s, 1, [ crypto:hash_init(?H) ]), - io:format(user, "Lvl3: ~p~n", [Lvl3s]), - Root = build_root(Lvl3s, crypto:hash_init(?H)), - io:format(user, "Root: ~p~n", [Root]), - ets:insert(Tid, MT#mt{ root = Root, lvl1 = Lvl1s, lvl2 = Lvl2s, lvl3 = Lvl3s, recalc = false }), - [Root, Lvl3s, Lvl2s, Lvl1s]. - -build_root([], Ctx) -> - crypto:hash_final(Ctx); -build_root([H|T], Ctx) -> - build_root(T, crypto:hash_update(Ctx, H)). - -build_int_level(_Mod, [], _Cnt, [ Ctx | Rest ]) -> - lists:reverse( [ crypto:hash_final(Ctx) | Rest ] ); -build_int_level(Mod, [H|T], Cnt, [ Ctx | Rest ]) when Cnt rem Mod == 0 -> - NewCtx = crypto:hash_init(?H), - build_int_level(Mod, T, Cnt + 1, [ crypto:hash_update(NewCtx, H), crypto:hash_final(Ctx) | Rest ]); -build_int_level(Mod, [H|T], Cnt, [ Ctx | Rest ]) -> - build_int_level(Mod, T, Cnt+1, [ crypto:hash_update(Ctx, H) | Rest ]). - -map_dict({{Offset, Len}, Hash}) -> - {Offset + Len, Hash}. - -build_level_1(_Size, [], _Multiple, [ Ctx | Rest ]) -> - lists:reverse([ crypto:hash_final(Ctx) | Rest ]); -build_level_1(Size, [{Pos, Hash}|T], Multiple, [ Ctx | Rest ]) when Pos > ( Size * Multiple ) -> - NewCtx = crypto:hash_init(?H), - build_level_1(Size, T, Multiple+1, - [ crypto:hash_update(NewCtx, Hash), crypto:hash_final(Ctx) | Rest ]); -build_level_1(Size, [{Pos, Hash}|T], Multiple, [ Ctx | Rest ]) when Pos =< ( Size * Multiple ) -> - io:format(user, "Size: ~p, Pos: ~p, Multiple: ~p~n", [Size, Pos, Multiple]), - build_level_1(Size, T, Multiple, [ crypto:hash_update(Ctx, Hash) | Rest ]). - +update_merkle_tree({Offset, Size, unwritten}, MT) -> + merklet:insert({?ENCODE(Offset, Size), ?UNWRITTEN}, MT); +update_merkle_tree({Offset, Size, trimmed}, MT) -> + merklet:insert({?ENCODE(Offset, Size), ?TRIMMED}, MT); +update_merkle_tree({Offset, Size, Csum}, MT) -> + merklet:insert({?ENCODE(Offset, Size), Csum}, MT). diff --git a/test/machi_merkle_tree_mgr_test.erl b/test/machi_merkle_tree_mgr_test.erl index 8e8336c..bc9e0db 100644 --- a/test/machi_merkle_tree_mgr_test.erl +++ b/test/machi_merkle_tree_mgr_test.erl @@ -91,8 +91,9 @@ test() -> _ = machi_merkle_tree_mgr:start_link(test, ".", []), machi_merkle_tree_mgr:initialize(test, ?TESTFILE), timer:sleep(1000), - Root = machi_merkle_tree_mgr:fetch(test, ?TESTFILE, root), - ?debugFmt("Root: ~p~n", [Root]), + All = machi_merkle_tree_mgr:fetch(test, ?TESTFILE), + ?debugFmt("All: ~p~n", [All]), + timer:sleep(1000), All = machi_merkle_tree_mgr:fetch(test, ?TESTFILE), ?debugFmt("All: ~p~n", [All]), ok.