WIP: giant hairball

This commit is contained in:
Scott Lystig Fritchie 2015-06-26 16:58:24 +09:00
parent 6d95d8669c
commit fb975eea46
4 changed files with 155 additions and 179 deletions

View file

@ -288,7 +288,7 @@ io:format(user, "\nSSS SockError ~p\n", [SockError]),
end.
do_pb_request(PB_request, S) ->
Req = machi_pb_translate:from_pb(PB_request),
Req = machi_pb_translate:from_pb_request(PB_request),
io:format(user, "\nSSS Req ~p\n", [Req]),
{ReqID, Cmd, Result, S2} =
case Req of
@ -301,7 +301,7 @@ io:format(user, "\nSSS Req ~p\n", [Req]),
nope ->
{foo, bar, baz}
end,
{machi_pb_translate:to_pb(ReqID, Cmd, Result), S2}.
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
%% TODO: Add testing control knob to adjust this timeout and/or inject

View file

@ -648,13 +648,13 @@ wedge_status2(Sock) ->
echo2(Sock, Message) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb(
Req = machi_pb_translate:to_pb_request(
ReqID, {low_echo, Message}),
do_pb_request_common(Sock, ReqID, Req).
checksum_list2(Sock, EpochID, File) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb(
Req = machi_pb_translate:to_pb_request(
ReqID, {low_checksum_list, EpochID, File}),
do_pb_request_common(Sock, ReqID, Req).
@ -843,7 +843,7 @@ io:format(user, "\nCCC Req ~p\n", [Req]),
{ok, RespBin} ->
Resp = machi_pb:decode_mpb_ll_response(RespBin),
io:format(user, "\nCCC Resp ~p\n", [Resp]),
{ReqID2, Reply} = machi_pb_translate:from_pb(Resp),
{ReqID2, Reply} = machi_pb_translate:from_pb_response(Resp),
io:format(user, "\nCCC ReqID2 ~p Reply ~p\n", [ReqID2, Reply]),
true = (ReqID == ReqID2 orelse ReqID2 == <<>>),
Reply;

View file

@ -53,7 +53,7 @@ protocol_buffers_loop(Sock, Clnt) ->
do_pb_request(PB_request, Clnt) ->
{ReqID, Cmd, Result} =
case machi_pb_translate:from_pb(PB_request) of
case machi_pb_translate:from_pb_request(PB_request) of
{RqID, {high_echo, Msg}=CMD} ->
Rs = Msg,
{RqID, CMD, Rs};
@ -83,4 +83,4 @@ do_pb_request(PB_request, Clnt) ->
Rs = {ErrCode, ErrMsg},
{RqID, CMD, Rs}
end,
machi_pb_translate:to_pb(ReqID, Cmd, Result).
machi_pb_translate:to_pb_response(ReqID, Cmd, Result).

View file

@ -20,98 +20,40 @@
-module(machi_pb_translate).
%% @doc Adapt impedence mismatches between Erlang and Protocol Buffers.
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-export([from_pb/1,
to_pb/2,
to_pb/3
-export([from_pb_request/1,
from_pb_response/1,
to_pb_request/2,
to_pb_response/3
]).
from_pb(#mpb_ll_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
from_pb_request(#mpb_ll_request{
req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
{ReqID, {low_echo, Msg}};
from_pb(#mpb_ll_request{req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) ->
from_pb_request(#mpb_ll_request{
req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) ->
{ReqID, {low_auth, User, Pass}};
from_pb(#mpb_ll_request{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{epoch_id=PB_EpochID,
file=File}}) ->
from_pb_request(#mpb_ll_request{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{epoch_id=PB_EpochID,
file=File}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
{ReqID, {low_checksum_list, EpochID, File}};
from_pb(#mpb_ll_response{req_id=ReqID,
echo=#mpb_echoresp{message=Msg}}) ->
{ReqID, Msg};
from_pb(#mpb_ll_response{req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{
status=Status, chunk=Chunk}}) ->
case Status of
'OK' ->
{ReqID, {ok, Chunk}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_gl=#mpb_ll_getlatestepochidresp{
status=Status, epoch_id=EID}}) ->
case Status of
'OK' ->
#mpb_epochid{epoch_number=Epoch, epoch_csum=CSum} = EID,
{ReqID, {ok, {Epoch, CSum}}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_rl=#mpb_ll_readlatestprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ReqID, {ok, conv_to_projection_v1(P)}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_rp=#mpb_ll_readprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ReqID, {ok, conv_to_projection_v1(P)}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_wp=#mpb_ll_writeprojectionresp{
status=Status}}) ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)};
from_pb(#mpb_ll_response{req_id=ReqID,
proj_ga=#mpb_ll_getallprojectionsresp{
status=Status, projs=ProjsM}}) ->
case Status of
'OK' ->
{ReqID, {ok, [conv_to_projection_v1(ProjM) || ProjM <- ProjsM]}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_la=#mpb_ll_listallprojectionsresp{
status=Status, epochs=Epochs}}) ->
case Status of
'OK' ->
{ReqID, {ok, Epochs}};
_ ->
{ReqID< machi_pb_high_client:convert_general_status_code(Status)}
end;
%-%-%-%-%
from_pb(#mpb_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
{ReqID, {high_echo, Msg}};
from_pb(#mpb_request{req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) ->
{ReqID, {high_auth, User, Pass}};
from_pb(#mpb_request{req_id=ReqID,
append_chunk=IR=#mpb_appendchunkreq{}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
append_chunk=IR=#mpb_appendchunkreq{}}) ->
#mpb_appendchunkreq{placement_key=__todoPK,
prefix=Prefix,
chunk=ChunkBin,
@ -120,151 +62,184 @@ from_pb(#mpb_request{req_id=ReqID,
TaggedCSum = make_tagged_csum(CSum, ChunkBin),
{ReqID, {high_append_chunk, __todoPK, Prefix, ChunkBin, TaggedCSum,
ChunkExtra}};
from_pb(#mpb_request{req_id=ReqID,
write_chunk=IR=#mpb_writechunkreq{}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
write_chunk=IR=#mpb_writechunkreq{}}) ->
#mpb_writechunkreq{file=File,
offset=Offset,
chunk=ChunkBin,
csum=CSum} = IR,
TaggedCSum = make_tagged_csum(CSum, ChunkBin),
{ReqID, {high_write_chunk, File, Offset, ChunkBin, TaggedCSum}};
from_pb(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{file=File,
offset=Offset,
size=Size} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}};
from_pb(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) ->
#mpb_checksumlistreq{file=File} = IR,
{ReqID, {high_checksum_list, File}};
from_pb(#mpb_request{req_id=ReqID,
list_files=_IR=#mpb_listfilesreq{}}) ->
from_pb_request(#mpb_request{req_id=ReqID,
list_files=_IR=#mpb_listfilesreq{}}) ->
{ReqID, {high_list_files}};
from_pb(#mpb_request{req_id=ReqID}) ->
from_pb_request(#mpb_request{req_id=ReqID}) ->
{ReqID, {high_error, 999966, "Unknown request"}};
from_pb(_) ->
from_pb_request(_) ->
{<<>>, {high_error, 999667, "Unknown PB request"}}.
to_pb(ReqID, {low_echo, Msg}) ->
from_pb_response(#mpb_ll_response{
req_id=ReqID,
echo=#mpb_echoresp{message=Msg}}) ->
{ReqID, Msg};
from_pb_response(#mpb_ll_response{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{
status=Status, chunk=Chunk}}) ->
case Status of
'OK' ->
{ReqID, {ok, Chunk}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb_response(#mpb_ll_response{
req_id=ReqID,
proj_gl=#mpb_ll_getlatestepochidresp{
status=Status, epoch_id=EID}}) ->
case Status of
'OK' ->
#mpb_epochid{epoch_number=Epoch, epoch_csum=CSum} = EID,
{ReqID, {ok, {Epoch, CSum}}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb_response(#mpb_ll_response{
req_id=ReqID,
proj_rl=#mpb_ll_readlatestprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ReqID, {ok, conv_to_projection_v1(P)}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb_response(#mpb_ll_response{
req_id=ReqID,
proj_rp=#mpb_ll_readprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ReqID, {ok, conv_to_projection_v1(P)}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb_response(#mpb_ll_response{
req_id=ReqID,
proj_wp=#mpb_ll_writeprojectionresp{
status=Status}}) ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)};
from_pb_response(#mpb_ll_response{
req_id=ReqID,
proj_ga=#mpb_ll_getallprojectionsresp{
status=Status, projs=ProjsM}}) ->
case Status of
'OK' ->
{ReqID, {ok, [conv_to_projection_v1(ProjM) || ProjM <- ProjsM]}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb_response(#mpb_ll_response{
req_id=ReqID,
proj_la=#mpb_ll_listallprojectionsresp{
status=Status, epochs=Epochs}}) ->
case Status of
'OK' ->
{ReqID, {ok, Epochs}};
_ ->
{ReqID< machi_pb_high_client:convert_general_status_code(Status)}
end.
to_pb_request(ReqID, {low_echo, Msg}) ->
#mpb_ll_request{
req_id=ReqID,
echo=#mpb_echoreq{message=Msg}};
to_pb(ReqID, {low_checksum_list, EpochID, File}) ->
req_id=ReqID,
echo=#mpb_echoreq{message=Msg}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{epoch_id=PB_EpochID,
file=File}}.
to_pb(ReqID, {low_echo, Msg}, Resp) ->
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{epoch_id=PB_EpochID,
file=File}}.
to_pb_response(ReqID, {low_echo, _Msg}, Resp) ->
#mpb_ll_response{
req_id=ReqID,
echo=#mpb_echoresp{message=Msg}};
to_pb(ReqID, {low_checksum_list, _EpochID, _File}, Resp) ->
req_id=ReqID,
echo=#mpb_echoresp{message=Resp}};
to_pb_response(ReqID, {low_checksum_list, _EpochID, _File}, Resp) ->
case Resp of
{ok, Chunk} ->
make_ll_checksum_list_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_ll_checksum_list_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_ll_checksum_list_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_ll_checksum_list_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_ll_checksum_list_resp(ReqID, 'PARTITION', undefined);
{error, no_such_file} ->
make_ll_checksum_list_resp(ReqID, 'NO_SUCH_FILE', undefined);
{ok, _Chunk}=OK ->
make_ll_checksum_list_resp(ReqID, OK);
{error, _}=Error ->
make_ll_checksum_list_resp(ReqID, Error);
_Else ->
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_echo, _Msg}, Resp) ->
to_pb_response(ReqID, {high_echo, _Msg}, Resp) ->
Msg = Resp,
#mpb_response{req_id=ReqID,
echo=#mpb_echoresp{message=Msg}};
to_pb(ReqID, {high_auth, _User, _Pass}, _Resp) ->
to_pb_response(ReqID, {high_auth, _User, _Pass}, _Resp) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=1,
msg="AUTH not implemented"}};
to_pb(ReqID, {high_append_chunk, _TODO, _Prefix, _ChunkBin, _TSum, _CE}, Resp)->
to_pb_response(ReqID, {high_append_chunk, _TODO, _Prefix, _ChunkBin, _TSum, _CE}, Resp)->
case Resp of
{ok, {Offset, Size, File}} ->
make_append_resp(ReqID, 'OK',
#mpb_chunkpos{offset=Offset,
chunk_size=Size,
file_name=File});
{error, bad_arg} ->
make_append_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_append_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_append_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_append_resp(ReqID, 'PARTITION');
{error, Status} ->
make_append_resp(ReqID, conv_from_status(Status), undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_write_chunk, _File, _Offset, _ChunkBin, _TaggedCSum}, Resp) ->
to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _ChunkBin, _TaggedCSum}, Resp) ->
case Resp of
{ok, {_,_,_}} ->
%% machi_cr_client returns ok 2-tuple, convert to simple ok.
make_write_resp(ReqID, 'OK');
{error, bad_arg} ->
make_write_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_write_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_write_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_write_resp(ReqID, 'PARTITION');
{error, Status} ->
make_write_resp(ReqID, conv_from_status(Status));
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
case Resp of
{ok, Chunk} ->
make_read_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_read_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_read_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_read_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_read_resp(ReqID, 'PARTITION', undefined);
{error, Status} ->
make_read_resp(ReqID, conv_from_status(Status), undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_checksum_list, _File}, Resp) ->
to_pb_response(ReqID, {high_checksum_list, _File}, Resp) ->
case Resp of
{ok, Chunk} ->
make_checksum_list_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_checksum_list_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_checksum_list_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_checksum_list_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_checksum_list_resp(ReqID, 'PARTITION', undefined);
{error, Status} ->
make_checksum_list_resp(ReqID, conv_from_status(Status), undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_list_files}, Resp) ->
to_pb_response(ReqID, {high_list_files}, Resp) ->
case Resp of
{ok, FileInfo} ->
make_list_files_resp(ReqID, 'OK', FileInfo);
{error, bad_arg} ->
make_list_files_resp(ReqID, 'BAD_ARG', []);
{error, wedged} ->
make_list_files_resp(ReqID, 'WEDGED', []);
{error, bad_checksum} ->
make_list_files_resp(ReqID, 'BAD_CHECKSUM', []);
{error, partition} ->
make_list_files_resp(ReqID, 'PARTITION', []);
{error, Status} ->
make_list_files_resp(ReqID, conv_from_status(Status), []);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_error, _, _}, {ErrCode, ErrMsg}) ->
to_pb_response(ReqID, {high_error, _, _}, {ErrCode, ErrMsg}) ->
make_error_resp(ReqID, ErrCode, ErrMsg).
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_NONE'}, ChunkBin) ->
@ -273,19 +248,20 @@ make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_NONE'}, ChunkBin) ->
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', csum=CSum}, _CB) ->
machi_util:make_tagged_csum(client_sha, CSum).
make_ll_checksum_list_resp(ReqID, Status, __todo__Chunk) ->
Chunk = <<"TODO item: refactor the checksum_list op to return simply the text file representation of the checksums?">>,
make_ll_checksum_list_resp(ReqID, {ok, Chunk}) ->
#mpb_ll_response{req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{status=Status,
chunk=Chunk}}.
checksum_list=#mpb_ll_checksumlistresp{status='OK',
chunk=Chunk}};
make_ll_checksum_list_resp(ReqID, Error) ->
Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{status=Status}}.
make_ll_error_resp(ReqID, Code, Msg) ->
#mpb_ll_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.
make_append_resp(ReqID, Status) ->
make_append_resp(ReqID, Status, undefined).
make_append_resp(ReqID, Status, Where) ->
#mpb_response{req_id=ReqID,
append_chunk=#mpb_appendchunkresp{status=Status,
@ -381,10 +357,10 @@ conv_to_members_dict(List) ->
#mpb_membersdictentry{key=K, val=V} <- List]).
conv_from_p_srvr(#p_srvr{name=Name,
proto_mod=ProtoMod,
address=Address,
port=Port,
props=Props}) ->
proto_mod=ProtoMod,
address=Address,
port=Port,
props=Props}) ->
#mpb_p_srvr{name=to_list(Name),
proto_mod=to_list(ProtoMod),
address=to_list(Address),