WIP: giant hairball 4

This commit is contained in:
Scott Lystig Fritchie 2015-06-26 20:47:55 +09:00
parent 0e32fd25c9
commit 9a212fb19f
4 changed files with 174 additions and 176 deletions

View file

@ -289,20 +289,141 @@ io:format(user, "\nSSS SockError ~p\n", [SockError]),
do_pb_request(PB_request, S) -> do_pb_request(PB_request, S) ->
Req = machi_pb_translate:from_pb_request(PB_request), Req = machi_pb_translate:from_pb_request(PB_request),
io:format(user, "\nSSS Req ~p\n", [Req]), io:format(user, "\nSSS Req ~p\n", [Req]),
{ReqID, Cmd, Result, S2} = {ReqID, Cmd, Result, S2} =
case Req of case Req of
{RqID, {low_echo, Msg}=CMD} -> {RqID, CMD} ->
Rs = Msg, EpochID = element(2, CMD), % by common convention
{RqID, CMD, Rs, S}; {Rs, NewS} = do_pb_request2(EpochID, CMD, S),
{RqID, {low_checksum_list, EpochID, File}=CMD} -> {RqID, CMD, Rs, NewS};
Rs = do_pb_server_checksum_listing(File, S),
{RqID, CMD, Rs, S};
nope -> nope ->
{foo, bar, baz} {foo, bar, baz}
end, end,
io:format(user, "\nSSS Result ~p\n", [Result]),
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
do_pb_request2(EpochID, CMD, S) ->
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
if Wedged_p == true ->
{error, wedged};
EpochID /= undefined andalso EpochID /= CurrentEpochID ->
{Epoch, _} = EpochID,
{CurrentEpoch, _} = CurrentEpochID,
if Epoch < CurrentEpoch ->
ok;
true ->
%% We're at same epoch # but different checksum, or
%% we're at a newer/bigger epoch #.
io:format(user, "\n\nTODO: wedge myself!\n\n", []),
todo_wedge_myself
end,
{error, bad_epoch};
true ->
do_pb_request3(CMD, S)
end.
do_pb_request3({low_echo, _BogusEpochID, Msg}, S) ->
{Msg, S};
do_pb_request3({low_append_chunk, EpochID, PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra}, S) ->
{do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S), S};
do_pb_request3({low_checksum_list, _EpochID, File}, S) ->
{do_pb_server_checksum_listing(File, S), S};
do_pb_request3({low_list_files, _EpochID}, S) ->
{do_pb_server_list_files(S), S};
do_pb_request3({low_wedge_status, _EpochID}, S) ->
{do_pb_server_wedge_status(S), S}.
do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S) ->
case sanitize_file_string(Prefix) of
ok ->
do_pb_server_append_chunk3(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S);
_ ->
{error, bad_arg}
end.
do_pb_server_append_chunk3(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
ChunkExtra, #state{flu_name=FluName}=_S) ->
%% TODO: Do anything with PKey?
try
CSum = case CSum_tag of
?CSUM_TAG_NONE ->
%% TODO: If the client was foolish enough to use
%% this type of non-checksum, then the client gets
%% what it deserves wrt data integrity, alas. In
%% the client-side Chain Replication method, each
%% server will calculated this independently, which
%% isn't exactly what ought to happen for best data
%% integrity checking. In server-side CR, the csum
%% should be calculated by the head and passed down
%% the chain together with the value.
CS = machi_util:checksum_chunk(Chunk),
machi_util:make_tagged_csum(server_sha, CS);
?CSUM_TAG_CLIENT_SHA ->
CS = machi_util:checksum_chunk(Chunk),
if CS == Client_CSum ->
Client_CSum;
true ->
throw({bad_csum, CS})
end
end,
FluName ! {seq_append, self(), Prefix, Chunk, CSum, ChunkExtra},
receive
{assignment, Offset, File} ->
Size = if is_binary(Chunk) ->
byte_size(Chunk);
is_list(Chunk) ->
iolist_size(Chunk)
end,
{ok, {Offset, Size, File}};
wedged ->
{error, wedged}
after 10*1000 ->
{error, partition}
end
catch
throw:{bad_csum, _CS} ->
{error, bad_checksum};
error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]),
{error, bad_arg}
end.
do_pb_server_checksum_listing(File, #state{data_dir=DataDir}=_S) ->
case sanitize_file_string(File) of
ok ->
ok = sync_checksum_file(File),
CSumPath = machi_util:make_checksum_filename(DataDir, File),
%% TODO: If this file is legitimately bigger than our
%% {packet_size,N} limit, then we'll have a difficult time, eh?
case file:read_file(CSumPath) of
{ok, Bin} ->
{ok, Bin};
{error, enoent} ->
{error, no_such_file};
{error, _} ->
{error, bad_arg}
end;
_ ->
{error, bad_arg}
end.
do_pb_server_list_files(#state{data_dir=DataDir}=_S) ->
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
Files = filelib:wildcard("*", WildPath),
{ok, [begin
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
Size = FI#file_info.size,
{Size, File}
end || File <- Files]}.
do_pb_server_wedge_status(S) ->
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
{Wedged_p, CurrentEpochID}.
net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
%% TODO: Add testing control knob to adjust this timeout and/or inject %% TODO: Add testing control knob to adjust this timeout and/or inject
%% timeout condition. %% timeout condition.
@ -335,7 +456,7 @@ net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
EpochID, S); EpochID, S);
<<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> -> <<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex), _EpochID = decode_epoch_id(EpochIDHex),
do_net_server_listing(Sock, DataDir, S); delme; %% do_net_server_listing(Sock, DataDir, S);
<<"C ", <<"C ",
EpochIDHex:(?EpochIDSpace)/binary, EpochIDHex:(?EpochIDSpace)/binary,
File:CSumFileLenLF/binary, "\n">> -> File:CSumFileLenLF/binary, "\n">> ->
@ -659,29 +780,6 @@ decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
<<Body:BodyLen/binary, _/binary>> = Rest2, <<Body:BodyLen/binary, _/binary>> = Rest2,
ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]). ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
do_net_server_listing(Sock, DataDir, S) ->
{Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
if Wedged_p ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
true ->
do_net_server_listing2(Sock, DataDir)
end.
do_net_server_listing2(Sock, DataDir) ->
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
Files = filelib:wildcard("*", WildPath),
Out = ["OK\n",
[begin
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
Size = FI#file_info.size,
SizeBin = <<Size:64/big>>,
[machi_util:bin_to_hexstr(SizeBin), <<" ">>,
list_to_binary(File), <<"\n">>]
end || File <- Files],
".\n"
],
ok = gen_tcp:send(Sock, Out).
do_net_server_checksum_listing(Sock, File, DataDir, S) -> do_net_server_checksum_listing(Sock, File, DataDir, S) ->
{Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
case {Wedged_p, sanitize_file_string(File)} of case {Wedged_p, sanitize_file_string(File)} of
@ -693,28 +791,6 @@ do_net_server_checksum_listing(Sock, File, DataDir, S) ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end. end.
do_pb_server_checksum_listing(File, #state{data_dir=DataDir}=S) ->
{Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
case {Wedged_p, sanitize_file_string(File)} of
{true, _} ->
{error, wedged};
{false, ok} ->
ok = sync_checksum_file(File),
CSumPath = machi_util:make_checksum_filename(DataDir, File),
%% case file:open(CSumPath, [read, raw, binary]) of
case file:read_file(CSumPath) of
{ok, Bin} ->
{ok, Bin};
{error, enoent} ->
{error, no_such_file};
{error, _} ->
{error, bad_arg}
end;
_ ->
{error, bad_arg}
end.
do_net_server_checksum_listing2(Sock, File, DataDir) -> do_net_server_checksum_listing2(Sock, File, DataDir) ->
ok = sync_checksum_file(File), ok = sync_checksum_file(File),

View file

@ -478,55 +478,6 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
erase(bad_sock),
try
%% TODO: add client-side checksum to the server's protocol
%% _ = machi_util:checksum_chunk(Chunk),
Prefix = machi_util:make_binary(Prefix0),
{CSum, Chunk} = case Chunk0 of
{_,_} ->
Chunk0;
XX when is_binary(XX) ->
SHA = machi_util:checksum_chunk(Chunk0),
{<<?CSUM_TAG_CLIENT_SHA:8, SHA/binary>>, Chunk0}
end,
Len = iolist_size(Chunk),
true = (Len =< ?MAX_CHUNK_SIZE),
{EpochNum, EpochCSum} = EpochID,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
CSumHex = machi_util:bin_to_hexstr(CSum),
LenHex = machi_util:int_to_hexbin(Len, 32),
ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32),
Cmd = [<<"A ">>, EpochIDHex, CSumHex, LenHex, ExtraHex, Prefix, 10],
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
PathLen = byte_size(Line) - 3 - (2*(1+20)) - 16 - 1 - 1 - 1,
case Line of
<<"OK ", ServerCSum:(2*(1+20))/binary, " ",
OffsetHex:16/binary, " ",
Path:PathLen/binary, _:1/binary>> ->
Offset = machi_util:hexstr_to_int(OffsetHex),
{ok, {Offset, Len, Path}};
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR WEDGED", _/binary>> ->
{error, wedged};
<<"ERROR BAD-CHECKSUM", _/binary>> ->
{error, bad_checksum};
<<"ERROR ", Rest/binary>> ->
{error, Rest}
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
read_chunk2(Sock, EpochID, File0, Offset, Size) -> read_chunk2(Sock, EpochID, File0, Offset, Size) ->
erase(bad_sock), erase(bad_sock),
try try
@ -582,74 +533,40 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
{error, {badmatch, BadMatch, erlang:get_stacktrace()}} {error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end. end.
list2(Sock, EpochID) -> append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
try ReqID = <<"id">>,
{EpochNum, EpochCSum} = EpochID, {Chunk, CSum_tag, CSum} =
EpochIDHex = machi_util:bin_to_hexstr( case Chunk0 of
<<EpochNum:(4*8)/big, EpochCSum/binary>>), X when is_binary(X) ->
ok = w_send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]), {Chunk0, ?CSUM_TAG_NONE, <<>>};
ok = w_setopts(Sock, [{packet, line}]), {ChunkCSum, Chk} ->
case w_recv(Sock, 0) of {Tag, CS} = machi_util:unmake_tagged_csum(ChunkCSum),
{ok, <<"OK\n">>} -> {Chk, Tag, CS}
Res = list3(w_recv(Sock, 0), Sock), end,
ok = w_setopts(Sock, [{packet, raw}]), PKey = <<>>, % TODO
{ok, Res}; Prefix = machi_util:make_binary(Prefix0),
{ok, <<"ERROR WEDGED\n">>} -> Req = machi_pb_translate:to_pb_request(
{error, wedged}; ReqID,
{ok, <<"ERROR ", Rest/binary>>} -> {low_append_chunk, EpochID, PKey, Prefix, Chunk, CSum_tag, CSum,
{error, Rest} ChunkExtra}),
end do_pb_request_common(Sock, ReqID, Req).
catch
throw:Error ->
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
list3({ok, <<".\n">>}, _Sock) -> list2(Sock, EpochID) ->
[]; ReqID = <<"id">>,
list3({ok, Line}, Sock) -> Req = machi_pb_translate:to_pb_request(
FileLen = byte_size(Line) - 16 - 1 - 1, ReqID, {low_list_files, EpochID}),
<<SizeHex:16/binary, " ", File:FileLen/binary, _/binary>> = Line, do_pb_request_common(Sock, ReqID, Req).
Size = machi_util:hexstr_to_int(SizeHex),
[{Size, File}|list3(w_recv(Sock, 0), Sock)];
list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
wedge_status2(Sock) -> wedge_status2(Sock) ->
try ReqID = <<"id">>,
ok = w_send(Sock, [<<"WEDGE-STATUS\n">>]), Req = machi_pb_translate:to_pb_request(
ok = w_setopts(Sock, [{packet, line}]), ReqID, {low_wedge_status, undefined}),
{ok, <<"OK ", do_pb_request_common(Sock, ReqID, Req).
BooleanHex:2/binary, " ",
EpochHex:8/binary, " ",
CSumHex:40/binary, "\n">>} = w_recv(Sock, 0),
ok = w_setopts(Sock, [{packet, raw}]),
Boolean = if BooleanHex == <<"00">> -> false;
BooleanHex == <<"01">> -> true
end,
Res = {Boolean, {machi_util:hexstr_to_int(EpochHex),
machi_util:hexstr_to_bin(CSumHex)}},
{ok, Res}
catch
throw:Error ->
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
echo2(Sock, Message) -> echo2(Sock, Message) ->
ReqID = <<"id">>, ReqID = <<"id">>,
Req = machi_pb_translate:to_pb_request( Req = machi_pb_translate:to_pb_request(
ReqID, {low_echo, Message}), ReqID, {low_echo, undefined, Message}),
do_pb_request_common(Sock, ReqID, Req). do_pb_request_common(Sock, ReqID, Req).
checksum_list2(Sock, EpochID, File) -> checksum_list2(Sock, EpochID, File) ->

View file

@ -35,11 +35,11 @@
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) -> echo=#mpb_echoreq{message=Msg}}) ->
{ReqID, {low_echo, Msg}}; {ReqID, {low_echo, undefined, Msg}};
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) -> auth=#mpb_authreq{user=User, password=Pass}}) ->
{ReqID, {low_auth, User, Pass}}; {ReqID, {low_auth, undefined, User, Pass}};
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
append_chunk=#mpb_ll_appendchunkreq{ append_chunk=#mpb_ll_appendchunkreq{
@ -93,7 +93,7 @@ from_pb_request(#mpb_ll_request{
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
wedge_status=#mpb_ll_wedgestatusreq{}}) -> wedge_status=#mpb_ll_wedgestatusreq{}}) ->
{ReqID, {low_wedge_status}}; {ReqID, {low_wedge_status, undefined}};
%%qqq %%qqq
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) -> echo=#mpb_echoreq{message=Msg}}) ->
@ -203,7 +203,7 @@ from_pb_response(#mpb_ll_response{
Wedged_p = if PB_Wedged == 1 -> true; Wedged_p = if PB_Wedged == 1 -> true;
PB_Wedged == 0 -> false PB_Wedged == 0 -> false
end, end,
{ReqID, {EpochID, Wedged_p}}; {ReqID, {ok, {Wedged_p, EpochID}}};
%%qqq %%qqq
from_pb_response(#mpb_ll_response{ from_pb_response(#mpb_ll_response{
req_id=ReqID, req_id=ReqID,
@ -265,11 +265,11 @@ from_pb_response(#mpb_ll_response{
%% TODO: move the #mbp_* record making code from %% TODO: move the #mbp_* record making code from
%% machi_pb_high_client:do_send_sync() clauses into to_pb_request(). %% machi_pb_high_client:do_send_sync() clauses into to_pb_request().
to_pb_request(ReqID, {low_echo, Msg}) -> to_pb_request(ReqID, {low_echo, _BogusEpochID, Msg}) ->
#mpb_ll_request{ #mpb_ll_request{
req_id=ReqID, req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}; echo=#mpb_echoreq{message=Msg}};
to_pb_request(ReqID, {low_auth, User, Pass}) -> to_pb_request(ReqID, {low_auth, _BogusEpochID, User, Pass}) ->
#mpb_ll_request{req_id=ReqID, #mpb_ll_request{req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}; auth=#mpb_authreq{user=User, password=Pass}};
to_pb_request(ReqID, {low_append_chunk, EpochID, PKey, Prefix, Chunk, to_pb_request(ReqID, {low_append_chunk, EpochID, PKey, Prefix, Chunk,
@ -316,16 +316,16 @@ to_pb_request(ReqID, {low_list_files, EpochID}) ->
PB_EpochID = conv_from_epoch_id(EpochID), PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, #mpb_ll_request{req_id=ReqID,
list_files=#mpb_ll_listfilesreq{epoch_id=PB_EpochID}}; list_files=#mpb_ll_listfilesreq{epoch_id=PB_EpochID}};
to_pb_request(ReqID, {low_wedge_status}) -> to_pb_request(ReqID, {low_wedge_status, _BogusEpochID}) ->
#mpb_ll_request{req_id=ReqID, #mpb_ll_request{req_id=ReqID,
wedge_status=#mpb_ll_wedgestatusreq{}}. wedge_status=#mpb_ll_wedgestatusreq{}}.
%%qqq %%qqq
to_pb_response(ReqID, {low_echo, _Msg}, Resp) -> to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) ->
#mpb_ll_response{ #mpb_ll_response{
req_id=ReqID, req_id=ReqID,
echo=#mpb_echoresp{message=Resp}}; echo=#mpb_echoresp{message=Resp}};
to_pb_response(ReqID, {low_auth, _, _}, Resp) -> to_pb_response(ReqID, {low_auth, _, _, _}, Resp) ->
#mpb_ll_response{ #mpb_ll_response{
req_id=ReqID, req_id=ReqID,
auth=#mpb_authresp{code=Resp}}; auth=#mpb_authresp{code=Resp}};
@ -392,11 +392,10 @@ to_pb_response(ReqID, {low_list_files, _EpochID}, Resp) ->
_Else -> _Else ->
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end; end;
to_pb_response(ReqID, {low_wedge_status}, {EpochID, Wedged_p}=_Resp) -> to_pb_response(ReqID, {low_wedge_status, _BogusEpochID}, Resp) ->
{Wedged_p, EpochID} = Resp,
PB_Wedged = conv_from_boolean(Wedged_p),
PB_EpochID = conv_from_epoch_id(EpochID), PB_EpochID = conv_from_epoch_id(EpochID),
PB_Wedged = if Wedged_p == true -> 1;
Wedged_p == false -> 0
end,
#mpb_ll_response{req_id=ReqID, #mpb_ll_response{req_id=ReqID,
wedge_status=#mpb_ll_wedgestatusresp{epoch_id=PB_EpochID, wedge_status=#mpb_ll_wedgestatusresp{epoch_id=PB_EpochID,
wedged_flag=PB_Wedged}}; wedged_flag=PB_Wedged}};
@ -666,3 +665,8 @@ conv_to_boolean(0) ->
false; false;
conv_to_boolean(N) when is_integer(N) -> conv_to_boolean(N) when is_integer(N) ->
true. true.
conv_from_boolean(false) ->
0;
conv_from_boolean(true) ->
1.

View file

@ -25,6 +25,7 @@
-export([ -export([
checksum_chunk/1, checksum_chunk/1,
make_tagged_csum/1, make_tagged_csum/2, make_tagged_csum/1, make_tagged_csum/2,
unmake_tagged_csum/1,
hexstr_to_bin/1, bin_to_hexstr/1, hexstr_to_bin/1, bin_to_hexstr/1,
hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2, hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
make_binary/1, make_string/1, make_binary/1, make_string/1,