WIP: giant hairball 01

This commit is contained in:
Scott Lystig Fritchie 2015-06-29 16:10:43 +09:00
parent f45dc7829e
commit 64817dd7e8
6 changed files with 165 additions and 76 deletions

View file

@ -233,6 +233,8 @@ message Mpb_Request {
// TODO: If we wish to support pipelined requests sometime in the
// future, this is the placeholder to do it.
required bytes req_id = 1;
// CLIENTS must not set 'do_not_alter' flag; leave it to default.
required uint32 do_not_alter = 2 [default=1];
// The client should only define one request message. If the client
// includes multiple requests here, the server may pick/choose an
@ -240,13 +242,13 @@ message Mpb_Request {
// NOTE: The erlang protobuffs compiler doesn't support 'oneof'.
// But 'oneof' appears to be a very tiny memory optimization
// that not all languages might care about? (Erlang doesn't)
optional Mpb_EchoReq echo = 10;
optional Mpb_AuthReq auth = 11;
optional Mpb_AppendChunkReq append_chunk = 12;
optional Mpb_WriteChunkReq write_chunk = 13;
optional Mpb_ReadChunkReq read_chunk = 14;
optional Mpb_ChecksumListReq checksum_list = 15;
optional Mpb_ListFilesReq list_files = 16;
optional Mpb_EchoReq echo = 110;
optional Mpb_AuthReq auth = 111;
optional Mpb_AppendChunkReq append_chunk = 112;
optional Mpb_WriteChunkReq write_chunk = 113;
optional Mpb_ReadChunkReq read_chunk = 114;
optional Mpb_ChecksumListReq checksum_list = 115;
optional Mpb_ListFilesReq list_files = 116;
}
message Mpb_Response {
@ -525,6 +527,8 @@ message Mpb_LL_Request {
// TODO: If we wish to support pipelined requests sometime in the
// future, this is the placeholder to do it.
required bytes req_id = 1;
// CLIENTS must not set 'do_not_alter' flag; leave it to default.
required uint32 do_not_alter = 2 [default=2];
// The client should only define one request message. If the client
// includes multiple requests here, the server may pick/choose an

View file

@ -93,6 +93,8 @@
wedged = true :: boolean(),
etstab :: ets:tid(),
epoch_id :: 'undefined' | machi_dt:epoch_id(),
pb_mode = undefined :: 'undefined' | 'high' | 'low',
high_clnt :: 'undefined' | pid(),
dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist
}).
@ -272,10 +274,20 @@ decode_epoch_id(EpochIDHex) ->
net_server_loop(Sock, S) ->
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Bin} ->
io:format(user, "\nSSS RawReq ~p\n", [catch machi_pb:decode_mpb_ll_request(Bin)]),
{R, S2} = do_pb_request(catch machi_pb:decode_mpb_ll_request(Bin), S),
Resp = machi_pb:encode_mpb_ll_response(R),
ok = gen_tcp:send(Sock, Resp),
{RespBin, S2} =
case machi_pb:decode_mpb_ll_request(Bin) of
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
io:format(user, "SSS low req ~p\n", [LL_req]),
{R, NewS} = do_pb_ll_request(LL_req, S),
{machi_pb:encode_mpb_ll_response(R), mode(low, NewS)};
_ ->
HL_req = machi_pb:decode_mpb_request(Bin),
io:format(user, "SSS high req ~p\n", [HL_req]),
1 = HL_req#mpb_request.do_not_alter,
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
end,
ok = gen_tcp:send(Sock, RespBin),
net_server_loop(Sock, S2);
{error, SockError} ->
Msg = io_lib:format("Socket error ~w", [SockError]),
@ -288,31 +300,50 @@ io:format(user, "\nSSS SockError ~p\n", [SockError]),
exit(normal)
end.
do_pb_request(PB_request, S) ->
mode(Mode, #state{pb_mode=undefined}=S) ->
S#state{pb_mode=Mode};
mode(_, S) ->
S.
make_high_clnt(#state{high_clnt=undefined}=S) ->
{ok, Proj} = machi_projection_store:read_latest_projection(
S#state.proj_store, private),
Ps = [P_srvr || {_, P_srvr} <- orddict:to_list(
Proj#projection_v1.members_dict)],
{ok, Clnt} = machi_cr_client:start_link(Ps),
S#state{high_clnt=Clnt};
make_high_clnt(S) ->
S.
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
Result = {high_error, 41, "Low protocol request while in high mode"},
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
do_pb_ll_request(PB_request, S) ->
Req = machi_pb_translate:from_pb_request(PB_request),
io:format(user, "\nSSS Req ~p\n", [Req]),
{ReqID, Cmd, Result, S2} =
case Req of
{RqID, {low_proj, _}=CMD} ->
%% Skip wedge check for projection commands!
{Rs, NewS} = do_pb_request3(CMD, S),
{Rs, NewS} = do_pb_ll_request3(CMD, S),
{RqID, CMD, Rs, NewS};
{RqID, CMD} ->
EpochID = element(2, CMD), % by common convention
{Rs, NewS} = do_pb_request2(EpochID, CMD, S),
{Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S),
{RqID, CMD, Rs, NewS};
nope ->
{foo, bar, baz}
end,
io:format(user, "\nSSS Result ~p\n", [Result]),
io:format(user, "\nSSS Result1 ~p\n", [Result]),
io:format(user, "\nSSS Result2 ~p\n", [catch machi_pb_translate:to_pb_response(ReqID, Cmd, Result)]),
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
do_pb_request2(EpochID, CMD, S) ->
do_pb_ll_request2(EpochID, CMD, S) ->
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
if Wedged_p == true ->
io:format(user, "LINE ~s ~p : ~p\n", [?MODULE, ?LINE, ets:lookup_element(S#state.etstab, epoch, 2)]),
{{error, wedged}, S};
not (EpochID == undefined orelse EpochID == ?DUMMY_PV1_EPOCH)
not ((not is_tuple(EpochID)) orelse EpochID == ?DUMMY_PV1_EPOCH)
andalso
EpochID /= CurrentEpochID ->
{Epoch, _} = EpochID,
@ -329,35 +360,72 @@ io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
{{error, bad_epoch}, S};
true ->
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
do_pb_request3(CMD, S)
do_pb_ll_request3(CMD, S)
end.
do_pb_request3({low_echo, _BogusEpochID, Msg}, S) ->
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
{Msg, S};
do_pb_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
{-6, S};
do_pb_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
CSum, ChunkExtra}, S) ->
{do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S), S};
do_pb_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
CSum}, S) ->
{do_pb_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
do_pb_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
{do_pb_server_read_chunk(File, Offset, Size, Opts, S), S};
do_pb_request3({low_checksum_list, _EpochID, File}, S) ->
do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) ->
{do_pb_server_checksum_listing(File, S), S};
do_pb_request3({low_list_files, _EpochID}, S) ->
do_pb_ll_request3({low_list_files, _EpochID}, S) ->
{do_pb_server_list_files(S), S};
do_pb_request3({low_wedge_status, _EpochID}, S) ->
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
{do_pb_server_wedge_status(S), S};
do_pb_request3({low_delete_migration, _EpochID, File}, S) ->
do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) ->
{do_pb_server_delete_migration(File, S), S};
do_pb_request3({low_trunc_hack, _EpochID, File}, S) ->
do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) ->
{do_pb_server_trunc_hack(File, S), S};
do_pb_request3({low_proj, PCMD}, S) ->
do_pb_ll_request3({low_proj, PCMD}, S) ->
{do_pb_server_proj_request(PCMD, S), S}.
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
Result = {low_error, 41, "High protocol request while in low mode"},
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
do_pb_hl_request(PB_request, S) ->
{ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request),
io:format(user, "\nSSS high Cmd ~p\n", [Cmd]),
{Result, S2} = do_pb_hl_request2(Cmd, S),
io:format(user, "\nSSS high Result1 ~p\n", [Result]),
io:format(user, "\nSSS high Result2 ~p\n", [catch machi_pb_translate:to_pb_response(ReqID, Cmd, Result)]),
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
do_pb_hl_request2({high_echo, Msg}, S) ->
{Msg, S};
do_pb_hl_request2({high_auth, _User, _Pass}, S) ->
{-77, S};
do_pb_hl_request2({high_append_chunk, _todoPK, Prefix, ChunkBin, TaggedCSum,
ChunkExtra}, #state{high_clnt=Clnt}=S) ->
Chunk = {TaggedCSum, ChunkBin},
Res = machi_cr_client:append_chunk_extra(Clnt, Prefix, Chunk,
ChunkExtra),
{Res, S};
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
#state{high_clnt=Clnt}=S) ->
Chunk = {TaggedCSum, ChunkBin},
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
{Res, S};
do_pb_hl_request2({high_read_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
{Res, S};
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:checksum_list(Clnt, File),
{Res, S};
do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:list_files(Clnt),
{Res, S}.
do_pb_server_proj_request({get_latest_epochid, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
@ -436,11 +504,11 @@ do_pb_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum,
case sanitize_file_string(File) of
ok ->
CSumPath = machi_util:make_checksum_filename(DataDir, File),
case file:open(CSumPath, [write, read, binary, raw]) of
case file:open(CSumPath, [append, raw, binary]) of
{ok, FHc} ->
Path = DataDir ++ "/data/" ++
machi_util:make_string(File),
{ok, FHd} = file:open(Path, [write, binary, raw]),
{ok, FHd} = file:open(Path, [read, write, raw, binary]),
try
do_pb_server_write_chunk2(
File, Offset, Chunk, CSum_tag, CSum, DataDir,
@ -484,6 +552,7 @@ do_pb_server_write_chunk2(_File, Offset, Chunk, CSum_tag,
end
end,
Size = iolist_size(Chunk),
io:format(user, "LINE ~s ~p append/no/no/write @ offset ~w\n", [?MODULE, ?LINE, Offset]),
case file:pwrite(FHd, Offset, Chunk) of
ok ->
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
@ -1052,11 +1121,10 @@ seq_append_server_loop(DataDir, Prefix, FileNum) ->
{File, FullPath} = machi_util:make_data_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHd} = file:open(FullPath,
[write, binary, raw]),
%% [write, binary, raw, delayed_write]),
[read, write, raw, binary]),
CSumPath = machi_util:make_checksum_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]),
{ok, FHc} = file:open(CSumPath, [append, raw, binary]),
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum,
?MINIMUM_OFFSET).
@ -1071,6 +1139,7 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
receive
{seq_append, From, Prefix, Chunk, CSum, Extra} ->
if Chunk /= <<>> ->
io:format(user, "LINE ~s ~p append/pwrite @ offset ~w FHd ~p\n", [?MODULE, ?LINE, Offset, FHd]),
ok = file:pwrite(FHd, Offset, Chunk);
true ->
ok

View file

@ -171,16 +171,21 @@ do_connect_to_pb_listener(P) ->
{ok, Sock} = gen_tcp:connect(P#p_srvr.address, P#p_srvr.port,
?PB_PACKET_OPTS ++
[binary, {active, false}]),
io:format(user, "HHC connected to ~p on sock ~p\n", [P, Sock]),
Sock
catch _X:_Y ->
io:format(user, "\n~p ~p @ ~p\n", [_X, _Y, erlang:get_stacktrace()]),
bummer
end.
do_send_sync({echo, String}, #state{sock=Sock}=S) ->
do_send_sync(Cmd, S) ->
io:format(user, "\nHHC Req ~p\n", [Cmd]),
do_send_sync2(Cmd, S).
do_send_sync2({echo, String}, #state{sock=Sock}=S) ->
try
ReqID = <<0>>,
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
echo=#mpb_echoreq{message=String}},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
@ -196,10 +201,10 @@ do_send_sync({echo, String}, #state{sock=Sock}=S) ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S}
end;
do_send_sync({auth, User, Pass}, #state{sock=Sock}=S) ->
do_send_sync2({auth, User, Pass}, #state{sock=Sock}=S) ->
try
ReqID = <<0>>,
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
auth=#mpb_authreq{user=User, password=Pass}},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
@ -215,7 +220,7 @@ do_send_sync({auth, User, Pass}, #state{sock=Sock}=S) ->
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S}
end;
do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
do_send_sync2({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
@ -228,8 +233,9 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
chunk=Chunk,
csum=CSumT,
chunk_extra=ChunkExtra},
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
append_chunk=Req},
io:format(user, "HHC app on ~p req ~p\n", [Sock, R1a]),
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
{ok, Bin1B} = gen_tcp:recv(Sock, 0),
@ -245,7 +251,7 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync({write_chunk, File, Offset, Chunk, CSum},
do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
@ -254,7 +260,7 @@ do_send_sync({write_chunk, File, Offset, Chunk, CSum},
offset=Offset,
chunk=Chunk,
csum=CSumT},
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
write_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
@ -271,14 +277,14 @@ do_send_sync({write_chunk, File, Offset, Chunk, CSum},
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync({read_chunk, File, Offset, Size},
do_send_sync2({read_chunk, File, Offset, Size},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_readchunkreq{file=File,
offset=Offset,
size=Size},
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
read_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
@ -295,12 +301,12 @@ do_send_sync({read_chunk, File, Offset, Size},
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync({checksum_list, File},
do_send_sync2({checksum_list, File},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_checksumlistreq{file=File},
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
checksum_list=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),
@ -317,12 +323,12 @@ do_send_sync({checksum_list, File},
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync({list_files},
do_send_sync2({list_files},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
Req = #mpb_listfilesreq{},
R1a = #mpb_request{req_id=ReqID,
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
list_files=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
ok = gen_tcp:send(Sock, Bin1a),

View file

@ -319,17 +319,17 @@ from_pb_response(#mpb_ll_response{
to_pb_request(ReqID, {low_echo, _BogusEpochID, Msg}) ->
#mpb_ll_request{
req_id=ReqID,
req_id=ReqID, do_not_alter=2,
echo=#mpb_echoreq{message=Msg}};
to_pb_request(ReqID, {low_auth, _BogusEpochID, User, Pass}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
auth=#mpb_authreq{user=User, password=Pass}};
to_pb_request(ReqID, {low_append_chunk, EpochID, PKey, Prefix, Chunk,
CSum_tag, CSum, ChunkExtra}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
CSum_type = conv_from_csum_tag(CSum_tag),
PB_CSum = #mpb_chunkcsum{type=CSum_type, csum=CSum},
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
append_chunk=#mpb_ll_appendchunkreq{
epoch_id=PB_EpochID,
placement_key=PKey,
@ -341,7 +341,7 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
PB_EpochID = conv_from_epoch_id(EpochID),
CSum_type = conv_from_csum_tag(CSum_tag),
PB_CSum = #mpb_chunkcsum{type=CSum_type, csum=CSum},
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
write_chunk=#mpb_ll_writechunkreq{
epoch_id=PB_EpochID,
file=File,
@ -352,7 +352,7 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
%% TODO: stop ignoring Opts ^_^
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{
req_id=ReqID,
req_id=ReqID, do_not_alter=2,
read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID,
file=File,
@ -360,57 +360,59 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
size=Size}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
checksum_list=#mpb_ll_checksumlistreq{
epoch_id=PB_EpochID,
file=File}};
to_pb_request(ReqID, {low_list_files, EpochID}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
list_files=#mpb_ll_listfilesreq{epoch_id=PB_EpochID}};
to_pb_request(ReqID, {low_wedge_status, _BogusEpochID}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
wedge_status=#mpb_ll_wedgestatusreq{}};
to_pb_request(ReqID, {low_delete_migration, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
delete_migration=#mpb_ll_deletemigrationreq{
epoch_id=PB_EpochID,
file=File}};
to_pb_request(ReqID, {low_trunc_hack, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
trunc_hack=#mpb_ll_trunchackreq{
epoch_id=PB_EpochID,
file=File}};
to_pb_request(ReqID, {low_proj, {get_latest_epochid, ProjType}}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_gl=#mpb_ll_getlatestepochidreq{type=conv_from_type(ProjType)}};
to_pb_request(ReqID, {low_proj, {read_latest_projection, ProjType}}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_rl=#mpb_ll_readlatestprojectionreq{type=conv_from_type(ProjType)}};
to_pb_request(ReqID, {low_proj, {read_projection, ProjType, Epoch}}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_rp=#mpb_ll_readprojectionreq{type=conv_from_type(ProjType),
epoch_number=Epoch}};
to_pb_request(ReqID, {low_proj, {write_projection, ProjType, Proj}}) ->
ProjM = conv_from_projection_v1(Proj),
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_wp=#mpb_ll_writeprojectionreq{type=conv_from_type(ProjType),
proj=ProjM}};
to_pb_request(ReqID, {low_proj, {get_all_projections, ProjType}}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}};
to_pb_request(ReqID, {low_proj, {list_all_projections, ProjType}}) ->
#mpb_ll_request{req_id=ReqID,
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}.
%%qqq
to_pb_response(ReqID, _, {low_error, ErrCode, ErrMsg}) ->
make_ll_error_resp(ReqID, ErrCode, ErrMsg);
to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) ->
#mpb_ll_response{
req_id=ReqID,
echo=#mpb_echoresp{message=Resp}};
to_pb_response(ReqID, {low_auth, _, _, _}, Resp) ->
to_pb_response(ReqID, {low_auth, _, _, _}, __TODO_Resp) ->
#mpb_ll_response{req_id=ReqID,
generic=#mpb_errorresp{code=1,
msg="AUTH not implemented"}};
@ -428,7 +430,7 @@ to_pb_response(ReqID, {low_append_chunk, _EID, _PKey, _Pfx, _Ch, _CST, _CS, _CE}
#mpb_ll_response{req_id=ReqID,
append_chunk=#mpb_ll_appendchunkresp{status=Status}};
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
Status = conv_from_status(Resp),
@ -447,7 +449,7 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
#mpb_ll_response{req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status=Status}};
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {low_checksum_list, _EpochID, _File}, Resp) ->
case Resp of
@ -558,6 +560,8 @@ to_pb_response(ReqID, {low_proj, {list_all_projections, _ProjType}}, Resp)->
status=Status}}
end;
%%qqq
to_pb_response(ReqID, _, {high_error, ErrCode, ErrMsg}) ->
make_error_resp(ReqID, ErrCode, ErrMsg);
to_pb_response(ReqID, {high_echo, _Msg}, Resp) ->
Msg = Resp,
#mpb_response{req_id=ReqID,
@ -635,9 +639,7 @@ to_pb_response(ReqID, {high_list_files}, Resp) ->
list_files=#mpb_listfilesresp{status=Status}};
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {high_error, _, _}, {ErrCode, ErrMsg}) ->
make_error_resp(ReqID, ErrCode, ErrMsg).
end.
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_NONE'}, Chunk) ->
C = machi_util:checksum_chunk(Chunk),

View file

@ -52,19 +52,25 @@ smoke_test2() ->
{ok, Clnt} = ?C:start_link(Ps),
try
true = ?C:connected_p(Clnt),
String = "yo, dawg",
String = "yo, dawgggggggggggggggggggggggggggggggggg",
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
String = ?C:echo(Clnt, String),
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
%% TODO: auth() is not implemented. Auth requires SSL.
%% Probably ought to put client stuff that relies on SSL into
%% a separate test module? Or separate test func?
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
{error, _} = ?C:auth(Clnt, "foo", "bar"),
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
PK = <<>>,
Prefix = <<"prefix">>,
Chunk1 = <<"Hello, chunk!">>,
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
{ok, {Off1, Size1, File1}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0),
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
Chunk2 = "It's another chunk",
CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)},
{ok, {Off2, Size2, File2}} =
@ -78,8 +84,10 @@ smoke_test2() ->
Reads = [{iolist_to_binary(Chunk1), File1, Off1, Size1},
{iolist_to_binary(Chunk2), File2, Off2, Size2},
{iolist_to_binary(Chunk3), File3, Off3, Size3}],
[{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz) ||
{Ch, Fl, Off, Sz} <- Reads],
[begin
io:format(user, "HTT ~p ~p ~p\n", [Fl, Off, Sz]),
{ok, Ch} = ?C:read_chunk(Clnt, Fl, Off, Sz)
end || {Ch, Fl, Off, Sz} <- Reads],
{ok, _} = ?C:checksum_list(Clnt, File1),
{ok, [{File1Size,File1}]} = ?C:list_files(Clnt),
@ -91,7 +99,7 @@ smoke_test2() ->
end
after
exit(SupPid, normal),
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
%%% [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
machi_util:wait_for_death(SupPid, 100),
ok
end.

View file

@ -52,10 +52,10 @@ api_smoke_test() ->
%% Stop the FLU, what happens?
machi_flu1:stop(FLU1),
{error,_} = ?MUT:append_chunk(Prox1,
FakeEpoch, Prefix, <<"data">>,
FakeEpoch, Prefix, <<"data-stopped1">>,
infinity),
{error,partition} = ?MUT:append_chunk(Prox1,
FakeEpoch, Prefix, <<"data">>,
FakeEpoch, Prefix, <<"data-stopped2">>,
infinity),
%% Start the FLU again, we should be able to do stuff immediately
FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,