WIP: giant hairball, bleh, low-level checksum_list() barely working

This commit is contained in:
Scott Lystig Fritchie 2015-06-26 16:25:12 +09:00
parent 90efc41167
commit 6d95d8669c
10 changed files with 660 additions and 374 deletions

View file

@ -30,3 +30,6 @@
-define(CSUM_TAG_SERVER_SHA, 2). % Server-genereated SHA1
-define(CSUM_TAG_SERVER_REGEN_SHA, 3). % Server-regenerated SHA1
%% Protocol Buffers goop
-define(PB_MAX_MSG_SIZE, (33*1024*1024)).
-define(PB_PACKET_OPTS, [{packet, 4}, {packet_size, ?PB_MAX_MSG_SIZE}]).

View file

@ -43,6 +43,7 @@ enum Mpb_GeneralStatusCode {
PARTITION = 4;
NOT_WRITTEN = 5;
WRITTEN = 6;
NO_SUCH_FILE = 7;
BAD_JOSS = 255; // Only for testing by the Taipan
}
@ -538,6 +539,13 @@ message Mpb_LL_Request {
optional Mpb_LL_WriteProjectionReq proj_wp = 15;
optional Mpb_LL_GetAllProjectionsReq proj_ga = 16;
optional Mpb_LL_ListAllProjectionsReq proj_la = 17;
optional Mpb_LL_AppendChunkReq append_chunk = 30;
optional Mpb_LL_WriteChunkReq write_chunk = 31;
optional Mpb_LL_ReadChunkReq read_chunk = 32;
optional Mpb_LL_ChecksumListReq checksum_list = 33;
optional Mpb_LL_ListFilesReq list_files = 34;
optional Mpb_LL_WedgeStatusReq wedge_status = 35;
}
message Mpb_LL_Response {
@ -562,4 +570,11 @@ message Mpb_LL_Response {
optional Mpb_LL_WriteProjectionResp proj_wp = 15;
optional Mpb_LL_GetAllProjectionsResp proj_ga = 16;
optional Mpb_LL_ListAllProjectionsResp proj_la = 17;
optional Mpb_LL_AppendChunkResp append_chunk = 30;
optional Mpb_LL_WriteChunkResp write_chunk = 31;
optional Mpb_LL_ReadChunkResp read_chunk = 32;
optional Mpb_LL_ChecksumListResp checksum_list = 33;
optional Mpb_LL_ListFilesResp list_files = 34;
optional Mpb_LL_WedgeStatusResp wedge_status = 35;
}

View file

@ -75,6 +75,7 @@
-include_lib("kernel/include/file.hrl").
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
@ -207,8 +208,8 @@ start_append_server(S, AckPid) ->
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
register(make_listener_regname(FluName), self()),
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
SockOpts = ?PB_PACKET_OPTS ++
[{reuseaddr, true}, {mode, binary}, {active, false}],
case gen_tcp:listen(TcpPort, SockOpts) of
{ok, LSock} ->
listen_server_loop(LSock, S);
@ -268,8 +269,41 @@ decode_epoch_id(EpochIDHex) ->
machi_util:hexstr_to_bin(EpochIDHex),
{EpochNum, EpochCSum}.
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
net_server_loop(Sock, S) ->
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Bin} ->
{R, S2} = do_pb_request(catch machi_pb:decode_mpb_ll_request(Bin), S),
Resp = machi_pb:encode_mpb_ll_response(R),
ok = gen_tcp:send(Sock, Resp),
net_server_loop(Sock, S2);
{error, SockError} ->
Msg = io_lib:format("Socket error ~w", [SockError]),
io:format(user, "\nSSS SockError ~p\n", [SockError]),
R = #mpb_ll_response{req_id= <<>>,
generic=#mpb_errorresp{code=1, msg=Msg}},
Resp = machi_pb:encode_mpb_ll_response(R),
_ = (catch gen_tcp:send(Sock, Resp)),
(catch gen_tcp:close(Sock)),
exit(normal)
end.
do_pb_request(PB_request, S) ->
Req = machi_pb_translate:from_pb(PB_request),
io:format(user, "\nSSS Req ~p\n", [Req]),
{ReqID, Cmd, Result, S2} =
case Req of
{RqID, {low_echo, Msg}=CMD} ->
Rs = Msg,
{RqID, CMD, Rs, S};
{RqID, {low_checksum_list, EpochID, File}=CMD} ->
Rs = do_pb_server_checksum_listing(File, S),
{RqID, CMD, Rs, S};
nope ->
{foo, bar, baz}
end,
{machi_pb_translate:to_pb(ReqID, Cmd, Result), S2}.
net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
%% TODO: Add testing control knob to adjust this timeout and/or inject
%% timeout condition.
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
@ -345,8 +379,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
http_server_hack(FluName, PutLine, Sock, S);
<<"PROTOCOL-BUFFERS\n">> ->
ok = gen_tcp:send(Sock, <<"OK\n">>),
ok = inet:setopts(Sock, [{packet, 4},
{packet_size, 33*1024*1024}]),
ok = inet:setopts(Sock, ?PB_PACKET_OPTS),
{ok, Proj} = machi_projection_store:read_latest_projection(
S#state.proj_store, private),
Ps = [P_srvr ||
@ -360,7 +393,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
catch gen_tcp:close(Sock),
exit(normal)
end,
net_server_loop(Sock, S);
net_server_loop_old(Sock, S);
_ ->
catch gen_tcp:close(Sock),
exit(normal)
@ -660,6 +693,28 @@ do_net_server_checksum_listing(Sock, File, DataDir, S) ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_pb_server_checksum_listing(File, #state{data_dir=DataDir}=S) ->
{Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
case {Wedged_p, sanitize_file_string(File)} of
{true, _} ->
{error, wedged};
{false, ok} ->
ok = sync_checksum_file(File),
CSumPath = machi_util:make_checksum_filename(DataDir, File),
%% case file:open(CSumPath, [read, raw, binary]) of
case file:read_file(CSumPath) of
{ok, Bin} ->
{ok, Bin};
{error, enoent} ->
{error, no_such_file};
{error, _} ->
{error, bad_arg}
end;
_ ->
{error, bad_arg}
end.
do_net_server_checksum_listing2(Sock, File, DataDir) ->
ok = sync_checksum_file(File),

View file

@ -51,6 +51,7 @@
-module(machi_flu1_client).
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-define(HARD_TIMEOUT, 2500).
@ -73,6 +74,7 @@
list_all_projections/2, list_all_projections/3,
%% Common API
echo/2, echo/3,
quit/1,
%% Connection management API
@ -361,6 +363,25 @@ list_all_projections(Host, TcpPort, ProjType)
disconnect(Sock)
end.
%% @doc Echo -- test protocol round-trip.
-spec echo(port_wrap(), string()) ->
string() | {error, term()}.
echo(Sock, String) when is_list(String) ->
echo2(Sock, String).
%% @doc Get all epoch numbers from the FLU's projection store.
-spec echo(machi_dt:inet_host(), machi_dt:inet_port(), string()) ->
{ok, [non_neg_integer()]} | {error, term()}.
echo(Host, TcpPort, String) when is_list(String) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
echo2(Sock, String)
after
disconnect(Sock)
end.
%% @doc Quit &amp; close the connection to remote FLU.
-spec quit(port_wrap()) ->
@ -625,42 +646,17 @@ wedge_status2(Sock) ->
{error, {badmatch, BadMatch}}
end.
echo2(Sock, Message) ->
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb(
ReqID, {low_echo, Message}),
do_pb_request_common(Sock, ReqID, Req).
checksum_list2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = w_send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->
put(status, ok), % may be unset later
RestLen = byte_size(Rest) - 1,
<<LenHex:RestLen/binary, _:1/binary>> = Rest,
<<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, checksum_list_finish(checksum_list_fast(Sock, Len))};
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
ReqID = <<"id">>,
Req = machi_pb_translate:to_pb(
ReqID, {low_checksum_list, EpochID, File}),
do_pb_request_common(Sock, ReqID, Req).
checksum_list_fast(Sock, 0) ->
{ok, <<".\n">> = _Line} = w_recv(Sock, 2),
@ -802,58 +798,57 @@ trunc_hack2(Sock, EpochID, File) ->
end.
get_latest_epochid2(Sock, ProjType) ->
ReqID = <<42>>,
Req = machi_pb_wrap:make_projection_req(
<<42>>, {get_latest_epochid, ProjType}),
do_projection_common(Sock, Req).
ReqID, {get_latest_epochid, ProjType}),
do_pb_request_common(Sock, ReqID, Req).
read_latest_projection2(Sock, ProjType) ->
ReqID = <<42>>,
Req = machi_pb_wrap:make_projection_req(
<<42>>, {read_latest_projection, ProjType}),
do_projection_common(Sock, Req).
ReqID, {read_latest_projection, ProjType}),
do_pb_request_common(Sock, ReqID, Req).
read_projection2(Sock, ProjType, Epoch) ->
ReqID = <<42>>,
Req = machi_pb_wrap:make_projection_req(
<<42>>, {read_projection, ProjType, Epoch}),
do_projection_common(Sock, Req).
ReqID, {read_projection, ProjType, Epoch}),
do_pb_request_common(Sock, ReqID, Req).
write_projection2(Sock, ProjType, Proj) ->
ReqID = <<42>>,
Req = machi_pb_wrap:make_projection_req(
<<42>>, {write_projection, ProjType, Proj}),
do_projection_common(Sock, Req).
ReqID, {write_projection, ProjType, Proj}),
do_pb_request_common(Sock, ReqID, Req).
get_all_projections2(Sock, ProjType) ->
ReqID = <<42>>,
Req = machi_pb_wrap:make_projection_req(
<<42>>, {get_all_projections, ProjType}),
do_projection_common(Sock, Req).
ReqID, {get_all_projections, ProjType}),
do_pb_request_common(Sock, ReqID, Req).
list_all_projections2(Sock, ProjType) ->
ReqID = <<42>>,
Req = machi_pb_wrap:make_projection_req(
<<42>>, {list_all_projections, ProjType}),
do_projection_common(Sock, Req).
ReqID, {list_all_projections, ProjType}),
do_pb_request_common(Sock, ReqID, Req).
do_projection_common(Sock, Req) ->
do_pb_request_common(Sock, ReqID, Req) ->
erase(bad_sock),
try
ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)),
Len = iolist_size(ReqBin),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"PROJ ">>, LenHex, <<"\n">>],
ok = w_send(Sock, [Cmd, ReqBin]),
ok = w_setopts(Sock, [{packet, line}]),
io:format(user, "\nCCC Req ~p\n", [Req]),
ok = w_send(Sock, ReqBin),
case w_recv(Sock, 0) of
{ok, Line} ->
case Line of
<<"OK ", ResLenHex:8/binary, "\n">> ->
ResLen = machi_util:hexstr_to_int(ResLenHex),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, RespBin} = w_recv(Sock, ResLen),
ok = w_setopts(Sock, [{packet, line}]),
Resp = machi_pb:decode_mpb_ll_response(RespBin),
machi_pb_wrap:unmake_projection_resp(Resp);
Else ->
{error, Else}
end
{ok, RespBin} ->
Resp = machi_pb:decode_mpb_ll_response(RespBin),
io:format(user, "\nCCC Resp ~p\n", [Resp]),
{ReqID2, Reply} = machi_pb_translate:from_pb(Resp),
io:format(user, "\nCCC ReqID2 ~p Reply ~p\n", [ReqID2, Reply]),
true = (ReqID == ReqID2 orelse ReqID2 == <<>>),
Reply;
Else ->
{error, Else}
end
catch
throw:Error ->
@ -874,6 +869,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})->
case proplists:get_value(session_proto, Props, tcp) of
tcp ->
Sock = machi_util:connect(Host, Port, ?HARD_TIMEOUT),
ok = inet:setopts(Sock, ?PB_PACKET_OPTS),
{w,tcp,Sock};
%% sctp ->
%% %% TODO: not implemented

View file

@ -169,10 +169,8 @@ try_connect(#state{server_list=Ps}=S) ->
do_connect_to_pb_listener(P) ->
try
{ok, Sock} = gen_tcp:connect(P#p_srvr.address, P#p_srvr.port,
[{packet, line}, binary, {active, false}]),
ok = gen_tcp:send(Sock, <<"PROTOCOL-BUFFERS\n">>),
{ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, [{packet,4}]),
?PB_PACKET_OPTS ++
[binary, {active, false}]),
Sock
catch _X:_Y ->
io:format(user, "\n~p ~p @ ~p\n", [_X, _Y, erlang:get_stacktrace()]),
@ -369,6 +367,8 @@ convert_general_status_code('NOT_WRITTEN') ->
{error, not_written};
convert_general_status_code('WRITTEN') ->
{error, written};
convert_general_status_code('NO_SUCH_FILE') ->
{error, no_such_file};
convert_general_status_code('BAD_JOSS') ->
throw({error, bad_joss_taipan_fixme}).

View file

@ -18,6 +18,8 @@
%%
%% -------------------------------------------------------------------
%% @doc High level protocol server-side processing (temporary?)
-module(machi_pb_server).
-include("machi.hrl").
@ -29,8 +31,7 @@
-export([run_loop/2]).
run_loop(Sock, P_srvr_list) ->
ok = inet:setopts(Sock, [{packet, 4},
{packet_size, 33*1024*1024}]),
ok = inet:setopts(Sock, ?PB_PACKET_OPTS),
{ok, Clnt} = machi_cr_client:start_link(P_srvr_list),
protocol_buffers_loop(Sock, Clnt).
@ -38,178 +39,48 @@ protocol_buffers_loop(Sock, Clnt) ->
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Bin} ->
R = do_pb_request(catch machi_pb:decode_mpb_request(Bin), Clnt),
%% R = #mpb_response{req_id= <<"not paying any attention">>,
%% generic=#mpb_errorresp{code=-6,
%% msg="not implemented"}},
Resp = machi_pb:encode_mpb_response(R),
ok = gen_tcp:send(Sock, Resp),
protocol_buffers_loop(Sock, Clnt);
{error, _} ->
{error, SockError} ->
Msg = io_lib:format("Socket error ~w", [SockError]),
R = #mpb_errorresp{code=1, msg=Msg},
Resp = machi_pb:encode_mpb_response(R),
_ = (catch gen_tcp:send(Sock, Resp)),
(catch gen_tcp:close(Sock)),
exit(normal)
end.
do_pb_request(#mpb_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}, _Clnt) ->
#mpb_response{req_id=ReqID,
echo=#mpb_echoresp{message=Msg}};
do_pb_request(#mpb_request{req_id=ReqID,
auth=#mpb_authreq{}}, _Clnt) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=1,
msg="AUTH not implemented"}};
do_pb_request(#mpb_request{req_id=ReqID,
append_chunk=IR=#mpb_appendchunkreq{}}, Clnt) ->
#mpb_appendchunkreq{placement_key=__todo__PK,
prefix=Prefix,
chunk=ChunkBin,
csum=CSum,
chunk_extra=ChunkExtra} = IR,
TaggedCSum = make_tagged_csum(CSum, ChunkBin),
Chunk = {TaggedCSum, ChunkBin},
case (catch machi_cr_client:append_chunk_extra(Clnt, Prefix, Chunk,
ChunkExtra)) of
{ok, {Offset, Size, File}} ->
make_append_resp(ReqID, 'OK',
#mpb_chunkpos{offset=Offset,
chunk_size=Size,
file_name=File});
{error, bad_arg} ->
make_append_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_append_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_append_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_append_resp(ReqID, 'PARTITION');
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
do_pb_request(#mpb_request{req_id=ReqID,
write_chunk=IR=#mpb_writechunkreq{}}, Clnt) ->
#mpb_writechunkreq{file=File,
offset=Offset,
chunk=ChunkBin,
csum=CSum} = IR,
TaggedCSum = make_tagged_csum(CSum, ChunkBin),
Chunk = {TaggedCSum, ChunkBin},
case (catch machi_cr_client:write_chunk(Clnt, File, Offset, Chunk)) of
{ok, {_,_,_}} ->
%% machi_cr_client returns ok 2-tuple, convert to simple ok.
make_write_resp(ReqID, 'OK');
{error, bad_arg} ->
make_write_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_write_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_write_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_write_resp(ReqID, 'PARTITION');
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
do_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}, Clnt) ->
#mpb_readchunkreq{file=File,
offset=Offset,
size=Size} = IR,
%% TODO: implement the optional flags in Mpb_ReadChunkReq
case (catch machi_cr_client:read_chunk(Clnt, File, Offset, Size)) of
{ok, Chunk} ->
make_read_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_read_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_read_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_read_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_read_resp(ReqID, 'PARTITION', undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
do_pb_request(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}, Clnt) ->
#mpb_checksumlistreq{file=File} = IR,
%% TODO: implement the optional flags in Mpb_ReadChunkReq
case (catch machi_cr_client:checksum_list(Clnt, File)) of
{ok, Chunk} ->
make_checksum_list_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_checksum_list_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_checksum_list_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_checksum_list_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_checksum_list_resp(ReqID, 'PARTITION', undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
do_pb_request(#mpb_request{req_id=ReqID,
list_files=IR=#mpb_listfilesreq{}}, Clnt) ->
#mpb_listfilesreq{} = IR,
%% TODO: implement the optional flags in Mpb_ReadChunkReq
case (catch machi_cr_client:list_files(Clnt)) of
{ok, FileInfo} ->
make_list_files_resp(ReqID, 'OK', FileInfo);
{error, bad_arg} ->
make_list_files_resp(ReqID, 'BAD_ARG', []);
{error, wedged} ->
make_list_files_resp(ReqID, 'WEDGED', []);
{error, bad_checksum} ->
make_list_files_resp(ReqID, 'BAD_CHECKSUM', []);
{error, partition} ->
make_list_files_resp(ReqID, 'PARTITION', []);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
do_pb_request(#mpb_request{req_id=ReqID}, _Clnt) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=66,
msg="Unknown request"}};
do_pb_request(_Else, _Clnt) ->
#mpb_response{req_id= <<>>,
generic=#mpb_errorresp{code=67,
msg="Unknown PB request"}}.
do_pb_request(PB_request, Clnt) ->
{ReqID, Cmd, Result} =
case machi_pb_translate:from_pb(PB_request) of
{RqID, {high_echo, Msg}=CMD} ->
Rs = Msg,
{RqID, CMD, Rs};
{RqID, {high_auth, _User, _Pass}} ->
{RqID, not_implemented};
{RqID, {high_append_chunk, _todoPK, Prefix, ChunkBin, TaggedCSum,
ChunkExtra}=CMD} ->
Chunk = {TaggedCSum, ChunkBin},
Rs = machi_cr_client:append_chunk_extra(Clnt, Prefix, Chunk,
ChunkExtra),
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_NONE'}, ChunkBin) ->
C = machi_util:checksum_chunk(ChunkBin),
machi_util:make_tagged_csum(server_sha, C);
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', csum=CSum}, _CB) ->
machi_util:make_tagged_csum(client_sha, CSum).
make_append_resp(ReqID, Status) ->
make_append_resp(ReqID, Status, undefined).
make_append_resp(ReqID, Status, Where) ->
#mpb_response{req_id=ReqID,
append_chunk=#mpb_appendchunkresp{status=Status,
chunk_pos=Where}}.
make_write_resp(ReqID, Status) ->
#mpb_response{req_id=ReqID,
write_chunk=#mpb_writechunkresp{status=Status}}.
make_read_resp(ReqID, Status, Chunk) ->
#mpb_response{req_id=ReqID,
read_chunk=#mpb_readchunkresp{status=Status,
chunk=Chunk}}.
make_checksum_list_resp(ReqID, Status, __todo__Chunk) ->
Chunk = <<"TODO item: refactor the checksum_list op to return simply the text file representation of the checksums?">>,
#mpb_response{req_id=ReqID,
checksum_list=#mpb_checksumlistresp{status=Status,
chunk=Chunk}}.
make_list_files_resp(ReqID, Status, FileInfo) ->
Files = [#mpb_fileinfo{file_size=Size, file_name=Name} ||
{Size, Name} <- FileInfo],
#mpb_response{req_id=ReqID,
list_files=#mpb_listfilesresp{status=Status,
files=Files}}.
make_error_resp(ReqID, Code, Msg) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.
{RqID, CMD, Rs};
{RqID, {high_write_chunk, File, Offset, ChunkBin, TaggedCSum}=CMD} ->
Chunk = {TaggedCSum, ChunkBin},
Rs = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
{RqID, CMD, Rs};
{RqID, {high_read_chunk, File, Offset, Size}=CMD} ->
Rs = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
{RqID, CMD, Rs};
{RqID, {high_checksum_list, File}=CMD} ->
Rs = machi_cr_client:checksum_list(Clnt, File),
{RqID, CMD, Rs};
{RqID, {high_list_files}=CMD} ->
Rs = machi_cr_client:list_files(Clnt),
{RqID, CMD, Rs};
{RqID, {high_error, ErrCode, ErrMsg}=CMD} ->
Rs = {ErrCode, ErrMsg},
{RqID, CMD, Rs}
end,
machi_pb_translate:to_pb(ReqID, Cmd, Result).

465
src/machi_pb_translate.erl Normal file
View file

@ -0,0 +1,465 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_pb_translate).
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-export([from_pb/1,
to_pb/2,
to_pb/3
]).
from_pb(#mpb_ll_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
{ReqID, {low_echo, Msg}};
from_pb(#mpb_ll_request{req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) ->
{ReqID, {low_auth, User, Pass}};
from_pb(#mpb_ll_request{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{epoch_id=PB_EpochID,
file=File}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
{ReqID, {low_checksum_list, EpochID, File}};
from_pb(#mpb_ll_response{req_id=ReqID,
echo=#mpb_echoresp{message=Msg}}) ->
{ReqID, Msg};
from_pb(#mpb_ll_response{req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{
status=Status, chunk=Chunk}}) ->
case Status of
'OK' ->
{ReqID, {ok, Chunk}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_gl=#mpb_ll_getlatestepochidresp{
status=Status, epoch_id=EID}}) ->
case Status of
'OK' ->
#mpb_epochid{epoch_number=Epoch, epoch_csum=CSum} = EID,
{ReqID, {ok, {Epoch, CSum}}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_rl=#mpb_ll_readlatestprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ReqID, {ok, conv_to_projection_v1(P)}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_rp=#mpb_ll_readprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ReqID, {ok, conv_to_projection_v1(P)}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_wp=#mpb_ll_writeprojectionresp{
status=Status}}) ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)};
from_pb(#mpb_ll_response{req_id=ReqID,
proj_ga=#mpb_ll_getallprojectionsresp{
status=Status, projs=ProjsM}}) ->
case Status of
'OK' ->
{ReqID, {ok, [conv_to_projection_v1(ProjM) || ProjM <- ProjsM]}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb(#mpb_ll_response{req_id=ReqID,
proj_la=#mpb_ll_listallprojectionsresp{
status=Status, epochs=Epochs}}) ->
case Status of
'OK' ->
{ReqID, {ok, Epochs}};
_ ->
{ReqID< machi_pb_high_client:convert_general_status_code(Status)}
end;
%-%-%-%-%
from_pb(#mpb_request{req_id=ReqID,
echo=#mpb_echoreq{message=Msg}}) ->
{ReqID, {high_echo, Msg}};
from_pb(#mpb_request{req_id=ReqID,
auth=#mpb_authreq{user=User, password=Pass}}) ->
{ReqID, {high_auth, User, Pass}};
from_pb(#mpb_request{req_id=ReqID,
append_chunk=IR=#mpb_appendchunkreq{}}) ->
#mpb_appendchunkreq{placement_key=__todoPK,
prefix=Prefix,
chunk=ChunkBin,
csum=CSum,
chunk_extra=ChunkExtra} = IR,
TaggedCSum = make_tagged_csum(CSum, ChunkBin),
{ReqID, {high_append_chunk, __todoPK, Prefix, ChunkBin, TaggedCSum,
ChunkExtra}};
from_pb(#mpb_request{req_id=ReqID,
write_chunk=IR=#mpb_writechunkreq{}}) ->
#mpb_writechunkreq{file=File,
offset=Offset,
chunk=ChunkBin,
csum=CSum} = IR,
TaggedCSum = make_tagged_csum(CSum, ChunkBin),
{ReqID, {high_write_chunk, File, Offset, ChunkBin, TaggedCSum}};
from_pb(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{file=File,
offset=Offset,
size=Size} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}};
from_pb(#mpb_request{req_id=ReqID,
checksum_list=IR=#mpb_checksumlistreq{}}) ->
#mpb_checksumlistreq{file=File} = IR,
{ReqID, {high_checksum_list, File}};
from_pb(#mpb_request{req_id=ReqID,
list_files=_IR=#mpb_listfilesreq{}}) ->
{ReqID, {high_list_files}};
from_pb(#mpb_request{req_id=ReqID}) ->
{ReqID, {high_error, 999966, "Unknown request"}};
from_pb(_) ->
{<<>>, {high_error, 999667, "Unknown PB request"}}.
to_pb(ReqID, {low_echo, Msg}) ->
#mpb_ll_request{
req_id=ReqID,
echo=#mpb_echoreq{message=Msg}};
to_pb(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{epoch_id=PB_EpochID,
file=File}}.
to_pb(ReqID, {low_echo, Msg}, Resp) ->
#mpb_ll_response{
req_id=ReqID,
echo=#mpb_echoresp{message=Msg}};
to_pb(ReqID, {low_checksum_list, _EpochID, _File}, Resp) ->
case Resp of
{ok, Chunk} ->
make_ll_checksum_list_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_ll_checksum_list_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_ll_checksum_list_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_ll_checksum_list_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_ll_checksum_list_resp(ReqID, 'PARTITION', undefined);
{error, no_such_file} ->
make_ll_checksum_list_resp(ReqID, 'NO_SUCH_FILE', undefined);
_Else ->
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_echo, _Msg}, Resp) ->
Msg = Resp,
#mpb_response{req_id=ReqID,
echo=#mpb_echoresp{message=Msg}};
to_pb(ReqID, {high_auth, _User, _Pass}, _Resp) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=1,
msg="AUTH not implemented"}};
to_pb(ReqID, {high_append_chunk, _TODO, _Prefix, _ChunkBin, _TSum, _CE}, Resp)->
case Resp of
{ok, {Offset, Size, File}} ->
make_append_resp(ReqID, 'OK',
#mpb_chunkpos{offset=Offset,
chunk_size=Size,
file_name=File});
{error, bad_arg} ->
make_append_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_append_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_append_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_append_resp(ReqID, 'PARTITION');
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_write_chunk, _File, _Offset, _ChunkBin, _TaggedCSum}, Resp) ->
case Resp of
{ok, {_,_,_}} ->
%% machi_cr_client returns ok 2-tuple, convert to simple ok.
make_write_resp(ReqID, 'OK');
{error, bad_arg} ->
make_write_resp(ReqID, 'BAD_ARG');
{error, wedged} ->
make_write_resp(ReqID, 'WEDGED');
{error, bad_checksum} ->
make_write_resp(ReqID, 'BAD_CHECKSUM');
{error, partition} ->
make_write_resp(ReqID, 'PARTITION');
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
case Resp of
{ok, Chunk} ->
make_read_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_read_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_read_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_read_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_read_resp(ReqID, 'PARTITION', undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_checksum_list, _File}, Resp) ->
case Resp of
{ok, Chunk} ->
make_checksum_list_resp(ReqID, 'OK', Chunk);
{error, bad_arg} ->
make_checksum_list_resp(ReqID, 'BAD_ARG', undefined);
{error, wedged} ->
make_checksum_list_resp(ReqID, 'WEDGED', undefined);
{error, bad_checksum} ->
make_checksum_list_resp(ReqID, 'BAD_CHECKSUM', undefined);
{error, partition} ->
make_checksum_list_resp(ReqID, 'PARTITION', undefined);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_list_files}, Resp) ->
case Resp of
{ok, FileInfo} ->
make_list_files_resp(ReqID, 'OK', FileInfo);
{error, bad_arg} ->
make_list_files_resp(ReqID, 'BAD_ARG', []);
{error, wedged} ->
make_list_files_resp(ReqID, 'WEDGED', []);
{error, bad_checksum} ->
make_list_files_resp(ReqID, 'BAD_CHECKSUM', []);
{error, partition} ->
make_list_files_resp(ReqID, 'PARTITION', []);
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb(ReqID, {high_error, _, _}, {ErrCode, ErrMsg}) ->
make_error_resp(ReqID, ErrCode, ErrMsg).
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_NONE'}, ChunkBin) ->
C = machi_util:checksum_chunk(ChunkBin),
machi_util:make_tagged_csum(server_sha, C);
make_tagged_csum(#mpb_chunkcsum{type='CSUM_TAG_CLIENT_SHA', csum=CSum}, _CB) ->
machi_util:make_tagged_csum(client_sha, CSum).
make_ll_checksum_list_resp(ReqID, Status, __todo__Chunk) ->
Chunk = <<"TODO item: refactor the checksum_list op to return simply the text file representation of the checksums?">>,
#mpb_ll_response{req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{status=Status,
chunk=Chunk}}.
make_ll_error_resp(ReqID, Code, Msg) ->
#mpb_ll_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.
make_append_resp(ReqID, Status) ->
make_append_resp(ReqID, Status, undefined).
make_append_resp(ReqID, Status, Where) ->
#mpb_response{req_id=ReqID,
append_chunk=#mpb_appendchunkresp{status=Status,
chunk_pos=Where}}.
make_write_resp(ReqID, Status) ->
#mpb_response{req_id=ReqID,
write_chunk=#mpb_writechunkresp{status=Status}}.
make_read_resp(ReqID, Status, Chunk) ->
#mpb_response{req_id=ReqID,
read_chunk=#mpb_readchunkresp{status=Status,
chunk=Chunk}}.
make_checksum_list_resp(ReqID, Status, __todo__Chunk) ->
Chunk = <<"TODO item: refactor the checksum_list op to return simply the text file representation of the checksums?">>,
#mpb_response{req_id=ReqID,
checksum_list=#mpb_checksumlistresp{status=Status,
chunk=Chunk}}.
make_list_files_resp(ReqID, Status, FileInfo) ->
Files = [#mpb_fileinfo{file_size=Size, file_name=Name} ||
{Size, Name} <- FileInfo],
#mpb_response{req_id=ReqID,
list_files=#mpb_listfilesresp{status=Status,
files=Files}}.
make_error_resp(ReqID, Code, Msg) ->
#mpb_response{req_id=ReqID,
generic=#mpb_errorresp{code=Code,
msg=Msg}}.
conv_from_epoch_id({Epoch, EpochCSum}) ->
#mpb_epochid{epoch_number=Epoch,
epoch_csum=EpochCSum}.
conv_to_epoch_id(#mpb_epochid{epoch_number=Epoch,
epoch_csum=EpochCSum}) ->
{Epoch, EpochCSum}.
conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch,
epoch_csum=CSum,
author_server=Author,
all_members=AllMembers,
creation_time=CTime,
mode=Mode,
upi=UPI,
repairing=Repairing,
down=Down,
opaque_flap=Flap,
opaque_inner=Inner,
opaque_dbg=Dbg,
opaque_dbg2=Dbg2,
members_dict=MembersDict}) ->
#projection_v1{epoch_number=Epoch,
epoch_csum=CSum,
author_server=to_atom(Author),
all_members=[to_atom(X) || X <- AllMembers],
creation_time=conv_to_now(CTime),
mode=conv_to_mode(Mode),
upi=[to_atom(X) || X <- UPI],
repairing=[to_atom(X) || X <- Repairing],
down=[to_atom(X) || X <- Down],
flap=dec_optional_sexp(Flap),
inner=dec_optional_sexp(Inner),
dbg=dec_sexp(Dbg),
dbg2=dec_sexp(Dbg2),
members_dict=conv_to_members_dict(MembersDict)}.
enc_sexp(T) ->
term_to_binary(T).
dec_sexp(Bin) when is_binary(Bin) ->
binary_to_term(Bin).
enc_optional_sexp(undefined) ->
undefined;
enc_optional_sexp(T) ->
enc_sexp(T).
dec_optional_sexp(undefined) ->
undefined;
dec_optional_sexp(T) ->
dec_sexp(T).
conv_from_members_dict(D) ->
%% Use list_to_binary() here to "flatten" the serialized #p_srvr{}
[#mpb_membersdictentry{key=to_list(K), val=conv_from_p_srvr(V)} ||
{K, V} <- orddict:to_list(D)].
conv_to_members_dict(List) ->
orddict:from_list([{to_atom(K), conv_to_p_srvr(V)} ||
#mpb_membersdictentry{key=K, val=V} <- List]).
conv_from_p_srvr(#p_srvr{name=Name,
proto_mod=ProtoMod,
address=Address,
port=Port,
props=Props}) ->
#mpb_p_srvr{name=to_list(Name),
proto_mod=to_list(ProtoMod),
address=to_list(Address),
port=to_list(Port),
opaque_props=enc_sexp(Props)}.
conv_to_p_srvr(#mpb_p_srvr{name=Name,
proto_mod=ProtoMod,
address=Address,
port=Port,
opaque_props=Props}) ->
#p_srvr{name=to_atom(Name),
proto_mod=to_atom(ProtoMod),
address=to_list(Address),
port=to_integer(Port),
props=dec_sexp(Props)}.
to_list(X) when is_atom(X) ->
atom_to_list(X);
to_list(X) when is_binary(X) ->
binary_to_list(X);
to_list(X) when is_integer(X) ->
integer_to_list(X);
to_list(X) when is_list(X) ->
X.
to_atom(X) when is_list(X) ->
list_to_atom(X);
to_atom(X) when is_binary(X) ->
erlang:binary_to_atom(X, latin1);
to_atom(X) when is_atom(X) ->
X.
to_integer(X) when is_list(X) ->
list_to_integer(X);
to_integer(X) when is_binary(X) ->
list_to_binary(binary_to_list(X));
to_integer(X) when is_integer(X) ->
X.
conv_from_now({A,B,C}) ->
#mpb_now{sec=(1000000 * A) + B,
usec=C}.
conv_to_now(#mpb_now{sec=Sec, usec=USec}) ->
{Sec div 1000000, Sec rem 1000000, USec}.
conv_from_mode(ap_mode) -> 'AP_MODE';
conv_from_mode(cp_mode) -> 'CP_MODE'.
conv_to_mode('AP_MODE') -> ap_mode;
conv_to_mode('CP_MODE') -> cp_mode.
conv_from_type(private) -> 'PRIVATE';
conv_from_type(public) -> 'PUBLIC'.
conv_to_type('PRIVATE') -> private;
conv_to_type('PUBLIC') -> public.
conv_from_status(ok) ->
'OK';
conv_from_status({error, bad_arg}) ->
'BAD_ARG';
conv_from_status({error, wedged}) ->
'WEDGED';
conv_from_status({error, bad_checksum}) ->
'BAD_CHECKSUM';
conv_from_status({error, partition}) ->
'PARTITION';
conv_from_status({error, not_written}) ->
'NOT_WRITTEN';
conv_from_status({error, written}) ->
'WRITTEN';
conv_from_status({error, no_such_file}) ->
'NO_SUCH_FILE';
conv_from_status(_OOPS) ->
io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]),
'BAD_JOSS'.

View file

@ -33,10 +33,12 @@
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-ifdef(COMMENT_DELME).
-export([enc_p_srvr/1, dec_p_srvr/1,
enc_projection_v1/1, dec_projection_v1/1,
make_projection_req/2, unmake_projection_req/1,
make_projection_resp/3, unmake_projection_resp/1]).
make_projection_resp/3]).
-ifdef(TEST).
-compile(export_all).
-endif. % TEST
@ -75,7 +77,8 @@ enc_projection_v1(P) ->
machi_pb:encode_mpb_projectionv1(conv_from_projection_v1(P))).
dec_projection_v1(Bin) ->
conv_to_projection_v1(machi_pb:decode_mpb_projectionv1(Bin)).
delme.
%% conv_to_projection_v1(machi_pb:decode_mpb_projectionv1(Bin)).
conv_from_projection_v1(#projection_v1{epoch_number=Epoch,
epoch_csum=CSum,
@ -106,35 +109,6 @@ conv_from_projection_v1(#projection_v1{epoch_number=Epoch,
opaque_dbg2=enc_sexp(Dbg2),
members_dict=conv_from_members_dict(MembersDict)}.
conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch,
epoch_csum=CSum,
author_server=Author,
all_members=AllMembers,
creation_time=CTime,
mode=Mode,
upi=UPI,
repairing=Repairing,
down=Down,
opaque_flap=Flap,
opaque_inner=Inner,
opaque_dbg=Dbg,
opaque_dbg2=Dbg2,
members_dict=MembersDict}) ->
#projection_v1{epoch_number=Epoch,
epoch_csum=CSum,
author_server=to_atom(Author),
all_members=[to_atom(X) || X <- AllMembers],
creation_time=conv_to_now(CTime),
mode=conv_to_mode(Mode),
upi=[to_atom(X) || X <- UPI],
repairing=[to_atom(X) || X <- Repairing],
down=[to_atom(X) || X <- Down],
flap=dec_optional_sexp(Flap),
inner=dec_optional_sexp(Inner),
dbg=dec_sexp(Dbg),
dbg2=dec_sexp(Dbg2),
members_dict=conv_to_members_dict(MembersDict)}.
make_projection_req(ID, {get_latest_epochid, ProjType}) ->
#mpb_ll_request{req_id=ID,
proj_gl=#mpb_ll_getlatestepochidreq{type=conv_from_type(ProjType)}};
@ -174,7 +148,7 @@ unmake_projection_req(
#mpb_ll_request{req_id=ID,
proj_wp=#mpb_ll_writeprojectionreq{type=ProjType,
proj=ProjM}}) ->
Proj = conv_to_projection_v1(ProjM),
Proj = delme, %% conv_to_projection_v1(ProjM),
{ID, {write_projection, conv_to_type(ProjType), Proj}};
unmake_projection_req(
#mpb_ll_request{req_id=ID,
@ -234,50 +208,6 @@ make_projection_resp(ID, list_all_projections, Status) ->
proj_la=#mpb_ll_listallprojectionsresp{
status=conv_from_status(Status)}}.
unmake_projection_resp(#mpb_ll_response{proj_gl=#mpb_ll_getlatestepochidresp{
status=Status, epoch_id=EID}}) ->
case Status of
'OK' ->
#mpb_epochid{epoch_number=Epoch, epoch_csum=CSum} = EID,
{ok, {Epoch, CSum}};
_ ->
machi_pb_high_client:convert_general_status_code(Status)
end;
unmake_projection_resp(#mpb_ll_response{proj_rl=#mpb_ll_readlatestprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ok, conv_to_projection_v1(P)};
_ ->
machi_pb_high_client:convert_general_status_code(Status)
end;
unmake_projection_resp(#mpb_ll_response{proj_rp=#mpb_ll_readprojectionresp{
status=Status, proj=P}}) ->
case Status of
'OK' ->
{ok, conv_to_projection_v1(P)};
_ ->
machi_pb_high_client:convert_general_status_code(Status)
end;
unmake_projection_resp(#mpb_ll_response{proj_wp=#mpb_ll_writeprojectionresp{
status=Status}}) ->
machi_pb_high_client:convert_general_status_code(Status);
unmake_projection_resp(#mpb_ll_response{proj_ga=#mpb_ll_getallprojectionsresp{
status=Status, projs=ProjsM}}) ->
case Status of
'OK' ->
{ok, [conv_to_projection_v1(ProjM) || ProjM <- ProjsM]};
_ ->
machi_pb_high_client:convert_general_status_code(Status)
end;
unmake_projection_resp(#mpb_ll_response{proj_la=#mpb_ll_listallprojectionsresp{
status=Status, epochs=Epochs}}) ->
case Status of
'OK' ->
{ok, Epochs};
_ ->
machi_pb_high_client:convert_general_status_code(Status)
end.
%%%%%%%%%%%%%%%%%%%
@ -329,39 +259,4 @@ to_integer(X) when is_binary(X) ->
to_integer(X) when is_integer(X) ->
X.
conv_from_now({A,B,C}) ->
#mpb_now{sec=(1000000 * A) + B,
usec=C}.
conv_to_now(#mpb_now{sec=Sec, usec=USec}) ->
{Sec div 1000000, Sec rem 1000000, USec}.
conv_from_mode(ap_mode) -> 'AP_MODE';
conv_from_mode(cp_mode) -> 'CP_MODE'.
conv_to_mode('AP_MODE') -> ap_mode;
conv_to_mode('CP_MODE') -> cp_mode.
conv_from_type(private) -> 'PRIVATE';
conv_from_type(public) -> 'PUBLIC'.
conv_to_type('PRIVATE') -> private;
conv_to_type('PUBLIC') -> public.
conv_from_status(ok) ->
'OK';
conv_from_status({error, bad_arg}) ->
'BAD_ARG';
conv_from_status({error, wedged}) ->
'WEDGED';
conv_from_status({error, bad_checksum}) ->
'BAD_CHECKSUM';
conv_from_status({error, partition}) ->
'PARTITION';
conv_from_status({error, not_written}) ->
'NOT_WRITTEN';
conv_from_status({error, written}) ->
'WRITTEN';
conv_from_status(_OOPS) ->
io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]),
'BAD_JOSS'.
-endif. % COMMENT_DELME

View file

@ -70,6 +70,8 @@ flu_smoke_test() ->
W_props = [{initial_wedged, false}],
FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir, W_props),
try
Msg = "Hello, world!",
Msg = ?FLU_C:echo(Host, TcpPort, Msg),
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"does-not-exist"),

View file

@ -63,22 +63,6 @@ smoke_responses_test() ->
ok.
smoke_p_srvr_test() ->
P1 = #p_srvr{name=a, address="localhost", port=5555,
props=[{dir,"./data.a"}]},
P1 = machi_pb_wrap:dec_p_srvr(
list_to_binary(machi_pb_wrap:enc_p_srvr(P1))),
ok.
smoke_projection_v1_test() ->
P1 = #p_srvr{name=a, address="localhost", port=5555,
props=[{dir,"./data.a"}]},
D = orddict:from_list([ {P1#p_srvr.name, P1} ]),
Proj1 = machi_projection:new(a, D, [a], [], [], [{property, 42}]),
Proj1 = machi_pb_wrap:dec_projection_v1(
machi_pb_wrap:enc_projection_v1(Proj1)),
ok.
encdec_request(M) ->
machi_pb:decode_mpb_request(
list_to_binary(machi_pb:encode_mpb_request(M))).