WIP: PB append_chunk end-to-end works!

This commit is contained in:
Scott Lystig Fritchie 2015-06-23 14:45:24 +09:00
parent 5ef499ec73
commit cb06c53dc0
4 changed files with 39 additions and 85 deletions

View file

@ -372,7 +372,7 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
do_append_midtail2([], _Prefix, File, Offset, Chunk, do_append_midtail2([], _Prefix, File, Offset, Chunk,
_ChunkExtra, _Ws, _Depth, _STime, S) -> _ChunkExtra, _Ws, _Depth, _STime, S) ->
%% io:format(user, "ok!\n", []), %% io:format(user, "ok!\n", []),
{reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; {reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S};
do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
ChunkExtra, Ws, Depth, STime, ChunkExtra, Ws, Depth, STime,
#state{epoch_id=EpochID, proxies_dict=PD}=S) -> #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
@ -828,3 +828,8 @@ sleep_a_while(1) ->
ok; ok;
sleep_a_while(Depth) -> sleep_a_while(Depth) ->
timer:sleep(30 + trunc(math:pow(1.9, Depth))). timer:sleep(30 + trunc(math:pow(1.9, Depth))).
chunk_wrapper_size({_TaggedCSum, Chunk}) ->
iolist_size(Chunk);
chunk_wrapper_size(Chunk) ->
iolist_size(Chunk).

View file

@ -75,7 +75,6 @@
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
-include("machi.hrl"). -include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl"). -include("machi_projection.hrl").
-define(SERVER_CMD_READ_TIMEOUT, 600*1000). -define(SERVER_CMD_READ_TIMEOUT, 600*1000).
@ -348,7 +347,12 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = gen_tcp:send(Sock, <<"OK\n">>), ok = gen_tcp:send(Sock, <<"OK\n">>),
ok = inet:setopts(Sock, [{packet, 4}, ok = inet:setopts(Sock, [{packet, 4},
{packet_size, 33*1024*1024}]), {packet_size, 33*1024*1024}]),
protocol_buffers_loop(Sock, S); {ok, Proj} = machi_projection_store:read_latest_projection(
S#state.proj_store, private),
Ps = [P_srvr ||
{_, P_srvr} <- orddict:to_list(
Proj#projection_v1.members_dict)],
machi_pb_server:run_loop(Sock, Ps);
_ -> _ ->
machi_util:verb("Else Got: ~p\n", [Line]), machi_util:verb("Else Got: ~p\n", [Line]),
io:format(user, "TODO: Else Got: ~p\n", [Line]), io:format(user, "TODO: Else Got: ~p\n", [Line]),
@ -1015,21 +1019,6 @@ http_harvest_headers({ok, Hdr}, Sock, Acc) ->
http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT), http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT),
Sock, [Hdr|Acc]). Sock, [Hdr|Acc]).
protocol_buffers_loop(Sock, S) ->
case gen_tcp:recv(Sock, 0) of
{ok, Bin} ->
R = do_pb_request(catch machi_pb:decode_mpb_request(Bin)),
%% R = #mpb_response{req_id= <<"not paying any attention">>,
%% generic=#mpb_errorresp{code=-6,
%% msg="not implemented"}},
Resp = machi_pb:encode_mpb_response(R),
ok = gen_tcp:send(Sock, Resp),
protocol_buffers_loop(Sock, S);
{error, _} ->
(catch gen_tcp:close(Sock)),
exit(normal)
end.
digest_header_goop([], G) -> digest_header_goop([], G) ->
G; G;
digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) -> digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) ->
@ -1049,68 +1038,3 @@ split_uri_options(OpsBin) ->
[<<"size">>, Bin] -> [<<"size">>, Bin] ->
{size, binary_to_integer(Bin)} {size, binary_to_integer(Bin)}
end || X <- L]. end || X <- L].
do_pb_request(#mpb_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
#mpb_response{req_id=ReqID,
echo=#mpb_echoresp{message=Msg}};
do_pb_request(#mpb_request{req_id=ReqID,
auth=#mpb_authreq{}}) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=1,
msg="AUTH not implemented"}};
do_pb_request(#mpb_request{req_id=ReqID,
append_chunk=AC=#mpb_appendchunkreq{}}) ->
#mpb_appendchunkreq{placement_key=____PK,
prefix=Prefix,
chunk=ChunkBin,
csum=#mpb_chunkcsum{type=CSumType, csum=CSum},
chunk_extra=ChunkExtra} = AC,
TaggedCSum = case CSumType of
'CSUM_TAG_NONE' ->
C = machi_util:checksum_chunk(ChunkBin),
machi_util:make_tagged_csum(server_sha, C);
'CSUM_TAG_CLIENT_SHA' ->
machi_util:make_tagged_csum(client_sha, CSum)
end,
Chunk = {TaggedCSum, ChunkBin},
case (catch machi_cr_client:append_chunk(todo_fixme,
Prefix, Chunk)) of
{ok, {Offset, Size, File}} ->
make_append_resp(ReqID, 'OK',
#mpb_chunkpos{offset=Offset,
chunk_size=Size,
file_name=File});
{error, bad_arg} ->
make_append_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_append_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_append_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_append_resp(ReqID, 'PARTITION');
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
do_pb_request(#mpb_request{req_id=ReqID}) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=66,
msg="Unknown request"}};
do_pb_request(_Else) ->
#mpb_response{req_id= <<>>,
generic=#mpb_errorresp{code=67,
msg="Unknown PB request"}}.
make_append_resp(ReqID, Status) ->
make_append_resp(ReqID, Status, undefined).
make_append_resp(ReqID, Status, Where) ->
#mpb_response{req_id=ReqID,
append_chunk=#mpb_appendchunkresp{status=Status,
chunk_pos=Where}}.
make_error_resp(ReqID, Code, Msg) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.

View file

@ -215,7 +215,8 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
{ok, Bin1B} = gen_tcp:recv(Sock, 0), {ok, Bin1B} = gen_tcp:recv(Sock, 0),
case (catch machi_pb:decode_mpb_response(Bin1B)) of case (catch machi_pb:decode_mpb_response(Bin1B)) of
#mpb_response{req_id=ReqID, append_chunk=R} when R /= undefined -> #mpb_response{req_id=ReqID, append_chunk=R} when R /= undefined ->
{R, S#state{count=Count+1}}; Result = convert_append_chunk_resp(R),
{Result, S#state{count=Count+1}};
#mpb_response{req_id=ReqID, generic=G} when G /= undefined -> #mpb_response{req_id=ReqID, generic=G} when G /= undefined ->
#mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G, #mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G,
{{error, {Code, Msg, Extra}}, S#state{count=Count+1}} {{error, {Code, Msg, Extra}}, S#state{count=Count+1}}
@ -225,4 +226,18 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra},
{Res, S#state{count=Count+1}} {Res, S#state{count=Count+1}}
end. end.
convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) ->
#mpb_chunkpos{offset=Offset, chunk_size=Size, file_name=File} = CP,
{ok, {Offset, Size, File}};
convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_ARG'}) ->
{error, bad_arg};
convert_append_chunk_resp(#mpb_appendchunkresp{status='WEDGED'}) ->
{error, wedged};
convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_CHECKSUM'}) ->
{error, bad_checksum};
convert_append_chunk_resp(#mpb_appendchunkresp{status='PARTITION'}) ->
{error, partition};
convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_JOSS'}) ->
throw({error, bad_joss_taipan_fixme}).

View file

@ -37,6 +37,7 @@ smoke_test2() ->
Port = 5720, Port = 5720,
Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"} Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"}
], ],
D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]),
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(), {ok, SupPid} = machi_flu_sup:start_link(),
@ -45,6 +46,7 @@ smoke_test2() ->
#p_srvr{name=Name, port=Port, props=Dir} = P, #p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
end || P <- Ps], end || P <- Ps],
ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
{ok, Clnt} = ?C:start_link(Ps), {ok, Clnt} = ?C:start_link(Ps),
@ -61,7 +63,15 @@ smoke_test2() ->
PK = <<>>, PK = <<>>,
Prefix = <<"prefix">>, Prefix = <<"prefix">>,
Chunk1 = <<"Hello, chunk!">>, Chunk1 = <<"Hello, chunk!">>,
yo = ?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0), {ok, {Off1, Size1, File1}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0),
Chunk2 = "It's another chunk",
CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)},
{ok, {Off2, Size2, File2}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk2, CSum2, 1024),
%% Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]],
%% {ok, {Off3, Size3, File3}} =
%% ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2), Chunk3),
ok ok
after after