diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 1c9b5f8..033d4a1 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -78,6 +78,10 @@ -include("machi_pb.hrl"). -include("machi_projection.hrl"). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST + -define(SERVER_CMD_READ_TIMEOUT, 600*1000). -export([start_link/1, stop/1, @@ -263,9 +267,6 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> append_server_loop(FluPid, S) end. --define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee! --define(CSumSpace, ((1*2)+(20*2))). % hexencodingwhee! - net_server_loop(Sock, S) -> case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Bin} -> @@ -444,29 +445,8 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, ChunkExtra, #state{flu_name=FluName}=_S) -> %% TODO: Do anything with PKey? try - CSum = case CSum_tag of - ?CSUM_TAG_NONE -> - %% TODO: If the client was foolish enough to use - %% this type of non-checksum, then the client gets - %% what it deserves wrt data integrity, alas. In - %% the client-side Chain Replication method, each - %% server will calculated this independently, which - %% isn't exactly what ought to happen for best data - %% integrity checking. In server-side CR, the csum - %% should be calculated by the head and passed down - %% the chain together with the value. - CS = machi_util:checksum_chunk(Chunk), - machi_util:make_tagged_csum(server_sha, CS); - ?CSUM_TAG_CLIENT_SHA -> - CS = machi_util:checksum_chunk(Chunk), - if CS == Client_CSum -> - machi_util:make_tagged_csum(server_sha, - Client_CSum); - true -> - throw({bad_csum, CS}) - end - end, - FluName ! {seq_append, self(), Prefix, Chunk, CSum, ChunkExtra}, + TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), + FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, ChunkExtra}, receive {assignment, Offset, File} -> Size = iolist_size(Chunk), @@ -514,36 +494,11 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, do_server_write_chunk2(_File, Offset, Chunk, CSum_tag, Client_CSum, _DataDir, FHc, FHd) -> try - CSum = case CSum_tag of - ?CSUM_TAG_NONE -> - %% TODO: If the client was foolish enough to use - %% this type of non-checksum, then the client gets - %% what it deserves wrt data integrity, alas. In - %% the client-side Chain Replication method, each - %% server will calculated this independently, which - %% isn't exactly what ought to happen for best data - %% integrity checking. In server-side CR, the csum - %% should be calculated by the head and passed down - %% the chain together with the value. - CS = machi_util:checksum_chunk(Chunk), - machi_util:make_tagged_csum(server_sha,CS); - ?CSUM_TAG_CLIENT_SHA -> - CS = machi_util:checksum_chunk(Chunk), - if CS == Client_CSum -> - machi_util:make_tagged_csum(server_sha, - Client_CSum); - true -> - throw({bad_csum, CS}) - end - end, + TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), Size = iolist_size(Chunk), case file:pwrite(FHd, Offset, Chunk) of ok -> - OffsetHex = machi_util:bin_to_hexstr(<>), - LenHex = machi_util:bin_to_hexstr(<>), - CSumHex2 = machi_util:bin_to_hexstr(CSum), - CSum_info = [OffsetHex, 32, LenHex, 32, - CSumHex2, 10], + CSum_info = encode_csum_file_entry_nothex(Offset, Size, TaggedCSum), ok = file:write(FHc, CSum_info), ok; _Else3 -> @@ -780,21 +735,18 @@ seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset) run_seq_append_server2(Prefix, DataDir); seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> receive - {seq_append, From, Prefix, Chunk, CSum, Extra} -> + {seq_append, From, Prefix, Chunk, TaggedCSum, Extra} -> if Chunk /= <<>> -> ok = file:pwrite(FHd, Offset, Chunk); true -> ok end, From ! {assignment, Offset, File}, - Len = byte_size(Chunk), - OffsetHex = machi_util:bin_to_hexstr(<>), - LenHex = machi_util:bin_to_hexstr(<>), - CSumHex = machi_util:bin_to_hexstr(CSum), - CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10], + Size = iolist_size(Chunk), + CSum_info = encode_csum_file_entry_nothex(Offset, Size, TaggedCSum), ok = file:write(FHc, CSum_info), seq_append_server_loop(DataDir, Prefix, File, FH_, - FileNum, Offset + Len + Extra); + FileNum, Offset + Size + Extra); {sync_stuff, FromPid, Ref} -> file:sync(FHc), FromPid ! {sync_finished, Ref}, @@ -913,3 +865,235 @@ split_uri_options(OpsBin) -> [<<"size">>, Bin] -> {size, binary_to_integer(Bin)} end || X <- L]. + +encode_csum_file_entry_nothex(Offset, Size, TaggedCSum) -> + Len = 8 + 4 + byte_size(TaggedCSum), + [<>, + TaggedCSum]. + +decode_csum_file_entry_nothex(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) -> + {Offset, Size, TaggedCSum}. + +split_1byte_len_tag_decode(Bin) -> + split_1byte_len_tag_decode(Bin, []). + +split_1byte_len_tag_decode(<>, Acc)-> + split_1byte_len_tag_decode(Rest, [decode_csum_file_entry_nothex(Part)|Acc]); +split_1byte_len_tag_decode(Other, Acc) -> + {lists:reverse(Acc), Other}. + +split_1byte_len_tag(Bin) -> + split_1byte_len_tag(Bin, []). + +split_1byte_len_tag(<>, Acc)-> + case get(hack_length) of + Len -> ok; + _ -> put(hack_different, true) + end, + split_1byte_len_tag(Rest, [<>|Acc]); +split_1byte_len_tag(Other, Acc) -> + {lists:reverse(Acc), Other}. + +%% split_1byte_len_tag(<>, Acc)-> +%% case get(hack_length) of +%% Len -> ok; +%% _ -> put(hack_different, true) +%% end, +%% split_1byte_len_tag(Rest, [<>|Acc]); +%% split_1byte_len_tag(Other, Acc) -> +%% {lists:reverse(Acc), Other}. + +check_or_make_tagged_checksum(?CSUM_TAG_NONE, Client_CSum, Chunk) -> + %% TODO: If the client was foolish enough to use + %% this type of non-checksum, then the client gets + %% what it deserves wrt data integrity, alas. In + %% the client-side Chain Replication method, each + %% server will calculated this independently, which + %% isn't exactly what ought to happen for best data + %% integrity checking. In server-side CR, the csum + %% should be calculated by the head and passed down + %% the chain together with the value. + CS = machi_util:checksum_chunk(Chunk), + machi_util:make_tagged_csum(server_sha, CS); +check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> + CS = machi_util:checksum_chunk(Chunk), + if CS == Client_CSum -> + machi_util:make_tagged_csum(server_sha, + Client_CSum); + true -> + throw({bad_csum, CS}) + end. + +-ifdef(TEST). + +%% Remove "_COMMENTED" string to run the demo/exploratory code. + +timing_demo_test_COMMENTED_() -> + {timeout, 300, fun() -> timing_demo_test2() end}. + +%% Demo/exploratory hackery to check relative speeds of dealing with +%% checksum data in different ways. +%% +%% Summary: +%% +%% * Use compact binary encoding, with 1 byte header for entry length. +%% * Because the hex-style code is *far* slower just for enc & dec ops. +%% * For 1M entries of enc+dec: 0.215 sec vs. 15.5 sec. +%% * File sorter when sorting binaries as-is is only 30-40% slower +%% than an in-memory split (of huge binary emulated by file:read_file() +%% "big slurp") and sort of the same as-is sortable binaries. +%% * File sorter slows by a factor of about 2.5 if {order, fun compare/2} +%% function must be used, i.e. because the checksum entry lengths differ. +%% * File sorter + {order, fun compare/2} is still *far* faster than external +%% sort by OS X's sort(1) of sortable ASCII hex-style: +%% 4.5 sec vs. 21 sec. +%% * File sorter {order, fun compare/2} is faster than in-memory sort +%% of order-friendly 3-tuple-style: 4.5 sec vs. 15 sec. + +timing_demo_test2() -> + Xs = [random:uniform(1 bsl 32) || _ <- lists:duplicate(1*1000*1000, $x)], + CSum = <<"123456789abcdef0A">>, + 17 = byte_size(CSum), + io:format(user, "\n", []), + + %% %% {ok, ZZZ} = file:open("/tmp/foo.hex-style", [write, binary, raw, delayed_write]), + io:format(user, "Hex-style file entry enc+dec: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {HexUSec, _} = + timer:tc(fun() -> + lists:foldl(fun(X, _) -> + B = encode_csum_file_entry(X, 100, CSum), + %% file:write(ZZZ, [B, 10]), + decode_csum_file_entry(list_to_binary(B)) + end, x, Xs) + end), + io:format(user, "~.3f sec\n", [HexUSec / 1000000]), + %% %% file:close(ZZZ), + + io:format(user, "Not-sortable file entry enc+dec: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {NotSortedUSec, _} = + timer:tc(fun() -> + lists:foldl(fun(X, _) -> + B = encode_csum_file_entry_nothex(X, 100, CSum), + decode_csum_file_entry_nothex(list_to_binary(B)) + end, x, Xs) + end), + io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]), + + NotHexList = lists:foldl(fun(X, Acc) -> + B = encode_csum_file_entry_nothex(X, 100, CSum), + [B|Acc] + end, [], Xs), + NotHexBin = iolist_to_binary(NotHexList), + + io:format(user, "Split NotHexBin: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {NotHexBinUSec, SplitRes} = + timer:tc(fun() -> + put(hack_length, 29), + put(hack_different, false), + {Sorted, _Leftover} = split_1byte_len_tag(NotHexBin), + io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]), + Sorted + end), + io:format(user, "~.3f sec\n", [NotHexBinUSec / 1000000]), + + io:format(user, "Sort Split results: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {SortSplitUSec, _} = + timer:tc(fun() -> + lists:sort(SplitRes) + %% lists:sort(fun sort_2lines/2, SplitRes) + end), + io:format(user, "~.3f sec\n", [SortSplitUSec / 1000000]), + + UnsortedName = "/tmp/foo.unsorted", + SortedName = "/tmp/foo.sorted", + + ok = file:write_file(UnsortedName, NotHexList), + io:format(user, "File Sort Split results: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {FileSortUSec, _} = + timer:tc(fun() -> + {ok, FHin} = file:open(UnsortedName, [read, binary]), + {ok, FHout} = file:open(SortedName, + [write, binary, delayed_write]), + put(hack_sorter_sha_ctx, crypto:hash_init(sha)), + ok = file_sorter:sort(sort_input_fun(FHin, <<>>), + sort_output_fun(FHout), + [{format,binary}, + {header, 1} + %% , {order, fun sort_2lines/2} + ]) + end), + io:format(user, "~.3f sec\n", [FileSortUSec / 1000000]), + _SHA = crypto:hash_final(get(hack_sorter_sha_ctx)), + %% io:format(user, "SHA via (hack_sorter_sha_ctx) = ~p\n", [_SHA]), + + io:format(user, "NotHex-Not-sortable tuple list creation: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {NotHexTupleCreationUSec, NotHexTupleList} = + timer:tc(fun() -> + lists:foldl(fun(X, Acc) -> + B = encode_csum_file_entry( + X, 100, CSum), + [B|Acc] + end, [], Xs) + end), + io:format(user, "~.3f sec\n", [NotHexTupleCreationUSec / 1000000]), + + io:format(user, "NotHex-Not-sortable tuple list sort: ", []), + [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], + {NotHexTupleSortUSec, _} = + timer:tc(fun() -> + lists:sort(NotHexTupleList) + end), + io:format(user, "~.3f sec\n", [NotHexTupleSortUSec / 1000000]), + + ok. + +sort_2lines(<<_:1/binary, A/binary>>, <<_:1/binary, B/binary>>) -> + A < B. + +sort_input_fun(FH, PrevStuff) -> + fun(close) -> + ok; + (read) -> + case file:read(FH, 1024*1024) of + {ok, NewStuff} -> + AllStuff = if PrevStuff == <<>> -> + NewStuff; + true -> + <> + end, + {SplitRes, Leftover} = split_1byte_len_tag(AllStuff), + {SplitRes, sort_input_fun(FH, Leftover)}; + eof -> + end_of_input + end + end. + +sort_output_fun(FH) -> + fun(close) -> + file:close(FH); + (Stuff) -> + Ctx = get(hack_sorter_sha_ctx), + put(hack_sorter_sha_ctx, crypto:hash_update(Ctx, Stuff)), + ok = file:write(FH, Stuff), + sort_output_fun(FH) + end. + +encode_csum_file_entry(Offset, Size, TaggedCSum) -> + OffsetHex = machi_util:bin_to_hexstr(<>), + SizeHex = machi_util:bin_to_hexstr(<>), + CSumHex = machi_util:bin_to_hexstr(TaggedCSum), + [OffsetHex, 32, SizeHex, 32, CSumHex]. + +decode_csum_file_entry(<>) -> + Offset = machi_util:hexstr_to_bin(OffsetHex), + Size = machi_util:hexstr_to_bin(SizeHex), + CSum = machi_util:hexstr_to_bin(CSumHex), + {Offset, Size, CSum}. + +-endif. % TEST