diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 09ef18d..ce30722 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -372,7 +372,7 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, do_append_midtail2([], _Prefix, File, Offset, Chunk, _ChunkExtra, _Ws, _Depth, _STime, S) -> %% io:format(user, "ok!\n", []), - {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; + {reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S}; do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> @@ -828,3 +828,8 @@ sleep_a_while(1) -> ok; sleep_a_while(Depth) -> timer:sleep(30 + trunc(math:pow(1.9, Depth))). + +chunk_wrapper_size({_TaggedCSum, Chunk}) -> + iolist_size(Chunk); +chunk_wrapper_size(Chunk) -> + iolist_size(Chunk). diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 3927a01..cbd4242 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -75,7 +75,6 @@ -include_lib("kernel/include/file.hrl"). -include("machi.hrl"). --include("machi_pb.hrl"). -include("machi_projection.hrl"). -define(SERVER_CMD_READ_TIMEOUT, 600*1000). @@ -348,7 +347,12 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = gen_tcp:send(Sock, <<"OK\n">>), ok = inet:setopts(Sock, [{packet, 4}, {packet_size, 33*1024*1024}]), - protocol_buffers_loop(Sock, S); + {ok, Proj} = machi_projection_store:read_latest_projection( + S#state.proj_store, private), + Ps = [P_srvr || + {_, P_srvr} <- orddict:to_list( + Proj#projection_v1.members_dict)], + machi_pb_server:run_loop(Sock, Ps); _ -> machi_util:verb("Else Got: ~p\n", [Line]), io:format(user, "TODO: Else Got: ~p\n", [Line]), @@ -1015,21 +1019,6 @@ http_harvest_headers({ok, Hdr}, Sock, Acc) -> http_harvest_headers(gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT), Sock, [Hdr|Acc]). -protocol_buffers_loop(Sock, S) -> - case gen_tcp:recv(Sock, 0) of - {ok, Bin} -> - R = do_pb_request(catch machi_pb:decode_mpb_request(Bin)), - %% R = #mpb_response{req_id= <<"not paying any attention">>, - %% generic=#mpb_errorresp{code=-6, - %% msg="not implemented"}}, - Resp = machi_pb:encode_mpb_response(R), - ok = gen_tcp:send(Sock, Resp), - protocol_buffers_loop(Sock, S); - {error, _} -> - (catch gen_tcp:close(Sock)), - exit(normal) - end. - digest_header_goop([], G) -> G; digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) -> @@ -1049,68 +1038,3 @@ split_uri_options(OpsBin) -> [<<"size">>, Bin] -> {size, binary_to_integer(Bin)} end || X <- L]. - -do_pb_request(#mpb_request{req_id=ReqID, - echo=#mpb_echoreq{message=Msg}}) -> - #mpb_response{req_id=ReqID, - echo=#mpb_echoresp{message=Msg}}; -do_pb_request(#mpb_request{req_id=ReqID, - auth=#mpb_authreq{}}) -> - #mpb_response{req_id=ReqID, - generic=#mpb_errorresp{code=1, - msg="AUTH not implemented"}}; -do_pb_request(#mpb_request{req_id=ReqID, - append_chunk=AC=#mpb_appendchunkreq{}}) -> - #mpb_appendchunkreq{placement_key=____PK, - prefix=Prefix, - chunk=ChunkBin, - csum=#mpb_chunkcsum{type=CSumType, csum=CSum}, - chunk_extra=ChunkExtra} = AC, - TaggedCSum = case CSumType of - 'CSUM_TAG_NONE' -> - C = machi_util:checksum_chunk(ChunkBin), - machi_util:make_tagged_csum(server_sha, C); - 'CSUM_TAG_CLIENT_SHA' -> - machi_util:make_tagged_csum(client_sha, CSum) - end, - Chunk = {TaggedCSum, ChunkBin}, - case (catch machi_cr_client:append_chunk(todo_fixme, - Prefix, Chunk)) of - {ok, {Offset, Size, File}} -> - make_append_resp(ReqID, 'OK', - #mpb_chunkpos{offset=Offset, - chunk_size=Size, - file_name=File}); - {error, bad_arg} -> - make_append_resp(ReqID, 'BAD_ARG'); - {error, wedged} -> - make_append_resp(ReqID, 'WEDGED'); - {error, bad_checksum} -> - make_append_resp(ReqID, 'BAD_CHECKSUM'); - {error, partition} -> - make_append_resp(ReqID, 'PARTITION'); - _Else -> - make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) - end; -do_pb_request(#mpb_request{req_id=ReqID}) -> - #mpb_response{req_id=ReqID, - generic=#mpb_errorresp{code=66, - msg="Unknown request"}}; -do_pb_request(_Else) -> - #mpb_response{req_id= <<>>, - generic=#mpb_errorresp{code=67, - msg="Unknown PB request"}}. - -make_append_resp(ReqID, Status) -> - make_append_resp(ReqID, Status, undefined). - -make_append_resp(ReqID, Status, Where) -> - #mpb_response{req_id=ReqID, - append_chunk=#mpb_appendchunkresp{status=Status, - chunk_pos=Where}}. - -make_error_resp(ReqID, Code, Msg) -> - #mpb_response{req_id=ReqID, - generic=#mpb_errorresp{code=Code, - msg=Msg}}. - diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 7c62e44..6529bca 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -215,7 +215,8 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, {ok, Bin1B} = gen_tcp:recv(Sock, 0), case (catch machi_pb:decode_mpb_response(Bin1B)) of #mpb_response{req_id=ReqID, append_chunk=R} when R /= undefined -> - {R, S#state{count=Count+1}}; + Result = convert_append_chunk_resp(R), + {Result, S#state{count=Count+1}}; #mpb_response{req_id=ReqID, generic=G} when G /= undefined -> #mpb_errorresp{code=Code, msg=Msg, extra=Extra} = G, {{error, {Code, Msg, Extra}}, S#state{count=Count+1}} @@ -225,4 +226,18 @@ do_send_sync({append_chunk, PlacementKey, Prefix, Chunk, CSum, ChunkExtra}, {Res, S#state{count=Count+1}} end. +convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) -> + #mpb_chunkpos{offset=Offset, chunk_size=Size, file_name=File} = CP, + {ok, {Offset, Size, File}}; +convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_ARG'}) -> + {error, bad_arg}; +convert_append_chunk_resp(#mpb_appendchunkresp{status='WEDGED'}) -> + {error, wedged}; +convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_CHECKSUM'}) -> + {error, bad_checksum}; +convert_append_chunk_resp(#mpb_appendchunkresp{status='PARTITION'}) -> + {error, partition}; +convert_append_chunk_resp(#mpb_appendchunkresp{status='BAD_JOSS'}) -> + throw({error, bad_joss_taipan_fixme}). + diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index dce1395..73b35a5 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -37,6 +37,7 @@ smoke_test2() -> Port = 5720, Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"} ], + D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]), [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), @@ -45,6 +46,7 @@ smoke_test2() -> #p_srvr{name=Name, port=Port, props=Dir} = P, {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) end || P <- Ps], + ok = machi_chain_manager1:set_chain_members(a_chmgr, D), [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], {ok, Clnt} = ?C:start_link(Ps), @@ -61,7 +63,15 @@ smoke_test2() -> PK = <<>>, Prefix = <<"prefix">>, Chunk1 = <<"Hello, chunk!">>, - yo = ?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0), + {ok, {Off1, Size1, File1}} = + ?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0), + Chunk2 = "It's another chunk", + CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)}, + {ok, {Off2, Size2, File2}} = + ?C:append_chunk(Clnt, PK, Prefix, Chunk2, CSum2, 1024), + %% Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]], + %% {ok, {Off3, Size3, File3}} = + %% ?C:write_chunk(Clnt, File2, Off2+iolist_size(Chunk2), Chunk3), ok after