WIP: refactoring & edoc'ing
This commit is contained in:
parent
310fdb1f6a
commit
3d2b49b7e5
3 changed files with 109 additions and 37 deletions
|
@ -209,6 +209,9 @@ message Mpb_ChecksumListReq {
|
||||||
|
|
||||||
message Mpb_ChecksumListResp {
|
message Mpb_ChecksumListResp {
|
||||||
required Mpb_GeneralStatusCode status = 1;
|
required Mpb_GeneralStatusCode status = 1;
|
||||||
|
// For data type rationale, see comments for
|
||||||
|
// machi_flu1_client:checksum_list/4 or
|
||||||
|
// http://basho.github.io/machi/edoc/machi_flu1_client.html#checksum_list-4
|
||||||
optional bytes chunk = 2;
|
optional bytes chunk = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -403,6 +406,9 @@ message Mpb_LL_ChecksumListReq {
|
||||||
|
|
||||||
message Mpb_LL_ChecksumListResp {
|
message Mpb_LL_ChecksumListResp {
|
||||||
required Mpb_GeneralStatusCode status = 1;
|
required Mpb_GeneralStatusCode status = 1;
|
||||||
|
// For data type rationale, see comments for
|
||||||
|
// machi_flu1_client:checksum_list/4 or
|
||||||
|
// http://basho.github.io/machi/edoc/machi_flu1_client.html#checksum_list-4
|
||||||
optional bytes chunk = 2;
|
optional bytes chunk = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,9 @@
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3]).
|
update_wedge_state/3]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
||||||
|
-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3,
|
||||||
|
decode_csum_file_entry/1,
|
||||||
|
split_checksum_list_blob/1, split_checksum_list_blob_decode/1]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
flu_name :: atom(),
|
flu_name :: atom(),
|
||||||
|
@ -498,7 +501,7 @@ do_server_write_chunk2(_File, Offset, Chunk, CSum_tag,
|
||||||
Size = iolist_size(Chunk),
|
Size = iolist_size(Chunk),
|
||||||
case file:pwrite(FHd, Offset, Chunk) of
|
case file:pwrite(FHd, Offset, Chunk) of
|
||||||
ok ->
|
ok ->
|
||||||
CSum_info = encode_csum_file_entry_nothex(Offset, Size, TaggedCSum),
|
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
|
||||||
ok = file:write(FHc, CSum_info),
|
ok = file:write(FHc, CSum_info),
|
||||||
ok;
|
ok;
|
||||||
_Else3 ->
|
_Else3 ->
|
||||||
|
@ -752,7 +755,7 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
|
||||||
end,
|
end,
|
||||||
From ! {assignment, Offset, File},
|
From ! {assignment, Offset, File},
|
||||||
Size = iolist_size(Chunk),
|
Size = iolist_size(Chunk),
|
||||||
CSum_info = encode_csum_file_entry_nothex(Offset, Size, TaggedCSum),
|
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
|
||||||
ok = file:write(FHc, CSum_info),
|
ok = file:write(FHc, CSum_info),
|
||||||
seq_append_server_loop(DataDir, Prefix, File, FH_,
|
seq_append_server_loop(DataDir, Prefix, File, FH_,
|
||||||
FileNum, Offset + Size + Extra);
|
FileNum, Offset + Size + Extra);
|
||||||
|
@ -875,42 +878,85 @@ split_uri_options(OpsBin) ->
|
||||||
{size, binary_to_integer(Bin)}
|
{size, binary_to_integer(Bin)}
|
||||||
end || X <- L].
|
end || X <- L].
|
||||||
|
|
||||||
encode_csum_file_entry_nothex(Offset, Size, TaggedCSum) ->
|
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
|
||||||
|
%% internal storage by the FLU.
|
||||||
|
|
||||||
|
-spec encode_csum_file_entry(
|
||||||
|
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
||||||
|
iolist().
|
||||||
|
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
||||||
Len = 8 + 4 + byte_size(TaggedCSum),
|
Len = 8 + 4 + byte_size(TaggedCSum),
|
||||||
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
[<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
||||||
TaggedCSum].
|
TaggedCSum].
|
||||||
|
|
||||||
decode_csum_file_entry_nothex(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
|
%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for
|
||||||
|
%% internal storage by the FLU.
|
||||||
|
|
||||||
|
-spec encode_csum_file_entry_bin(
|
||||||
|
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
||||||
|
binary().
|
||||||
|
encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
|
||||||
|
Len = 8 + 4 + byte_size(TaggedCSum),
|
||||||
|
<<Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
|
||||||
|
TaggedCSum/binary>>.
|
||||||
|
|
||||||
|
%% @doc Decode a single `binary()' blob into an
|
||||||
|
%% `{Offset,Size,TaggedCSum}' tuple.
|
||||||
|
%%
|
||||||
|
%% The internal encoding (which is currently exposed to the outside world
|
||||||
|
%% via this function and related ones) is:
|
||||||
|
%%
|
||||||
|
%% <ul>
|
||||||
|
%% <li> 1 byte: record length
|
||||||
|
%% </li>
|
||||||
|
%% <li> 8 bytes (unsigned big-endian): byte offset
|
||||||
|
%% </li>
|
||||||
|
%% <li> 4 bytes (unsigned big-endian): chunk size
|
||||||
|
%% </li>
|
||||||
|
%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
|
||||||
|
%% </li>
|
||||||
|
%% </ul>
|
||||||
|
%%
|
||||||
|
%% See `machi.hrl' for the tagged checksum types, e.g.,
|
||||||
|
%% `?CSUM_TAG_NONE'.
|
||||||
|
|
||||||
|
-spec decode_csum_file_entry(binary()) ->
|
||||||
|
{machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
|
||||||
|
decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
|
||||||
{Offset, Size, TaggedCSum}.
|
{Offset, Size, TaggedCSum}.
|
||||||
|
|
||||||
split_1byte_len_tag_decode(Bin) ->
|
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
|
||||||
split_1byte_len_tag_decode(Bin, []).
|
%% unparsed `binary()' blobs, one per entry.
|
||||||
|
%%
|
||||||
|
%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if
|
||||||
|
%% desired.
|
||||||
|
|
||||||
split_1byte_len_tag_decode(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
-spec split_checksum_list_blob(binary()) ->
|
||||||
split_1byte_len_tag_decode(Rest, [decode_csum_file_entry_nothex(Part)|Acc]);
|
list(binary()).
|
||||||
split_1byte_len_tag_decode(Other, Acc) ->
|
split_checksum_list_blob(Bin) ->
|
||||||
{lists:reverse(Acc), Other}.
|
split_checksum_list_blob(Bin, []).
|
||||||
|
|
||||||
split_1byte_len_tag(Bin) ->
|
split_checksum_list_blob(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
||||||
split_1byte_len_tag(Bin, []).
|
|
||||||
|
|
||||||
split_1byte_len_tag(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
|
||||||
case get(hack_length) of
|
case get(hack_length) of
|
||||||
Len -> ok;
|
Len -> ok;
|
||||||
_ -> put(hack_different, true)
|
_ -> put(hack_different, true)
|
||||||
end,
|
end,
|
||||||
split_1byte_len_tag(Rest, [<<Len:8/unsigned-big, Part/binary>>|Acc]);
|
split_checksum_list_blob(Rest, [<<Len:8/unsigned-big, Part/binary>>|Acc]);
|
||||||
split_1byte_len_tag(Other, Acc) ->
|
split_checksum_list_blob(Rest, Acc) ->
|
||||||
{lists:reverse(Acc), Other}.
|
{lists:reverse(Acc), Rest}.
|
||||||
|
|
||||||
%% split_1byte_len_tag(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
|
||||||
%% case get(hack_length) of
|
%% `{Offset,Size,TaggedCSum}' tuples.
|
||||||
%% Len -> ok;
|
|
||||||
%% _ -> put(hack_different, true)
|
-spec split_checksum_list_blob_decode(binary()) ->
|
||||||
%% end,
|
list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}).
|
||||||
%% split_1byte_len_tag(Rest, [<<Len:8/unsigned-big, Part/binary>>|Acc]);
|
split_checksum_list_blob_decode(Bin) ->
|
||||||
%% split_1byte_len_tag(Other, Acc) ->
|
split_checksum_list_blob_decode(Bin, []).
|
||||||
%% {lists:reverse(Acc), Other}.
|
|
||||||
|
split_checksum_list_blob_decode(<<Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
||||||
|
split_checksum_list_blob_decode(Rest, [decode_csum_file_entry(Part)|Acc]);
|
||||||
|
split_checksum_list_blob_decode(Rest, Acc) ->
|
||||||
|
{lists:reverse(Acc), Rest}.
|
||||||
|
|
||||||
check_or_make_tagged_checksum(?CSUM_TAG_NONE, Client_CSum, Chunk) ->
|
check_or_make_tagged_checksum(?CSUM_TAG_NONE, Client_CSum, Chunk) ->
|
||||||
%% TODO: If the client was foolish enough to use
|
%% TODO: If the client was foolish enough to use
|
||||||
|
@ -971,9 +1017,9 @@ timing_demo_test2() ->
|
||||||
{HexUSec, _} =
|
{HexUSec, _} =
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
lists:foldl(fun(X, _) ->
|
lists:foldl(fun(X, _) ->
|
||||||
B = encode_csum_file_entry(X, 100, CSum),
|
B = encode_csum_file_entry_hex(X, 100, CSum),
|
||||||
%% file:write(ZZZ, [B, 10]),
|
%% file:write(ZZZ, [B, 10]),
|
||||||
decode_csum_file_entry(list_to_binary(B))
|
decode_csum_file_entry_hex(list_to_binary(B))
|
||||||
end, x, Xs)
|
end, x, Xs)
|
||||||
end),
|
end),
|
||||||
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
|
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
|
||||||
|
@ -984,14 +1030,14 @@ timing_demo_test2() ->
|
||||||
{NotSortedUSec, _} =
|
{NotSortedUSec, _} =
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
lists:foldl(fun(X, _) ->
|
lists:foldl(fun(X, _) ->
|
||||||
B = encode_csum_file_entry_nothex(X, 100, CSum),
|
B = encode_csum_file_entry(X, 100, CSum),
|
||||||
decode_csum_file_entry_nothex(list_to_binary(B))
|
decode_csum_file_entry(list_to_binary(B))
|
||||||
end, x, Xs)
|
end, x, Xs)
|
||||||
end),
|
end),
|
||||||
io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]),
|
io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]),
|
||||||
|
|
||||||
NotHexList = lists:foldl(fun(X, Acc) ->
|
NotHexList = lists:foldl(fun(X, Acc) ->
|
||||||
B = encode_csum_file_entry_nothex(X, 100, CSum),
|
B = encode_csum_file_entry(X, 100, CSum),
|
||||||
[B|Acc]
|
[B|Acc]
|
||||||
end, [], Xs),
|
end, [], Xs),
|
||||||
NotHexBin = iolist_to_binary(NotHexList),
|
NotHexBin = iolist_to_binary(NotHexList),
|
||||||
|
@ -1002,7 +1048,7 @@ timing_demo_test2() ->
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
put(hack_length, 29),
|
put(hack_length, 29),
|
||||||
put(hack_different, false),
|
put(hack_different, false),
|
||||||
{Sorted, _Leftover} = split_1byte_len_tag(NotHexBin),
|
{Sorted, _Leftover} = split_checksum_list_blob(NotHexBin),
|
||||||
io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]),
|
io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]),
|
||||||
Sorted
|
Sorted
|
||||||
end),
|
end),
|
||||||
|
@ -1045,7 +1091,7 @@ timing_demo_test2() ->
|
||||||
{NotHexTupleCreationUSec, NotHexTupleList} =
|
{NotHexTupleCreationUSec, NotHexTupleList} =
|
||||||
timer:tc(fun() ->
|
timer:tc(fun() ->
|
||||||
lists:foldl(fun(X, Acc) ->
|
lists:foldl(fun(X, Acc) ->
|
||||||
B = encode_csum_file_entry(
|
B = encode_csum_file_entry_hex(
|
||||||
X, 100, CSum),
|
X, 100, CSum),
|
||||||
[B|Acc]
|
[B|Acc]
|
||||||
end, [], Xs)
|
end, [], Xs)
|
||||||
|
@ -1076,7 +1122,7 @@ sort_input_fun(FH, PrevStuff) ->
|
||||||
true ->
|
true ->
|
||||||
<<PrevStuff/binary, NewStuff/binary>>
|
<<PrevStuff/binary, NewStuff/binary>>
|
||||||
end,
|
end,
|
||||||
{SplitRes, Leftover} = split_1byte_len_tag(AllStuff),
|
{SplitRes, Leftover} = split_checksum_list_blob(AllStuff),
|
||||||
{SplitRes, sort_input_fun(FH, Leftover)};
|
{SplitRes, sort_input_fun(FH, Leftover)};
|
||||||
eof ->
|
eof ->
|
||||||
end_of_input
|
end_of_input
|
||||||
|
@ -1093,13 +1139,13 @@ sort_output_fun(FH) ->
|
||||||
sort_output_fun(FH)
|
sort_output_fun(FH)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
encode_csum_file_entry_hex(Offset, Size, TaggedCSum) ->
|
||||||
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
|
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
|
||||||
SizeHex = machi_util:bin_to_hexstr(<<Size:32/big>>),
|
SizeHex = machi_util:bin_to_hexstr(<<Size:32/big>>),
|
||||||
CSumHex = machi_util:bin_to_hexstr(TaggedCSum),
|
CSumHex = machi_util:bin_to_hexstr(TaggedCSum),
|
||||||
[OffsetHex, 32, SizeHex, 32, CSumHex].
|
[OffsetHex, 32, SizeHex, 32, CSumHex].
|
||||||
|
|
||||||
decode_csum_file_entry(<<OffsetHex:16/binary, _:1/binary, SizeHex:8/binary, _:1/binary, CSumHex/binary>>) ->
|
decode_csum_file_entry_hex(<<OffsetHex:16/binary, _:1/binary, SizeHex:8/binary, _:1/binary, CSumHex/binary>>) ->
|
||||||
Offset = machi_util:hexstr_to_bin(OffsetHex),
|
Offset = machi_util:hexstr_to_bin(OffsetHex),
|
||||||
Size = machi_util:hexstr_to_bin(SizeHex),
|
Size = machi_util:hexstr_to_bin(SizeHex),
|
||||||
CSum = machi_util:hexstr_to_bin(CSumHex),
|
CSum = machi_util:hexstr_to_bin(CSumHex),
|
||||||
|
|
|
@ -174,16 +174,36 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
|
||||||
%% @doc Fetch the list of chunk checksums for `File'.
|
%% @doc Fetch the list of chunk checksums for `File'.
|
||||||
|
|
||||||
-spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
|
-spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
|
||||||
{ok, [machi_dt:chunk_summary()]} |
|
{ok, binary()} |
|
||||||
{error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} |
|
{error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} |
|
||||||
{error, term()}.
|
{error, term()}.
|
||||||
checksum_list(Sock, EpochID, File) ->
|
checksum_list(Sock, EpochID, File) ->
|
||||||
checksum_list2(Sock, EpochID, File).
|
checksum_list2(Sock, EpochID, File).
|
||||||
|
|
||||||
%% @doc Fetch the list of chunk checksums for `File'.
|
%% @doc Fetch the list of chunk checksums for `File'.
|
||||||
|
%%
|
||||||
|
%% Why return a simple `binary()' type rather than
|
||||||
|
%% `[machi_dt:chunk_summary()]'? The two reasons are:
|
||||||
|
%% <ol>
|
||||||
|
%% <li> Server overhead: the CPU required to chop up the implementation-
|
||||||
|
%% specific store into zillions of very small terms is very high.
|
||||||
|
%% </li>
|
||||||
|
%% <li> Protocol encoding and decoding overhead: the cost is non-zero,
|
||||||
|
%% and the sum of cost of encoding and decoding a zillion small terms
|
||||||
|
%% is substantial.
|
||||||
|
%% </li>
|
||||||
|
%% </ol>
|
||||||
|
%%
|
||||||
|
%% For both reasons, the server's protocol response is absurdly simple
|
||||||
|
%% and very fast: send back a `binary()' blob to the client. Then it
|
||||||
|
%% is the client's responsibility to spend the CPU time to parse the
|
||||||
|
%% blob.
|
||||||
|
%%
|
||||||
|
%% Details of the encoding used inside the `binary()' blog can be found
|
||||||
|
%% in the EDoc comments for {@link machi_flu1:decode_csum_file_entry/1}.
|
||||||
|
|
||||||
-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
|
-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
|
||||||
{ok, [machi_dt:chunk_summary()]} |
|
{ok, binary()} |
|
||||||
{error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
|
{error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
|
||||||
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||||
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
|
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
|
||||||
|
|
Loading…
Reference in a new issue