diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index e79c6de..5d7bb78 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -289,20 +289,141 @@ io:format(user, "\nSSS SockError ~p\n", [SockError]), do_pb_request(PB_request, S) -> Req = machi_pb_translate:from_pb_request(PB_request), -io:format(user, "\nSSS Req ~p\n", [Req]), + io:format(user, "\nSSS Req ~p\n", [Req]), {ReqID, Cmd, Result, S2} = case Req of - {RqID, {low_echo, Msg}=CMD} -> - Rs = Msg, - {RqID, CMD, Rs, S}; - {RqID, {low_checksum_list, EpochID, File}=CMD} -> - Rs = do_pb_server_checksum_listing(File, S), - {RqID, CMD, Rs, S}; + {RqID, CMD} -> + EpochID = element(2, CMD), % by common convention + {Rs, NewS} = do_pb_request2(EpochID, CMD, S), + {RqID, CMD, Rs, NewS}; nope -> {foo, bar, baz} end, + io:format(user, "\nSSS Result ~p\n", [Result]), {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. +do_pb_request2(EpochID, CMD, S) -> + {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), + if Wedged_p == true -> + {error, wedged}; + EpochID /= undefined andalso EpochID /= CurrentEpochID -> + {Epoch, _} = EpochID, + {CurrentEpoch, _} = CurrentEpochID, + if Epoch < CurrentEpoch -> + ok; + true -> + %% We're at same epoch # but different checksum, or + %% we're at a newer/bigger epoch #. + io:format(user, "\n\nTODO: wedge myself!\n\n", []), + todo_wedge_myself + end, + {error, bad_epoch}; + true -> + do_pb_request3(CMD, S) + end. + +do_pb_request3({low_echo, _BogusEpochID, Msg}, S) -> + {Msg, S}; +do_pb_request3({low_append_chunk, EpochID, PKey, Prefix, Chunk, CSum_tag, CSum, + ChunkExtra}, S) -> + {do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S), S}; +do_pb_request3({low_checksum_list, _EpochID, File}, S) -> + {do_pb_server_checksum_listing(File, S), S}; +do_pb_request3({low_list_files, _EpochID}, S) -> + {do_pb_server_list_files(S), S}; +do_pb_request3({low_wedge_status, _EpochID}, S) -> + {do_pb_server_wedge_status(S), S}. + +do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S) -> + case sanitize_file_string(Prefix) of + ok -> + do_pb_server_append_chunk3(PKey, Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S); + _ -> + {error, bad_arg} + end. + +do_pb_server_append_chunk3(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, + ChunkExtra, #state{flu_name=FluName}=_S) -> + %% TODO: Do anything with PKey? + try + CSum = case CSum_tag of + ?CSUM_TAG_NONE -> + %% TODO: If the client was foolish enough to use + %% this type of non-checksum, then the client gets + %% what it deserves wrt data integrity, alas. In + %% the client-side Chain Replication method, each + %% server will calculated this independently, which + %% isn't exactly what ought to happen for best data + %% integrity checking. In server-side CR, the csum + %% should be calculated by the head and passed down + %% the chain together with the value. + CS = machi_util:checksum_chunk(Chunk), + machi_util:make_tagged_csum(server_sha, CS); + ?CSUM_TAG_CLIENT_SHA -> + CS = machi_util:checksum_chunk(Chunk), + if CS == Client_CSum -> + Client_CSum; + true -> + throw({bad_csum, CS}) + end + end, + FluName ! {seq_append, self(), Prefix, Chunk, CSum, ChunkExtra}, + receive + {assignment, Offset, File} -> + Size = if is_binary(Chunk) -> + byte_size(Chunk); + is_list(Chunk) -> + iolist_size(Chunk) + end, + {ok, {Offset, Size, File}}; + wedged -> + {error, wedged} + after 10*1000 -> + {error, partition} + end + catch + throw:{bad_csum, _CS} -> + {error, bad_checksum}; + error:badarg -> + error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), + {error, bad_arg} + end. + +do_pb_server_checksum_listing(File, #state{data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + ok = sync_checksum_file(File), + CSumPath = machi_util:make_checksum_filename(DataDir, File), + %% TODO: If this file is legitimately bigger than our + %% {packet_size,N} limit, then we'll have a difficult time, eh? + case file:read_file(CSumPath) of + {ok, Bin} -> + {ok, Bin}; + {error, enoent} -> + {error, no_such_file}; + {error, _} -> + {error, bad_arg} + end; + _ -> + {error, bad_arg} + end. + +do_pb_server_list_files(#state{data_dir=DataDir}=_S) -> + {_, WildPath} = machi_util:make_data_filename(DataDir, ""), + Files = filelib:wildcard("*", WildPath), + {ok, [begin + {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), + Size = FI#file_info.size, + {Size, File} + end || File <- Files]}. + +do_pb_server_wedge_status(S) -> + {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), + {Wedged_p, CurrentEpochID}. + net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> %% TODO: Add testing control knob to adjust this timeout and/or inject %% timeout condition. @@ -335,7 +456,7 @@ net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> EpochID, S); <<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> -> _EpochID = decode_epoch_id(EpochIDHex), - do_net_server_listing(Sock, DataDir, S); + delme; %% do_net_server_listing(Sock, DataDir, S); <<"C ", EpochIDHex:(?EpochIDSpace)/binary, File:CSumFileLenLF/binary, "\n">> -> @@ -659,29 +780,6 @@ decode_and_reply_net_server_ec_read_version_a(Sock, Rest) -> <> = Rest2, ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]). -do_net_server_listing(Sock, DataDir, S) -> - {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), - if Wedged_p -> - ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); - true -> - do_net_server_listing2(Sock, DataDir) - end. - -do_net_server_listing2(Sock, DataDir) -> - {_, WildPath} = machi_util:make_data_filename(DataDir, ""), - Files = filelib:wildcard("*", WildPath), - Out = ["OK\n", - [begin - {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), - Size = FI#file_info.size, - SizeBin = <>, - [machi_util:bin_to_hexstr(SizeBin), <<" ">>, - list_to_binary(File), <<"\n">>] - end || File <- Files], - ".\n" - ], - ok = gen_tcp:send(Sock, Out). - do_net_server_checksum_listing(Sock, File, DataDir, S) -> {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), case {Wedged_p, sanitize_file_string(File)} of @@ -693,28 +791,6 @@ do_net_server_checksum_listing(Sock, File, DataDir, S) -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) end. -do_pb_server_checksum_listing(File, #state{data_dir=DataDir}=S) -> - {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), - case {Wedged_p, sanitize_file_string(File)} of - {true, _} -> - {error, wedged}; - {false, ok} -> - ok = sync_checksum_file(File), - - CSumPath = machi_util:make_checksum_filename(DataDir, File), - %% case file:open(CSumPath, [read, raw, binary]) of - case file:read_file(CSumPath) of - {ok, Bin} -> - {ok, Bin}; - {error, enoent} -> - {error, no_such_file}; - {error, _} -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - do_net_server_checksum_listing2(Sock, File, DataDir) -> ok = sync_checksum_file(File), diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 7ea1251..5d5f428 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -478,55 +478,6 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% -append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) -> - erase(bad_sock), - try - %% TODO: add client-side checksum to the server's protocol - %% _ = machi_util:checksum_chunk(Chunk), - Prefix = machi_util:make_binary(Prefix0), - {CSum, Chunk} = case Chunk0 of - {_,_} -> - Chunk0; - XX when is_binary(XX) -> - SHA = machi_util:checksum_chunk(Chunk0), - {<>, Chunk0} - end, - Len = iolist_size(Chunk), - true = (Len =< ?MAX_CHUNK_SIZE), - {EpochNum, EpochCSum} = EpochID, - EpochIDHex = machi_util:bin_to_hexstr( - <>), - CSumHex = machi_util:bin_to_hexstr(CSum), - LenHex = machi_util:int_to_hexbin(Len, 32), - ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32), - Cmd = [<<"A ">>, EpochIDHex, CSumHex, LenHex, ExtraHex, Prefix, 10], - ok = w_send(Sock, [Cmd, Chunk]), - {ok, Line} = w_recv(Sock, 0), - PathLen = byte_size(Line) - 3 - (2*(1+20)) - 16 - 1 - 1 - 1, - case Line of - <<"OK ", ServerCSum:(2*(1+20))/binary, " ", - OffsetHex:16/binary, " ", - Path:PathLen/binary, _:1/binary>> -> - Offset = machi_util:hexstr_to_int(OffsetHex), - {ok, {Offset, Len, Path}}; - <<"ERROR BAD-ARG", _/binary>> -> - {error, bad_arg}; - <<"ERROR WEDGED", _/binary>> -> - {error, wedged}; - <<"ERROR BAD-CHECKSUM", _/binary>> -> - {error, bad_checksum}; - <<"ERROR ", Rest/binary>> -> - {error, Rest} - end - catch - throw:Error -> - put(bad_sock, Sock), - Error; - error:{badmatch,_}=BadMatch -> - put(bad_sock, Sock), - {error, {badmatch, BadMatch, erlang:get_stacktrace()}} - end. - read_chunk2(Sock, EpochID, File0, Offset, Size) -> erase(bad_sock), try @@ -582,74 +533,40 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. -list2(Sock, EpochID) -> - try - {EpochNum, EpochCSum} = EpochID, - EpochIDHex = machi_util:bin_to_hexstr( - <>), - ok = w_send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]), - ok = w_setopts(Sock, [{packet, line}]), - case w_recv(Sock, 0) of - {ok, <<"OK\n">>} -> - Res = list3(w_recv(Sock, 0), Sock), - ok = w_setopts(Sock, [{packet, raw}]), - {ok, Res}; - {ok, <<"ERROR WEDGED\n">>} -> - {error, wedged}; - {ok, <<"ERROR ", Rest/binary>>} -> - {error, Rest} - end - catch - throw:Error -> - Error; - error:{case_clause,_}=Noo -> - put(bad_sock, Sock), - {error, {badmatch, Noo, erlang:get_stacktrace()}}; - error:{badmatch,_}=BadMatch -> - put(bad_sock, Sock), - {error, {badmatch, BadMatch}} - end. +append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) -> + ReqID = <<"id">>, + {Chunk, CSum_tag, CSum} = + case Chunk0 of + X when is_binary(X) -> + {Chunk0, ?CSUM_TAG_NONE, <<>>}; + {ChunkCSum, Chk} -> + {Tag, CS} = machi_util:unmake_tagged_csum(ChunkCSum), + {Chk, Tag, CS} + end, + PKey = <<>>, % TODO + Prefix = machi_util:make_binary(Prefix0), + Req = machi_pb_translate:to_pb_request( + ReqID, + {low_append_chunk, EpochID, PKey, Prefix, Chunk, CSum_tag, CSum, + ChunkExtra}), + do_pb_request_common(Sock, ReqID, Req). -list3({ok, <<".\n">>}, _Sock) -> - []; -list3({ok, Line}, Sock) -> - FileLen = byte_size(Line) - 16 - 1 - 1, - <> = Line, - Size = machi_util:hexstr_to_int(SizeHex), - [{Size, File}|list3(w_recv(Sock, 0), Sock)]; -list3(Else, _Sock) -> - throw({server_protocol_error, Else}). +list2(Sock, EpochID) -> + ReqID = <<"id">>, + Req = machi_pb_translate:to_pb_request( + ReqID, {low_list_files, EpochID}), + do_pb_request_common(Sock, ReqID, Req). wedge_status2(Sock) -> - try - ok = w_send(Sock, [<<"WEDGE-STATUS\n">>]), - ok = w_setopts(Sock, [{packet, line}]), - {ok, <<"OK ", - BooleanHex:2/binary, " ", - EpochHex:8/binary, " ", - CSumHex:40/binary, "\n">>} = w_recv(Sock, 0), - ok = w_setopts(Sock, [{packet, raw}]), - Boolean = if BooleanHex == <<"00">> -> false; - BooleanHex == <<"01">> -> true - end, - Res = {Boolean, {machi_util:hexstr_to_int(EpochHex), - machi_util:hexstr_to_bin(CSumHex)}}, - {ok, Res} - catch - throw:Error -> - Error; - error:{case_clause,_}=Noo -> - put(bad_sock, Sock), - {error, {badmatch, Noo, erlang:get_stacktrace()}}; - error:{badmatch,_}=BadMatch -> - put(bad_sock, Sock), - {error, {badmatch, BadMatch}} - end. + ReqID = <<"id">>, + Req = machi_pb_translate:to_pb_request( + ReqID, {low_wedge_status, undefined}), + do_pb_request_common(Sock, ReqID, Req). echo2(Sock, Message) -> ReqID = <<"id">>, Req = machi_pb_translate:to_pb_request( - ReqID, {low_echo, Message}), + ReqID, {low_echo, undefined, Message}), do_pb_request_common(Sock, ReqID, Req). checksum_list2(Sock, EpochID, File) -> diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index 771984f..df14832 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -35,11 +35,11 @@ from_pb_request(#mpb_ll_request{ req_id=ReqID, echo=#mpb_echoreq{message=Msg}}) -> - {ReqID, {low_echo, Msg}}; + {ReqID, {low_echo, undefined, Msg}}; from_pb_request(#mpb_ll_request{ req_id=ReqID, auth=#mpb_authreq{user=User, password=Pass}}) -> - {ReqID, {low_auth, User, Pass}}; + {ReqID, {low_auth, undefined, User, Pass}}; from_pb_request(#mpb_ll_request{ req_id=ReqID, append_chunk=#mpb_ll_appendchunkreq{ @@ -93,7 +93,7 @@ from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{ req_id=ReqID, wedge_status=#mpb_ll_wedgestatusreq{}}) -> - {ReqID, {low_wedge_status}}; + {ReqID, {low_wedge_status, undefined}}; %%qqq from_pb_request(#mpb_request{req_id=ReqID, echo=#mpb_echoreq{message=Msg}}) -> @@ -203,7 +203,7 @@ from_pb_response(#mpb_ll_response{ Wedged_p = if PB_Wedged == 1 -> true; PB_Wedged == 0 -> false end, - {ReqID, {EpochID, Wedged_p}}; + {ReqID, {ok, {Wedged_p, EpochID}}}; %%qqq from_pb_response(#mpb_ll_response{ req_id=ReqID, @@ -265,11 +265,11 @@ from_pb_response(#mpb_ll_response{ %% TODO: move the #mbp_* record making code from %% machi_pb_high_client:do_send_sync() clauses into to_pb_request(). -to_pb_request(ReqID, {low_echo, Msg}) -> +to_pb_request(ReqID, {low_echo, _BogusEpochID, Msg}) -> #mpb_ll_request{ req_id=ReqID, echo=#mpb_echoreq{message=Msg}}; -to_pb_request(ReqID, {low_auth, User, Pass}) -> +to_pb_request(ReqID, {low_auth, _BogusEpochID, User, Pass}) -> #mpb_ll_request{req_id=ReqID, auth=#mpb_authreq{user=User, password=Pass}}; to_pb_request(ReqID, {low_append_chunk, EpochID, PKey, Prefix, Chunk, @@ -316,16 +316,16 @@ to_pb_request(ReqID, {low_list_files, EpochID}) -> PB_EpochID = conv_from_epoch_id(EpochID), #mpb_ll_request{req_id=ReqID, list_files=#mpb_ll_listfilesreq{epoch_id=PB_EpochID}}; -to_pb_request(ReqID, {low_wedge_status}) -> +to_pb_request(ReqID, {low_wedge_status, _BogusEpochID}) -> #mpb_ll_request{req_id=ReqID, wedge_status=#mpb_ll_wedgestatusreq{}}. %%qqq -to_pb_response(ReqID, {low_echo, _Msg}, Resp) -> +to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) -> #mpb_ll_response{ req_id=ReqID, echo=#mpb_echoresp{message=Resp}}; -to_pb_response(ReqID, {low_auth, _, _}, Resp) -> +to_pb_response(ReqID, {low_auth, _, _, _}, Resp) -> #mpb_ll_response{ req_id=ReqID, auth=#mpb_authresp{code=Resp}}; @@ -392,11 +392,10 @@ to_pb_response(ReqID, {low_list_files, _EpochID}, Resp) -> _Else -> make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) end; -to_pb_response(ReqID, {low_wedge_status}, {EpochID, Wedged_p}=_Resp) -> +to_pb_response(ReqID, {low_wedge_status, _BogusEpochID}, Resp) -> + {Wedged_p, EpochID} = Resp, + PB_Wedged = conv_from_boolean(Wedged_p), PB_EpochID = conv_from_epoch_id(EpochID), - PB_Wedged = if Wedged_p == true -> 1; - Wedged_p == false -> 0 - end, #mpb_ll_response{req_id=ReqID, wedge_status=#mpb_ll_wedgestatusresp{epoch_id=PB_EpochID, wedged_flag=PB_Wedged}}; @@ -666,3 +665,8 @@ conv_to_boolean(0) -> false; conv_to_boolean(N) when is_integer(N) -> true. + +conv_from_boolean(false) -> + 0; +conv_from_boolean(true) -> + 1. diff --git a/src/machi_util.erl b/src/machi_util.erl index a10e186..6ebe1bc 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -25,6 +25,7 @@ -export([ checksum_chunk/1, make_tagged_csum/1, make_tagged_csum/2, + unmake_tagged_csum/1, hexstr_to_bin/1, bin_to_hexstr/1, hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2, make_binary/1, make_string/1,