Merge branch 'slf/pb-api-experiment3'

This commit is contained in:
Scott Lystig Fritchie 2015-07-01 18:33:33 +09:00
commit 039fd5fb78
20 changed files with 502 additions and 179 deletions

3
FAQ.md
View file

@ -602,6 +602,9 @@ Yes, sort of. For as long as the legacy of
Machi's first internal protocol & code still Machi's first internal protocol & code still
survives, it's possible to use a survives, it's possible to use a
[primitive/hack'y HTTP interface that is described in this source code commit log](https://github.com/basho/machi/commit/6cebf397232cba8e63c5c9a0a8c02ba391b20fef). [primitive/hack'y HTTP interface that is described in this source code commit log](https://github.com/basho/machi/commit/6cebf397232cba8e63c5c9a0a8c02ba391b20fef).
Please note that commit `6cebf397232cba8e63c5c9a0a8c02ba391b20fef` is
required to try using this feature: the code has since bit-rotted and
will not work on today's `master` branch.
In the long term, we'll probably want the option of an HTTP interface In the long term, we'll probably want the option of an HTTP interface
that is as well designed and REST'ful as possible. It's on the that is as well designed and REST'ful as possible. It's on the

View file

@ -37,20 +37,21 @@ being used as the initial scaffolding.
* The chain manager is ready for "AP mode" use in eventual * The chain manager is ready for "AP mode" use in eventual
consistency use cases. consistency use cases.
* The Machi client/server protocol is still the hand-crafted, * All Machi client/server protocols are based on
artisanal, bogus protocol that I hacked together for a "demo day" [Protocol Buffers](https://developers.google.com/protocol-buffers/docs/overview).
back in January and appears in the * The current specification for Machi's protocols can be found at
[prototype/demo-day-hack](prototype/demo-day-hack/) code. [https://github.com/basho/machi/blob/master/src/machi.proto](https://github.com/basho/machi/blob/master/src/machi.proto).
* Today: the only client language supported is Erlang. * The Machi PB protocol is not yet stable. Expect change!
* Today: an HTTP interface that, at the moment, is a big kludge. * The Erlang language client implementation of the high-level
If someone who really cares about an protocol flavor is very brittle (e.g., very little error
HTTP interface that is 100% REST-ful cares to contribute some handling yet).
code ... contributions are welcome! * The Erlang language client implementation of the low-level
* Work in progress now: replace the current protocol to something protocol flavor are still a work-in-progress ... but they are
based on Protocol Buffers more robust than the high-level library's implementation.
* If you'd like to work on a protocol such as Thrift, UBF,
msgpack over UDP, or some other protocol, let us know by If you'd like to work on a protocol such as Thrift, UBF,
[opening an issue](./issues/new) to discuss it. msgpack over UDP, or some other protocol, let us know by
[opening an issue](./issues/new) to discuss it.
## Contributing to Machi: source code, documentation, etc. ## Contributing to Machi: source code, documentation, etc.

View file

@ -45,6 +45,7 @@ enum Mpb_GeneralStatusCode {
WRITTEN = 6; WRITTEN = 6;
NO_SUCH_FILE = 7; NO_SUCH_FILE = 7;
PARTIAL_READ = 8; PARTIAL_READ = 8;
BAD_EPOCH = 9;
BAD_JOSS = 255; // Only for testing by the Taipan BAD_JOSS = 255; // Only for testing by the Taipan
} }
@ -209,6 +210,9 @@ message Mpb_ChecksumListReq {
message Mpb_ChecksumListResp { message Mpb_ChecksumListResp {
required Mpb_GeneralStatusCode status = 1; required Mpb_GeneralStatusCode status = 1;
// For data type rationale, see comments for
// machi_flu1_client:checksum_list/4 or
// http://basho.github.io/machi/edoc/machi_flu1_client.html#checksum_list-4
optional bytes chunk = 2; optional bytes chunk = 2;
} }
@ -403,6 +407,9 @@ message Mpb_LL_ChecksumListReq {
message Mpb_LL_ChecksumListResp { message Mpb_LL_ChecksumListResp {
required Mpb_GeneralStatusCode status = 1; required Mpb_GeneralStatusCode status = 1;
// For data type rationale, see comments for
// machi_flu1_client:checksum_list/4 or
// http://basho.github.io/machi/edoc/machi_flu1_client.html#checksum_list-4
optional bytes chunk = 2; optional bytes chunk = 2;
} }

View file

@ -94,7 +94,8 @@ verify_file_checksums_remote2(Sock1, EpochID, File) ->
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) -> verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
try try
case ?FLU_C:checksum_list(Sock1, EpochID, File) of case ?FLU_C:checksum_list(Sock1, EpochID, File) of
{ok, Info} -> {ok, InfoBin} ->
{Info, _} = machi_flu1:split_checksum_list_blob_decode(InfoBin),
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk), Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
[], Info), [], Info),
{ok, Res}; {ok, Res};

View file

@ -135,7 +135,8 @@ find_server_info(_Id) ->
load_ets_table(Conn, ETS) -> load_ets_table(Conn, ETS) ->
{ok, Fs} = machi_cr_client:list_files(Conn), {ok, Fs} = machi_cr_client:list_files(Conn),
[begin [begin
{ok, PosList} = machi_cr_client:checksum_list(Conn, File), {ok, InfoBin} = machi_cr_client:checksum_list(Conn, File),
{PosList, _} = machi_flu1:split_checksum_list_blob_decode(InfoBin),
StartKey = ets:update_counter(ETS, max_key, 0), StartKey = ets:update_counter(ETS, max_key, 0),
%% _EndKey = lists:foldl(fun({Off,Sz,CSum}, K) -> %% _EndKey = lists:foldl(fun({Off,Sz,CSum}, K) ->
%% V = {File, Off, Sz, CSum}, %% V = {File, Off, Sz, CSum},

View file

@ -2065,7 +2065,7 @@ do_repair(
end, end,
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys], Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: " error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: "
"~w\nStats ~w\n", "~p\nStats ~p\n",
[Summary, MyName, UPI, RepairMode, RepairId, [Summary, MyName, UPI, RepairMode, RepairId,
Res, Stats]), Res, Stats]),
ets:delete(ETS), ets:delete(ETS),

View file

@ -207,11 +207,16 @@ make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID,
C0 = [begin C0 = [begin
%% erlang:garbage_collect(), %% erlang:garbage_collect(),
Proxy = orddict:fetch(FLU, ProxiesDict), Proxy = orddict:fetch(FLU, ProxiesDict),
OffSzCs = case machi_proxy_flu1_client:checksum_list( OffSzCs =
Proxy, EpochID, File, ?LONG_TIMEOUT) of case machi_proxy_flu1_client:checksum_list(
{ok, X} -> X; Proxy, EpochID, File, ?LONG_TIMEOUT) of
{error, no_such_file} -> [] {ok, InfoBin} ->
end, {Info, _} =
machi_flu1:split_checksum_list_blob_decode(InfoBin),
Info;
{error, no_such_file} ->
[]
end,
[{?MAX_OFFSET, 0, <<>>, FLU}] % our end-of-file marker [{?MAX_OFFSET, 0, <<>>, FLU}] % our end-of-file marker
++ ++
[{Off, Sz, Cs, FLU} || {Off, Sz, Cs} <- OffSzCs] [{Off, Sz, Cs, FLU} || {Off, Sz, Cs} <- OffSzCs]
@ -313,7 +318,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
{in_bytes, t_in_bytes}, {out_files, t_out_files}, {in_bytes, t_in_bytes}, {out_files, t_out_files},
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc2) -> F = fun({copy, {Offset, Size, TaggedCSum, MySrc}, MyDsts}, Acc2) ->
SrcP = orddict:fetch(MySrc, ProxiesDict), SrcP = orddict:fetch(MySrc, ProxiesDict),
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
0 -> ?VERB(".", []); 0 -> ?VERB(".", []);
@ -324,6 +329,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
SrcP, EpochID, File, Offset, Size, SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT), ?SHORT_TIMEOUT),
_T2 = os:timestamp(), _T2 = os:timestamp(),
<<_Tag:1/binary, CSum/binary>> = TaggedCSum,
case machi_util:checksum_chunk(Chunk) of case machi_util:checksum_chunk(Chunk) of
CSum_now when CSum_now == CSum -> CSum_now when CSum_now == CSum ->
[begin [begin

View file

@ -78,11 +78,18 @@
-include("machi_pb.hrl"). -include("machi_pb.hrl").
-include("machi_projection.hrl"). -include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST
-define(SERVER_CMD_READ_TIMEOUT, 600*1000). -define(SERVER_CMD_READ_TIMEOUT, 600*1000).
-export([start_link/1, stop/1, -export([start_link/1, stop/1,
update_wedge_state/3]). update_wedge_state/3, wedge_myself/2]).
-export([make_listener_regname/1, make_projection_server_regname/1]). -export([make_listener_regname/1, make_projection_server_regname/1]).
-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3,
decode_csum_file_entry/1,
split_checksum_list_blob/1, split_checksum_list_blob_decode/1]).
-record(state, { -record(state, {
flu_name :: atom(), flu_name :: atom(),
@ -121,6 +128,10 @@ update_wedge_state(PidSpec, Boolean, EpochId)
when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> when (Boolean == true orelse Boolean == false), is_tuple(EpochId) ->
PidSpec ! {wedge_state_change, Boolean, EpochId}. PidSpec ! {wedge_state_change, Boolean, EpochId}.
wedge_myself(PidSpec, EpochId)
when is_tuple(EpochId) ->
PidSpec ! {wedge_myself, EpochId}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%
ets_table_name(FluName) when is_atom(FluName) -> ets_table_name(FluName) when is_atom(FluName) ->
@ -239,7 +250,8 @@ listen_server_loop(LSock, S) ->
spawn_link(fun() -> net_server_loop(Sock, S) end), spawn_link(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S). listen_server_loop(LSock, S).
append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p,
epoch_id=OldEpochId}=S) ->
AppendServerPid = self(), AppendServerPid = self(),
receive receive
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p -> {seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p ->
@ -250,10 +262,26 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
Chunk, CSum, Extra, Chunk, CSum, Extra,
DataDir, AppendServerPid) end), DataDir, AppendServerPid) end),
append_server_loop(FluPid, S); append_server_loop(FluPid, S);
{wedge_state_change, Boolean, EpochId} -> {wedge_myself, WedgeEpochId} ->
true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}), if WedgeEpochId == OldEpochId ->
append_server_loop(FluPid, S#state{wedged=Boolean, true = ets:insert(S#state.etstab,
epoch_id=EpochId}); {epoch, {true, OldEpochId}}),
append_server_loop(FluPid, S#state{wedged=true});
true ->
append_server_loop(FluPid, S)
end;
{wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId} ->
OldEpoch = case OldEpochId of {OldE, _} -> OldE;
undefined -> -1
end,
if NewEpoch >= OldEpoch ->
true = ets:insert(S#state.etstab,
{epoch, {Boolean, NewEpochId}}),
append_server_loop(FluPid, S#state{wedged=Boolean,
epoch_id=NewEpochId});
true ->
append_server_loop(FluPid, S)
end;
{wedge_status, FromPid} -> {wedge_status, FromPid} ->
#state{wedged=Wedged_p, epoch_id=EpochId} = S, #state{wedged=Wedged_p, epoch_id=EpochId} = S,
FromPid ! {wedge_status_reply, Wedged_p, EpochId}, FromPid ! {wedge_status_reply, Wedged_p, EpochId},
@ -263,9 +291,6 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
append_server_loop(FluPid, S) append_server_loop(FluPid, S)
end. end.
-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee!
-define(CSumSpace, ((1*2)+(20*2))). % hexencodingwhee!
net_server_loop(Sock, S) -> net_server_loop(Sock, S) ->
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Bin} -> {ok, Bin} ->
@ -287,7 +312,10 @@ net_server_loop(Sock, S) ->
R = #mpb_ll_response{req_id= <<>>, R = #mpb_ll_response{req_id= <<>>,
generic=#mpb_errorresp{code=1, msg=Msg}}, generic=#mpb_errorresp{code=1, msg=Msg}},
Resp = machi_pb:encode_mpb_ll_response(R), Resp = machi_pb:encode_mpb_ll_response(R),
_ = (catch gen_tcp:send(Sock, Resp)), %% TODO: Weird that sometimes neither catch nor try/catch
%% can prevent OTP's SASL from logging an error here.
%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
_ = (catch gen_tcp:send(Sock, Resp)), timer:sleep(1000),
(catch gen_tcp:close(Sock)), (catch gen_tcp:close(Sock)),
exit(normal) exit(normal)
end. end.
@ -314,12 +342,11 @@ do_pb_ll_request(PB_request, S) ->
Req = machi_pb_translate:from_pb_request(PB_request), Req = machi_pb_translate:from_pb_request(PB_request),
{ReqID, Cmd, Result, S2} = {ReqID, Cmd, Result, S2} =
case Req of case Req of
{RqID, {low_proj, _}=CMD} -> {RqID, {LowCmd, _}=CMD}
when LowCmd == low_proj;
LowCmd == low_wedge_status; LowCmd == low_list_files ->
%% Skip wedge check for projection commands! %% Skip wedge check for projection commands!
{Rs, NewS} = do_pb_ll_request3(CMD, S), %% Skip wedge check for these unprivileged commands
{RqID, CMD, Rs, NewS};
{RqID, {low_wedge_status, _}=CMD} ->
%% Skip wedge check for low_wedge_status!
{Rs, NewS} = do_pb_ll_request3(CMD, S), {Rs, NewS} = do_pb_ll_request3(CMD, S),
{RqID, CMD, Rs, NewS}; {RqID, CMD, Rs, NewS};
{RqID, CMD} -> {RqID, CMD} ->
@ -333,7 +360,7 @@ do_pb_ll_request2(EpochID, CMD, S) ->
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
if Wedged_p == true -> if Wedged_p == true ->
{{error, wedged}, S}; {{error, wedged}, S};
not ((not is_tuple(EpochID)) orelse EpochID == ?DUMMY_PV1_EPOCH) is_tuple(EpochID)
andalso andalso
EpochID /= CurrentEpochID -> EpochID /= CurrentEpochID ->
{Epoch, _} = EpochID, {Epoch, _} = EpochID,
@ -343,8 +370,9 @@ do_pb_ll_request2(EpochID, CMD, S) ->
true -> true ->
%% We're at same epoch # but different checksum, or %% We're at same epoch # but different checksum, or
%% we're at a newer/bigger epoch #. %% we're at a newer/bigger epoch #.
io:format(user, "\n\nTODO: wedge myself!\n\n", []), io:format(user, "\n\nTODO/monitor: wedging myself!\n\n",[]),
todo_wedge_myself wedge_myself(S#state.flu_name, CurrentEpochID),
ok
end, end,
{{error, bad_epoch}, S}; {{error, bad_epoch}, S};
true -> true ->
@ -444,29 +472,8 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
ChunkExtra, #state{flu_name=FluName}=_S) -> ChunkExtra, #state{flu_name=FluName}=_S) ->
%% TODO: Do anything with PKey? %% TODO: Do anything with PKey?
try try
CSum = case CSum_tag of TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
?CSUM_TAG_NONE -> FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, ChunkExtra},
%% TODO: If the client was foolish enough to use
%% this type of non-checksum, then the client gets
%% what it deserves wrt data integrity, alas. In
%% the client-side Chain Replication method, each
%% server will calculated this independently, which
%% isn't exactly what ought to happen for best data
%% integrity checking. In server-side CR, the csum
%% should be calculated by the head and passed down
%% the chain together with the value.
CS = machi_util:checksum_chunk(Chunk),
machi_util:make_tagged_csum(server_sha, CS);
?CSUM_TAG_CLIENT_SHA ->
CS = machi_util:checksum_chunk(Chunk),
if CS == Client_CSum ->
machi_util:make_tagged_csum(server_sha,
Client_CSum);
true ->
throw({bad_csum, CS})
end
end,
FluName ! {seq_append, self(), Prefix, Chunk, CSum, ChunkExtra},
receive receive
{assignment, Offset, File} -> {assignment, Offset, File} ->
Size = iolist_size(Chunk), Size = iolist_size(Chunk),
@ -514,36 +521,11 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum,
do_server_write_chunk2(_File, Offset, Chunk, CSum_tag, do_server_write_chunk2(_File, Offset, Chunk, CSum_tag,
Client_CSum, _DataDir, FHc, FHd) -> Client_CSum, _DataDir, FHc, FHd) ->
try try
CSum = case CSum_tag of TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
?CSUM_TAG_NONE ->
%% TODO: If the client was foolish enough to use
%% this type of non-checksum, then the client gets
%% what it deserves wrt data integrity, alas. In
%% the client-side Chain Replication method, each
%% server will calculated this independently, which
%% isn't exactly what ought to happen for best data
%% integrity checking. In server-side CR, the csum
%% should be calculated by the head and passed down
%% the chain together with the value.
CS = machi_util:checksum_chunk(Chunk),
machi_util:make_tagged_csum(server_sha,CS);
?CSUM_TAG_CLIENT_SHA ->
CS = machi_util:checksum_chunk(Chunk),
if CS == Client_CSum ->
machi_util:make_tagged_csum(server_sha,
Client_CSum);
true ->
throw({bad_csum, CS})
end
end,
Size = iolist_size(Chunk), Size = iolist_size(Chunk),
case file:pwrite(FHd, Offset, Chunk) of case file:pwrite(FHd, Offset, Chunk) of
ok -> ok ->
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>), CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
LenHex = machi_util:bin_to_hexstr(<<Size:32/big>>),
CSumHex2 = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32,
CSumHex2, 10],
ok = file:write(FHc, CSum_info), ok = file:write(FHc, CSum_info),
ok; ok;
_Else3 -> _Else3 ->
@ -606,7 +588,16 @@ do_server_checksum_listing(File, #state{data_dir=DataDir}=_S) ->
%% {packet_size,N} limit, then we'll have a difficult time, eh? %% {packet_size,N} limit, then we'll have a difficult time, eh?
case file:read_file(CSumPath) of case file:read_file(CSumPath) of
{ok, Bin} -> {ok, Bin} ->
{ok, Bin}; if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
%% TODO: Fix this limitation by streaming the
%% binary in multiple smaller PB messages.
%% Also, don't read the file all at once. ^_^
error_logger:error_msg("~s:~w oversize ~s\n",
[?MODULE, ?LINE, CSumPath]),
{error, bad_arg};
true ->
{ok, Bin}
end;
{error, enoent} -> {error, enoent} ->
{error, no_such_file}; {error, no_such_file};
{error, _} -> {error, _} ->
@ -780,21 +771,18 @@ seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
run_seq_append_server2(Prefix, DataDir); run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
receive receive
{seq_append, From, Prefix, Chunk, CSum, Extra} -> {seq_append, From, Prefix, Chunk, TaggedCSum, Extra} ->
if Chunk /= <<>> -> if Chunk /= <<>> ->
ok = file:pwrite(FHd, Offset, Chunk); ok = file:pwrite(FHd, Offset, Chunk);
true -> true ->
ok ok
end, end,
From ! {assignment, Offset, File}, From ! {assignment, Offset, File},
Len = byte_size(Chunk), Size = iolist_size(Chunk),
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>), CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
LenHex = machi_util:bin_to_hexstr(<<Len:32/big>>),
CSumHex = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info), ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_, seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset + Len + Extra); FileNum, Offset + Size + Extra);
{sync_stuff, FromPid, Ref} -> {sync_stuff, FromPid, Ref} ->
file:sync(FHc), file:sync(FHc),
FromPid ! {sync_finished, Ref}, FromPid ! {sync_finished, Ref},
@ -821,7 +809,7 @@ make_listener_regname(BaseName) ->
make_projection_server_regname(BaseName) -> make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). list_to_atom(atom_to_list(BaseName) ++ "_pstore2").
http_server_hack(FluName, Line1, Sock, S) -> http_hack_server(FluName, Line1, Sock, S) ->
{ok, {http_request, HttpOp, URI0, _HttpV}, _x} = {ok, {http_request, HttpOp, URI0, _HttpV}, _x} =
erlang:decode_packet(http_bin, Line1, [{line_length,4095}]), erlang:decode_packet(http_bin, Line1, [{line_length,4095}]),
MyURI = case URI0 of MyURI = case URI0 of
@ -829,18 +817,18 @@ http_server_hack(FluName, Line1, Sock, S) ->
Rest; Rest;
_ -> URI0 _ -> URI0
end, end,
Hdrs = http_harvest_headers(Sock), Hdrs = http_hack_harvest_headers(Sock),
G = digest_header_goop(Hdrs, #http_goop{}), G = http_hack_digest_header_goop(Hdrs, #http_goop{}),
case HttpOp of case HttpOp of
'PUT' -> 'PUT' ->
http_server_hack_put(Sock, G, FluName, MyURI); http_hack_server_put(Sock, G, FluName, MyURI);
'GET' -> 'GET' ->
http_server_hack_get(Sock, G, FluName, MyURI, S) http_hack_server_get(Sock, G, FluName, MyURI, S)
end, end,
ok = gen_tcp:close(Sock), ok = gen_tcp:close(Sock),
exit(normal). exit(normal).
http_server_hack_put(Sock, G, FluName, MyURI) -> http_hack_server_put(Sock, G, FluName, MyURI) ->
ok = inet:setopts(Sock, [{packet, raw}]), ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, G#http_goop.len, 60*1000), {ok, Chunk} = gen_tcp:recv(Sock, G#http_goop.len, 60*1000),
CSum0 = machi_util:checksum_chunk(Chunk), CSum0 = machi_util:checksum_chunk(Chunk),
@ -878,34 +866,34 @@ http_server_hack_put(Sock, G, FluName, MyURI) ->
ok = gen_tcp:send(Sock, <<"HTTP/1.0 499 TIMEOUT\r\n\r\n">>) ok = gen_tcp:send(Sock, <<"HTTP/1.0 499 TIMEOUT\r\n\r\n">>)
end. end.
http_server_hack_get(Sock, _G, _FluName, _MyURI, _S) -> http_hack_server_get(Sock, _G, _FluName, _MyURI, _S) ->
ok = gen_tcp:send(Sock, <<"TODO BROKEN FEATURE see old commits\r\n">>). ok = gen_tcp:send(Sock, <<"TODO BROKEN FEATURE see old commits\r\n">>).
http_harvest_headers(Sock) -> http_hack_harvest_headers(Sock) ->
ok = inet:setopts(Sock, [{packet, httph}]), ok = inet:setopts(Sock, [{packet, httph}]),
http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT), http_hack_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT),
Sock, []). Sock, []).
http_harvest_headers({ok, http_eoh}, _Sock, Acc) -> http_hack_harvest_headers({ok, http_eoh}, _Sock, Acc) ->
Acc; Acc;
http_harvest_headers({error, _}, _Sock, _Acc) -> http_hack_harvest_headers({error, _}, _Sock, _Acc) ->
[]; [];
http_harvest_headers({ok, Hdr}, Sock, Acc) -> http_hack_harvest_headers({ok, Hdr}, Sock, Acc) ->
http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT), http_hack_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT),
Sock, [Hdr|Acc]). Sock, [Hdr|Acc]).
digest_header_goop([], G) -> http_hack_digest_header_goop([], G) ->
G; G;
digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) -> http_hack_digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) ->
digest_header_goop(T, G#http_goop{len=list_to_integer(Str)}); http_hack_digest_header_goop(T, G#http_goop{len=list_to_integer(Str)});
digest_header_goop([{http_header, _, "X-Checksum", _, Str}|T], G) -> http_hack_digest_header_goop([{http_header, _, "X-Checksum", _, Str}|T], G) ->
SHA = machi_util:hexstr_to_bin(Str), SHA = machi_util:hexstr_to_bin(Str),
CSum = machi_util:make_tagged_csum(client_sha, SHA), CSum = machi_util:make_tagged_csum(client_sha, SHA),
digest_header_goop(T, G#http_goop{x_csum=CSum}); http_hack_digest_header_goop(T, G#http_goop{x_csum=CSum});
digest_header_goop([_H|T], G) -> http_hack_digest_header_goop([_H|T], G) ->
digest_header_goop(T, G). http_hack_digest_header_goop(T, G).
split_uri_options(OpsBin) -> http_hack_split_uri_options(OpsBin) ->
L = binary:split(OpsBin, <<"&">>), L = binary:split(OpsBin, <<"&">>),
[case binary:split(X, <<"=">>) of [case binary:split(X, <<"=">>) of
[<<"offset">>, Bin] -> [<<"offset">>, Bin] ->
@ -913,3 +901,291 @@ split_uri_options(OpsBin) ->
[<<"size">>, Bin] -> [<<"size">>, Bin] ->
{size, binary_to_integer(Bin)} {size, binary_to_integer(Bin)}
end || X <- L]. end || X <- L].
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
%% internal storage by the FLU.  The 1-byte length prefix counts the
%% offset (8), size (4), and checksum bytes, but not the prefix itself.

-spec encode_csum_file_entry(
        machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
        iolist().
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
    %% 8 bytes offset + 4 bytes size + the tagged checksum payload.
    EntryLen = byte_size(TaggedCSum) + 12,
    [<<EntryLen:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
     TaggedCSum].
%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for
%% internal storage by the FLU.  Identical layout to
%% `encode_csum_file_entry/3', but flattened into one binary.

-spec encode_csum_file_entry_bin(
        machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
        binary().
encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
    %% Length prefix = 8 (offset) + 4 (size) + checksum bytes; the
    %% prefix byte itself is not counted.
    EntryLen = byte_size(TaggedCSum) + 12,
    <<EntryLen:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
      TaggedCSum/binary>>.
%% @doc Decode a single `binary()' blob into an
%% `{Offset,Size,TaggedCSum}' tuple, or `error' if the blob is too
%% short to contain all three fixed-width fields.
%%
%% The internal encoding (which is currently exposed to the outside world
%% via this function and related ones) is:
%%
%% <ul>
%% <li> 1 byte: record length
%% </li>
%% <li> 8 bytes (unsigned big-endian): byte offset
%% </li>
%% <li> 4 bytes (unsigned big-endian): chunk size
%% </li>
%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
%% </li>
%% </ul>
%%
%% See `machi.hrl' for the tagged checksum types, e.g.,
%% `?CSUM_TAG_NONE'.

-spec decode_csum_file_entry(binary()) ->
        error |
        {machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
decode_csum_file_entry(Blob) ->
    %% NB: the leading length byte is skipped here, not validated.
    case Blob of
        <<_Len:8/unsigned-big, Offset:64/unsigned-big,
          Size:32/unsigned-big, TaggedCSum/binary>> ->
            {Offset, Size, TaggedCSum};
        _TooShort ->
            error
    end.
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
%% unparsed `binary()' blobs, one per entry.
%%
%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if
%% desired.
%%
%% The return value `TrailingJunk' is unparseable bytes at the end of
%% the checksum list blob.

-spec split_checksum_list_blob(binary()) ->
        {list(binary()), TrailingJunk::binary()}.
split_checksum_list_blob(Blob) ->
    split_checksum_list_blob(Blob, []).

split_checksum_list_blob(<<EntryLen:8/unsigned-big, Body:EntryLen/binary,
                           Tail/binary>>, Entries) ->
    %% Instrumentation used only by the timing demo in this module's
    %% TEST section: record in the process dictionary whether any
    %% entry's length differed from the stashed `hack_length' value.
    case get(hack_length) =:= EntryLen of
        true  -> ok;
        false -> put(hack_different, true)
    end,
    split_checksum_list_blob(Tail,
                             [<<EntryLen:8/unsigned-big, Body/binary>>|Entries]);
split_checksum_list_blob(Leftover, Entries) ->
    {lists:reverse(Entries), Leftover}.
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
%% `{Offset,Size,TaggedCSum}' tuples.  Entries too short to decode are
%% silently dropped; `TrailingJunk' is any unparseable tail.

-spec split_checksum_list_blob_decode(binary()) ->
        {list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}),
         TrailingJunk::binary()}.
split_checksum_list_blob_decode(Blob) ->
    split_checksum_list_blob_decode(Blob, []).

split_checksum_list_blob_decode(<<EntryLen:8/unsigned-big, Body:EntryLen/binary,
                                  Tail/binary>>, Acc) ->
    Entry = <<EntryLen:8/unsigned-big, Body/binary>>,
    case decode_csum_file_entry(Entry) of
        error   -> split_checksum_list_blob_decode(Tail, Acc);
        Decoded -> split_checksum_list_blob_decode(Tail, [Decoded|Acc])
    end;
split_checksum_list_blob_decode(Leftover, Acc) ->
    {lists:reverse(Acc), Leftover}.
%% @doc Check a client-supplied checksum or create a server-side one,
%% returning a `server_sha'-tagged checksum for `Chunk'.  For
%% `?CSUM_TAG_CLIENT_SHA' the client's SHA is compared against a
%% locally computed one; a mismatch throws `{bad_csum, CS}'.
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
    %% TODO: If the client was foolish enough to use this type of
    %% non-checksum, then the client gets what it deserves wrt data
    %% integrity, alas.  In the client-side Chain Replication method,
    %% each server will calculate this independently, which isn't
    %% exactly what ought to happen for best data integrity checking.
    %% In server-side CR, the csum should be calculated by the head
    %% and passed down the chain together with the value.
    ServerCSum = machi_util:checksum_chunk(Chunk),
    machi_util:make_tagged_csum(server_sha, ServerCSum);
check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
    case machi_util:checksum_chunk(Chunk) of
        Client_CSum ->
            machi_util:make_tagged_csum(server_sha, Client_CSum);
        BadCSum ->
            throw({bad_csum, BadCSum})
    end.
-ifdef(TEST).
%% Remove "_COMMENTED" string to run the demo/exploratory code.
%% EUnit test generator: the "_COMMENTED" suffix keeps EUnit from
%% discovering it automatically; rename to `timing_demo_test_' to run.

timing_demo_test_COMMENTED_() ->
    %% 300-second timeout: the demo below encodes/decodes/sorts ~1M entries.
    {timeout, 300, fun() -> timing_demo_test2() end}.
%% Demo/exploratory hackery to check relative speeds of dealing with
%% checksum data in different ways.
%%
%% Summary:
%%
%% * Use compact binary encoding, with 1 byte header for entry length.
%% * Because the hex-style code is *far* slower just for enc & dec ops.
%% * For 1M entries of enc+dec: 0.215 sec vs. 15.5 sec.
%% * File sorter when sorting binaries as-is is only 30-40% slower
%%   than an in-memory split (of huge binary emulated by file:read_file()
%%   "big slurp") and sort of the same as-is sortable binaries.
%% * File sorter slows by a factor of about 2.5 if {order, fun compare/2}
%%   function must be used, i.e. because the checksum entry lengths differ.
%% * File sorter + {order, fun compare/2} is still *far* faster than external
%%   sort by OS X's sort(1) of sortable ASCII hex-style:
%%   4.5 sec vs. 21 sec.
%% * File sorter {order, fun compare/2} is faster than in-memory sort
%%   of order-friendly 3-tuple-style: 4.5 sec vs. 15 sec.

timing_demo_test2() ->
    %% One million pseudo-random 32-bit "offsets", all sharing one fixed
    %% 17-byte tagged checksum so every encoded entry has equal length.
    Xs = [random:uniform(1 bsl 32) || _ <- lists:duplicate(1*1000*1000, $x)],
    CSum = <<"123456789abcdef0A">>,
    17 = byte_size(CSum),
    io:format(user, "\n", []),

    %% Phase 1: encode+decode round-trip using the legacy hex-style format.
    %% %% {ok, ZZZ} = file:open("/tmp/foo.hex-style", [write, binary, raw, delayed_write]),
    io:format(user, "Hex-style file entry enc+dec: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {HexUSec, _} =
        timer:tc(fun() ->
                         lists:foldl(fun(X, _) ->
                                             B = encode_csum_file_entry_hex(X, 100, CSum),
                                             %% file:write(ZZZ, [B, 10]),
                                             decode_csum_file_entry_hex(list_to_binary(B))
                                     end, x, Xs)
                 end),
    io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
    %% %% file:close(ZZZ),

    %% Phase 2: same round-trip using the compact binary format.
    io:format(user, "Not-sortable file entry enc+dec: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {NotSortedUSec, _} =
        timer:tc(fun() ->
                         lists:foldl(fun(X, _) ->
                                             B = encode_csum_file_entry(X, 100, CSum),
                                             decode_csum_file_entry(list_to_binary(B))
                                     end, x, Xs)
                 end),
    io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]),

    NotHexList = lists:foldl(fun(X, Acc) ->
                                     B = encode_csum_file_entry(X, 100, CSum),
                                     [B|Acc]
                             end, [], Xs),
    NotHexBin = iolist_to_binary(NotHexList),

    %% Phase 3: split one big binary blob back into per-entry binaries.
    %% `hack_length'/`hack_different' feed the instrumentation inside
    %% split_checksum_list_blob/2 (29 = 1 len byte + 12 + 16 csum bytes... set
    %% here to the expected uniform entry length).
    io:format(user, "Split NotHexBin: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {NotHexBinUSec, SplitRes} =
        timer:tc(fun() ->
                         put(hack_length, 29),
                         put(hack_different, false),
                         {Sorted, _Leftover} = split_checksum_list_blob(NotHexBin),
                         io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]),
                         Sorted
                 end),
    io:format(user, "~.3f sec\n", [NotHexBinUSec / 1000000]),

    io:format(user, "Sort Split results: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {SortSplitUSec, _} =
        timer:tc(fun() ->
                         lists:sort(SplitRes)
                         %% lists:sort(fun sort_2lines/2, SplitRes)
                 end),
    io:format(user, "~.3f sec\n", [SortSplitUSec / 1000000]),

    %% Phase 4: external merge sort via OTP's file_sorter, with a SHA
    %% context in the process dictionary accumulated by sort_output_fun/1.
    UnsortedName = "/tmp/foo.unsorted",
    SortedName = "/tmp/foo.sorted",
    ok = file:write_file(UnsortedName, NotHexList),
    io:format(user, "File Sort Split results: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {FileSortUSec, _} =
        timer:tc(fun() ->
                         {ok, FHin} = file:open(UnsortedName, [read, binary]),
                         {ok, FHout} = file:open(SortedName,
                                                 [write, binary, delayed_write]),
                         put(hack_sorter_sha_ctx, crypto:hash_init(sha)),
                         ok = file_sorter:sort(sort_input_fun(FHin, <<>>),
                                               sort_output_fun(FHout),
                                               [{format,binary},
                                                {header, 1}
                                                %% , {order, fun sort_2lines/2}
                                               ])
                 end),
    io:format(user, "~.3f sec\n", [FileSortUSec / 1000000]),
    _SHA = crypto:hash_final(get(hack_sorter_sha_ctx)),
    %% io:format(user, "SHA via (hack_sorter_sha_ctx) = ~p\n", [_SHA]),

    %% Phase 5: build and sort the hex-style entries for comparison.
    io:format(user, "NotHex-Not-sortable tuple list creation: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {NotHexTupleCreationUSec, NotHexTupleList} =
        timer:tc(fun() ->
                         lists:foldl(fun(X, Acc) ->
                                             B = encode_csum_file_entry_hex(
                                                   X, 100, CSum),
                                             [B|Acc]
                                     end, [], Xs)
                 end),
    io:format(user, "~.3f sec\n", [NotHexTupleCreationUSec / 1000000]),

    io:format(user, "NotHex-Not-sortable tuple list sort: ", []),
    [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)],
    {NotHexTupleSortUSec, _} =
        timer:tc(fun() ->
                         lists:sort(NotHexTupleList)
                 end),
    io:format(user, "~.3f sec\n", [NotHexTupleSortUSec / 1000000]),
    ok.
%% Compare two length-prefixed checksum entries, skipping the 1-byte
%% length header on each so ordering is by payload (offset/size/csum).
sort_2lines(<<_LenA:1/binary, PayloadA/binary>>, <<_LenB:1/binary, PayloadB/binary>>) ->
    PayloadA < PayloadB.
%% file_sorter input fun: reads the unsorted file in 1 MB chunks,
%% prepends any unparsed trailing bytes from the previous read
%% (`PrevStuff'), and emits complete length-prefixed entries via
%% split_checksum_list_blob/1.  Returns `end_of_input' at EOF.
%% NOTE(review): the `close' branch does not close FH; the handle is
%% apparently left for the caller/demo to clean up — confirm intended.
sort_input_fun(FH, PrevStuff) ->
    fun(close) ->
            ok;
       (read) ->
            case file:read(FH, 1024*1024) of
                {ok, NewStuff} ->
                    %% Stitch leftover bytes onto the new chunk so an
                    %% entry straddling the read boundary is preserved.
                    AllStuff = if PrevStuff == <<>> ->
                                       NewStuff;
                                  true ->
                                       <<PrevStuff/binary, NewStuff/binary>>
                               end,
                    {SplitRes, Leftover} = split_checksum_list_blob(AllStuff),
                    %% Recurse by returning a new closure carrying Leftover.
                    {SplitRes, sort_input_fun(FH, Leftover)};
                eof ->
                    end_of_input
            end
    end.
%% file_sorter output fun: writes each sorted chunk to FH and folds it
%% into a running SHA context kept in the process dictionary under
%% `hack_sorter_sha_ctx' (initialized by the timing demo above).
sort_output_fun(FH) ->
    fun(close) ->
            file:close(FH);
       (Stuff) ->
            %% Update the demo's running hash before writing the chunk.
            Ctx = get(hack_sorter_sha_ctx),
            put(hack_sorter_sha_ctx, crypto:hash_update(Ctx, Stuff)),
            ok = file:write(FH, Stuff),
            %% Return a fresh closure, as the file_sorter protocol requires.
            sort_output_fun(FH)
    end.
%% Legacy hex-style encoding of a checksum entry: three space-separated
%% hex fields (offset, size, tagged checksum) as an iolist.  Used only
%% by the timing demo for comparison against the binary encoding.
encode_csum_file_entry_hex(Offset, Size, TaggedCSum) ->
    [machi_util:bin_to_hexstr(<<Offset:64/big>>), $\s,
     machi_util:bin_to_hexstr(<<Size:32/big>>), $\s,
     machi_util:bin_to_hexstr(TaggedCSum)].
%% Inverse of encode_csum_file_entry_hex/3: split the fixed-width hex
%% fields and convert each back to raw bytes.  Note the decoded offset
%% and size remain binaries here, not integers.
decode_csum_file_entry_hex(<<OffHex:16/binary, _Sp1:1/binary,
                             SzHex:8/binary, _Sp2:1/binary,
                             CsHex/binary>>) ->
    {machi_util:hexstr_to_bin(OffHex),
     machi_util:hexstr_to_bin(SzHex),
     machi_util:hexstr_to_bin(CsHex)}.
-endif. % TEST

View file

@ -26,20 +26,7 @@
%% management can be found in {@link machi_proxy_flu1_client} and %% management can be found in {@link machi_proxy_flu1_client} and
%% {@link machi_cr_client}. %% {@link machi_cr_client}.
%% %%
%% TODO This EDoc was written first, and the EDoc and also `-type' and %% For the moment, this module implements a Protocol Buffers-based
%% `-spec' definitions for {@link machi_proxy_flu1_client} and {@link
%% machi_cr_client} must be improved.
%%
%% === Protocol origins ===
%%
%% The protocol implemented here is an artisanal, hand-crafted, silly
%% thing that was very quick to put together for a "demo day" proof of
%% concept. It will almost certainly be replaced with something else,
%% both in terms of wire format and better code separation of
%% serialization/deserialization vs. network transport management,
%% etc.
%%
%% For the moment, this module implements a rudimentary TCP-based
%% protocol as the sole supported access method to the server, %% protocol as the sole supported access method to the server,
%% sequencer, and projection store. Conceptually, those three %% sequencer, and projection store. Conceptually, those three
%% services are independent and ought to have their own protocols. As %% services are independent and ought to have their own protocols. As
@ -47,6 +34,10 @@
%% compatibility. Furthermore, from the perspective of failure %% compatibility. Furthermore, from the perspective of failure
%% detection, it is very convenient that all three FLU-related %% detection, it is very convenient that all three FLU-related
%% services are accessed using the same single TCP port. %% services are accessed using the same single TCP port.
%%
%% TODO This EDoc was written first, and the EDoc and also `-type' and
%% `-spec' definitions for {@link machi_proxy_flu1_client} and {@link
%% machi_cr_client} must be improved.
-module(machi_flu1_client). -module(machi_flu1_client).
@ -174,16 +165,36 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) -> -spec checksum_list(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name()) ->
{ok, [machi_dt:chunk_summary()]} | {ok, binary()} |
{error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} | {error, machi_dt:error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}. {error, term()}.
checksum_list(Sock, EpochID, File) -> checksum_list(Sock, EpochID, File) ->
checksum_list2(Sock, EpochID, File). checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
%%
%% Why return a simple `binary()' type rather than
%% `[machi_dt:chunk_summary()]'? The two reasons are:
%% <ol>
%% <li> Server overhead: the CPU required to chop up the implementation-
%% specific store into zillions of very small terms is very high.
%% </li>
%% <li> Protocol encoding and decoding overhead: the cost is non-zero,
%% and the sum of cost of encoding and decoding a zillion small terms
%% is substantial.
%% </li>
%% </ol>
%%
%% For both reasons, the server's protocol response is absurdly simple
%% and very fast: send back a `binary()' blob to the client. Then it
%% is the client's responsibility to spend the CPU time to parse the
%% blob.
%%
%% Details of the encoding used inside the `binary()' blog can be found
%% in the EDoc comments for {@link machi_flu1:decode_csum_file_entry/1}.
-spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) -> -spec checksum_list(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), machi_dt:file_name()) ->
{ok, [machi_dt:chunk_summary()]} | {ok, binary()} |
{error, machi_dt:error_general() | 'no_such_file'} | {error, term()}. {error, machi_dt:error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),

View file

@ -352,7 +352,7 @@ convert_csum_req({client_sha, CSumBin}, _Chunk) ->
convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) -> convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) ->
#mpb_chunkpos{offset=Offset, chunk_size=Size, file_name=File} = CP, #mpb_chunkpos{offset=Offset, chunk_size=Size, file_name=File} = CP,
{ok, {Offset, Size, File}}; {ok, {Offset, Size, list_to_binary(File)}};
convert_append_chunk_resp(#mpb_appendchunkresp{status=Status}) -> convert_append_chunk_resp(#mpb_appendchunkresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).
@ -374,6 +374,8 @@ convert_general_status_code('NO_SUCH_FILE') ->
{error, no_such_file}; {error, no_such_file};
convert_general_status_code('PARTIAL_READ') -> convert_general_status_code('PARTIAL_READ') ->
{error, partial_read}; {error, partial_read};
convert_general_status_code('BAD_EPOCH') ->
{error, bad_epoch};
convert_general_status_code('BAD_JOSS') -> convert_general_status_code('BAD_JOSS') ->
throw({error, bad_joss_taipan_fixme}). throw({error, bad_joss_taipan_fixme}).
@ -393,8 +395,9 @@ convert_checksum_list_resp(#mpb_checksumlistresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).
convert_list_files_resp(#mpb_listfilesresp{status='OK', files=Files}) -> convert_list_files_resp(#mpb_listfilesresp{status='OK', files=Files}) ->
FileInfo = [{Size, File} || #mpb_fileinfo{file_size=Size, FileInfo = [{Size, list_to_binary(File)} ||
file_name=File} <- Files], #mpb_fileinfo{file_size=Size,
file_name=File} <- Files],
{ok, FileInfo}; {ok, FileInfo};
convert_list_files_resp(#mpb_listfilesresp{status=Status}) -> convert_list_files_resp(#mpb_listfilesresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).

View file

@ -196,7 +196,7 @@ from_pb_response(#mpb_ll_response{
#mpb_chunkpos{offset=Offset, #mpb_chunkpos{offset=Offset,
chunk_size=Size, chunk_size=Size,
file_name=File} = ChunkPos, file_name=File} = ChunkPos,
{ReqID, {ok, {Offset, Size, File}}}; {ReqID, {ok, {Offset, Size, list_to_binary(File)}}};
_ -> _ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)} {ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end; end;
@ -230,7 +230,7 @@ from_pb_response(#mpb_ll_response{
status=Status, files=PB_Files}}) -> status=Status, files=PB_Files}}) ->
case Status of case Status of
'OK' -> 'OK' ->
Files = [{Size, Name} || Files = [{Size, list_to_binary(Name)} ||
#mpb_fileinfo{file_size=Size, #mpb_fileinfo{file_size=Size,
file_name=Name} <- PB_Files], file_name=Name} <- PB_Files],
{ReqID, {ok, Files}}; {ReqID, {ok, Files}};
@ -825,6 +825,8 @@ conv_from_status({error, no_such_file}) ->
'NO_SUCH_FILE'; 'NO_SUCH_FILE';
conv_from_status({error, partial_read}) -> conv_from_status({error, partial_read}) ->
'PARTIAL_READ'; 'PARTIAL_READ';
conv_from_status({error, bad_epoch}) ->
'BAD_EPOCH';
conv_from_status(_OOPS) -> conv_from_status(_OOPS) ->
io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]), io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]),
'BAD_JOSS'. 'BAD_JOSS'.

View file

@ -178,9 +178,10 @@ checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) ->
undefined -> undefined ->
{error, no_such_file}; {error, no_such_file};
MaxOffset -> MaxOffset ->
CSum = make_csum(Name, ChunkSize), C = machi_util:make_tagged_csum(client_sha,
Cs = [{Offset, ChunkSize, CSum} || make_csum(Name, ChunkSize)),
Offset <- lists:seq(?MINIMUM_OFFSET, MaxOffset, ChunkSize)], Cs = [machi_flu1:encode_csum_file_entry_bin(Offset, ChunkSize, C) ||
Offset <- lists:seq(?MINIMUM_OFFSET, MaxOffset, ChunkSize)],
{ok, Cs} {ok, Cs}
end. end.

View file

@ -32,11 +32,10 @@
-define(FLU, machi_flu1). -define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client). -define(FLU_C, machi_flu1_client).
verify_file_checksums_test() -> verify_file_checksums_test_() ->
timer:sleep(100), {timeout, 60, fun() -> verify_file_checksums_test2() end}.
io:format(user, "\n\tverify_file_checksums_test() is broken, TODO FIX!\n", []).
verify_file_checksums_test_FIXME() -> verify_file_checksums_test2() ->
Host = "localhost", Host = "localhost",
TcpPort = 32958, TcpPort = 32958,
DataDir = "./data", DataDir = "./data",
@ -46,13 +45,15 @@ verify_file_checksums_test_FIXME() ->
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try try
Prefix = <<"verify_prefix">>, Prefix = <<"verify_prefix">>,
NumChunks = 10,
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,
Prefix, <<X:(X*8)/big>>) || Prefix, <<X:(X*8)/big>>) ||
X <- lists:seq(1,10)], X <- lists:seq(1, NumChunks)],
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
{ok, []} = machi_admin_util:verify_file_checksums_remote( {ok, []} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, ?DUMMY_PV1_EPOCH, File), Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
%% Clobber the first 3 chunks, which are sizes 1/2/3.
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)), {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
{ok, FH} = file:open(Path, [read,write]), {ok, FH} = file:open(Path, [read,write]),
{ok, _} = file:position(FH, ?MINIMUM_OFFSET), {ok, _} = file:position(FH, ?MINIMUM_OFFSET),
@ -61,12 +62,12 @@ verify_file_checksums_test_FIXME() ->
ok = file:write(FH, "yo!"), ok = file:write(FH, "yo!"),
ok = file:close(FH), ok = file:close(FH),
%% Check the local flavor of the API %% Check the local flavor of the API: should be 3 bad checksums
{ok, Res1} = machi_admin_util:verify_file_checksums_local( {ok, Res1} = machi_admin_util:verify_file_checksums_local(
Host, TcpPort, ?DUMMY_PV1_EPOCH, Path), Host, TcpPort, ?DUMMY_PV1_EPOCH, Path),
3 = length(Res1), 3 = length(Res1),
%% Check the remote flavor of the API %% Check the remote flavor of the API: should be 3 bad checksums
{ok, Res2} = machi_admin_util:verify_file_checksums_remote( {ok, Res2} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, ?DUMMY_PV1_EPOCH, File), Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
3 = length(Res2), 3 = length(Res2),

View file

@ -135,13 +135,20 @@ long_doc() ->
%% convergence_demo_testfun() -> %% convergence_demo_testfun() ->
%% convergence_demo_testfun(3). %% convergence_demo_testfun(3).
-define(DEFAULT_MGR_OPTS, [{private_write_verbose, false},
{active_mode,false},
{use_partition_simulator, true}]).
t() -> t() ->
t(3). t(3).
t(N) -> t(N) ->
convergence_demo_testfun(N). t(N, ?DEFAULT_MGR_OPTS).
convergence_demo_testfun(NumFLUs) -> t(N, MgrOpts) ->
convergence_demo_testfun(N, MgrOpts).
convergence_demo_testfun(NumFLUs, MgrOpts0) ->
timer:sleep(100), timer:sleep(100),
%% Faster test startup, commented: io:format(user, short_doc(), []), %% Faster test startup, commented: io:format(user, short_doc(), []),
%% Faster test startup, commented: timer:sleep(3000), %% Faster test startup, commented: timer:sleep(3000),
@ -169,8 +176,7 @@ convergence_demo_testfun(NumFLUs) ->
end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs], end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs],
MembersDict = machi_projection:make_members_dict(Ps), MembersDict = machi_projection:make_members_dict(Ps),
%% MgrOpts = [private_write_verbose, {active_mode,false}, %% MgrOpts = [private_write_verbose, {active_mode,false},
MgrOpts = [{active_mode,false}, MgrOpts = MgrOpts0 ++ ?DEFAULT_MGR_OPTS,
{use_partition_simulator, true}],
MgrNamez = MgrNamez =
[begin [begin
{ok, MPid} = ?MGR:start_link(P#p_srvr.name, MembersDict, MgrOpts), {ok, MPid} = ?MGR:start_link(P#p_srvr.name, MembersDict, MgrOpts),

View file

@ -370,7 +370,7 @@ prop_pulse_test_() ->
false -> 0; false -> 0;
Val2 -> list_to_integer(Val2) Val2 -> list_to_integer(Val2)
end, end,
{timeout, (Timeout+ExtraTO+300), % 300 = a bit more fudge time {timeout, (Timeout+ExtraTO+600), % 600 = a bit more fudge time
fun() -> fun() ->
?assert(eqc:quickcheck(eqc:testing_time(Timeout, ?assert(eqc:quickcheck(eqc:testing_time(Timeout,
?QC_OUT(prop_pulse())))) ?QC_OUT(prop_pulse()))))

View file

@ -129,11 +129,9 @@ smoke_test2() ->
999999999, 1), 999999999, 1),
{error, partial_read} = machi_cr_client:read_chunk(C1, File1, {error, partial_read} = machi_cr_client:read_chunk(C1, File1,
Off1, 88888888), Off1, 88888888),
%% Checksum lists are 3-tuples %% Checksum list return value is a primitive binary().
%% TODO: refactor checksum_list(), then put this test back! {ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1),
%% {ok, [{_,_,_}|_]} = machi_cr_client:checksum_list(C1, File1), true = is_binary(KludgeBin),
{ok, TmpKludgeBin} = machi_cr_client:checksum_list(C1, File1),
true = is_binary(TmpKludgeBin),
{error, no_such_file} = machi_cr_client:checksum_list(C1, <<"!!!!">>), {error, no_such_file} = machi_cr_client:checksum_list(C1, <<"!!!!">>),
%% Exactly one file right now %% Exactly one file right now

View file

@ -87,11 +87,9 @@ flu_smoke_test() ->
Prefix, Chunk1), Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1), File1, Off1, Len1),
%% TODO: when checksum_list() is refactored, restore this test! {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
%% {ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort,
%% ?DUMMY_PV1_EPOCH, File1),
{ok, _} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1), ?DUMMY_PV1_EPOCH, File1),
true = is_binary(KludgeBin),
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,
BadPrefix, Chunk1), BadPrefix, Chunk1),

View file

@ -84,20 +84,21 @@ partial_stop_restart2() ->
WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) -> WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) ->
machi_flu1_client:wedge_status(Addr, TcpPort) machi_flu1_client:wedge_status(Addr, TcpPort)
end, end,
Append = fun({_,#p_srvr{address=Addr, port=TcpPort}}) -> Append = fun({_,#p_srvr{address=Addr, port=TcpPort}}, EpochID) ->
machi_flu1_client:append_chunk(Addr, TcpPort, machi_flu1_client:append_chunk(Addr, TcpPort,
?DUMMY_PV1_EPOCH, EpochID,
<<"prefix">>, <<"data">>) <<"prefix">>, <<"data">>)
end, end,
try try
[Start(P) || P <- Ps], [Start(P) || P <- Ps],
[{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged [{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged
[{error,wedged} = Append(P) || P <- Ps], % all are wedged [{error,wedged} = Append(P, ?DUMMY_PV1_EPOCH) || P <- Ps], % all are wedged
[machi_chain_manager1:set_chain_members(ChMgr, Dict) || [machi_chain_manager1:set_chain_members(ChMgr, Dict) ||
ChMgr <- ChMgrs ], ChMgr <- ChMgrs ],
[{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % *not* wedged {ok, {false, EpochID1}} = WedgeStatus(hd(Ps)),
[{ok,_} = Append(P) || P <- Ps], % *not* wedged [{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged
[{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged
{_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
[begin [begin
@ -123,7 +124,8 @@ partial_stop_restart2() ->
Proj_mCSum = Proj_m#projection_v1.epoch_csum, Proj_mCSum = Proj_m#projection_v1.epoch_csum,
[{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged [{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged
P <- Ps], P <- Ps],
[{ok,_} = Append(P) || P <- Ps], % *not* wedged {ok, {false, EpochID2}} = WedgeStatus(hd(Ps)),
[{ok,_} = Append(P, EpochID2) || P <- Ps], % *not* wedged
%% Stop all but 'a'. %% Stop all but 'a'.
[ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)],
@ -138,22 +140,25 @@ partial_stop_restart2() ->
true = (machi_projection:update_dbg2(Proj_m, []) == true = (machi_projection:update_dbg2(Proj_m, []) ==
machi_projection:update_dbg2(Proj_m, [])), machi_projection:update_dbg2(Proj_m, [])),
%% Confirm that 'a' is wedged %% Confirm that 'a' is wedged
{error, wedged} = Append(hd(Ps)), {error, wedged} = Append(hd(Ps), EpochID2),
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps), {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
{error, wedged} = machi_flu1_client:read_chunk( {error, wedged} = machi_flu1_client:read_chunk(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
<<>>, 99999999, 1), <<>>, 99999999, 1),
{error, wedged} = machi_flu1_client:checksum_list( {error, wedged} = machi_flu1_client:checksum_list(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>), Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
{error, wedged} = machi_flu1_client:list_files( %% list_files() is permitted despite wedged status
{ok, _} = machi_flu1_client:list_files(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH), Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH),
%% Iterate through humming consensus once %% Iterate through humming consensus once
{now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env( {now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env(
hd(ChMgrs)), hd(ChMgrs)),
true = (Epoch_n > Epoch_m), true = (Epoch_n > Epoch_m),
{ok, {false, EpochID3}} = WedgeStatus(hd(Ps)),
%% Confirm that 'a' is *not* wedged %% Confirm that 'a' is *not* wedged
{ok, _} = Append(hd(Ps)), {ok, _} = Append(hd(Ps), EpochID3),
ok ok
after after

View file

@ -65,6 +65,7 @@ smoke_test2() ->
Chunk1 = <<"Hello, chunk!">>, Chunk1 = <<"Hello, chunk!">>,
{ok, {Off1, Size1, File1}} = {ok, {Off1, Size1, File1}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0), ?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0),
true = is_binary(File1),
Chunk2 = "It's another chunk", Chunk2 = "It's another chunk",
CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)}, CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)},
{ok, {Off2, Size2, File2}} = {ok, {Off2, Size2, File2}} =
@ -82,7 +83,8 @@ smoke_test2() ->
{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz) {ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz)
end || {Ch, Fl, Off, Sz} <- Reads], end || {Ch, Fl, Off, Sz} <- Reads],
{ok, _} = ?C:checksum_list(Clnt, File1), {ok, KludgeBin} = ?C:checksum_list(Clnt, File1),
true = is_binary(KludgeBin),
{ok, [{File1Size,File1}]} = ?C:list_files(Clnt), {ok, [{File1Size,File1}]} = ?C:list_files(Clnt),
true = is_integer(File1Size), true = is_integer(File1Size),
@ -92,7 +94,7 @@ smoke_test2() ->
end end
after after
exit(SupPid, normal), exit(SupPid, normal),
%%% [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
machi_util:wait_for_death(SupPid, 100), machi_util:wait_for_death(SupPid, 100),
ok ok
end. end.

View file

@ -225,8 +225,9 @@ flu_restart_test() ->
(stop) -> ?MUT:read_chunk(Prox1, FakeEpoch, (stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1) File1, Off1, Size1)
end, end,
fun(run) -> {ok, _} = fun(run) -> {ok, KludgeBin} =
?MUT:checksum_list(Prox1, FakeEpoch, File1), ?MUT:checksum_list(Prox1, FakeEpoch, File1),
true = is_binary(KludgeBin),
ok; ok;
(line) -> io:format("line ~p, ", [?LINE]); (line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:checksum_list(Prox1, FakeEpoch, File1) (stop) -> ?MUT:checksum_list(Prox1, FakeEpoch, File1)