From 2068f707009dba99f25dd235648d5bdc1b1fa4b5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 24 Jun 2015 12:50:37 +0900 Subject: [PATCH 01/12] WIP: encoding #p_srvr and #projection_v1, just starting. Damn tedious. --- src/machi.proto | 37 +++++++++--- src/machi_pb_wrap.erl | 125 +++++++++++++++++++++++++++++++++++++++++ test/machi_pb_test.erl | 17 ++++++ 3 files changed, 172 insertions(+), 7 deletions(-) create mode 100644 src/machi_pb_wrap.erl diff --git a/src/machi.proto b/src/machi.proto index 9dfa246..831e32c 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -83,6 +83,15 @@ message Mpb_FileInfo { required string file_name = 2; } +// #p_srvr{} type +message Mpb_P_Srvr { + required string name = 1; + required string proto_mod = 2; + required string address = 3; + required string port = 4; + required bytes props = 5; +} + ////////////////////////////////////////// // // requests & responses @@ -97,7 +106,21 @@ message Mpb_ErrorResp { optional bytes extra = 3; } -// ping() request & response +////////////////////////////////////////// +// +// High Level API requests: +// +// echo() : Mpb_EchoReq and Mpb_EchoResp +// auth() : Mpb_AuthReq and Mpb_AuthResp +// append_chunk() : Mpb_AppendChunkReq and Mpb_AppendChunkResp +// write_chunk() : Mpb_WriteChunkReq and Mpb_WriteChunkResp +// read_chunk() : Mpb_ReadChunkReq and Mpb_ReadChunkResp +// checksum_list() : Mpb_ChecksumListReq and Mpb_ChecksumListResp +// list_files() : Mpb_ListFilesReq and Mpb_ListFilesResp +// +////////////////////////////////////////// + +// High level API: echo() request & response message Mpb_EchoReq { optional string message = 1; @@ -107,7 +130,7 @@ message Mpb_EchoResp { optional string message = 1; } -// Authentication request & response +// High level API: auth() request & response (not yet implemented) message Mpb_AuthReq { required bytes user = 1; @@ -119,7 +142,7 @@ message Mpb_AuthResp { // TODO: not implemented yet } -// append_chunk() request & response +// High level API: append_chunk() request & response message Mpb_AppendChunkReq { optional bytes placement_key = 1; @@ -135,7 +158,7 @@ message Mpb_AppendChunkResp { optional Mpb_ChunkPos chunk_pos = 2; } -// write_chunk() request & response +// High level API: write_chunk() request & response message Mpb_WriteChunkReq { required string file = 1; @@ -148,7 +171,7 @@ message Mpb_WriteChunkResp { required Mpb_GeneralStatusCode status = 1; } -// read_chunk() request & response +// High level API: read_chunk() request & response message Mpb_ReadChunkReq { required string file = 1; @@ -167,7 +190,7 @@ message Mpb_ReadChunkResp { optional Mpb_ChunkCSum csum = 3; } -// checksum_list() request & response +// High level API: checksum_list() request & response message Mpb_ChecksumListReq { required string file = 1; @@ -178,7 +201,7 @@ message Mpb_ChecksumListResp { optional bytes chunk = 2; } -// list_files() request & response +// High level API: list_files() request & response message Mpb_ListFilesReq { } diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl new file mode 100644 index 0000000..d79f3ae --- /dev/null +++ b/src/machi_pb_wrap.erl @@ -0,0 +1,125 @@ +%% ------------------------------------------------------------------- +%% +%% Machi: a small village of replicated files +%% +%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(machi_pb_wrap). + +-include("machi_pb.hrl"). +-include("machi_projection.hrl"). + +-export([enc_p_srvr/1, dec_p_srvr/1, + enc_projection_v1/1, dec_projection_v1/1]). +-ifdef(TEST). +-compile(export_all). +-endif. % TEST + +enc_p_srvr(#p_srvr{name=Name, + proto_mod=ProtoMod, + address=Address, + port=Port, + props=Props}) -> + machi_pb:encode_mpb_p_srvr(#mpb_p_srvr{name=to_list(Name), + proto_mod=to_list(ProtoMod), + address=to_list(Address), + port=to_list(Port), + props=todo_enc_opaque(Props)}). + +dec_p_srvr(Bin) -> + #mpb_p_srvr{name=Name, + proto_mod=ProtoMod, + address=Address, + port=Port, + props=Props} = machi_pb:decode_mpb_p_srvr(Bin), + #p_srvr{name=to_atom(Name), + proto_mod=to_atom(ProtoMod), + address=to_list(Address), + port=to_integer(Port), + props=todo_dec_opaque(Props)}. + +enc_projection_v1(#projection_v1{dbg=Dbg, + dbg2=Dbg2, + members_dict=MembersDict} = P) -> + term_to_binary(P#projection_v1{dbg=enc_sexp(Dbg), + dbg2=enc_sexp(Dbg2), + members_dict=enc_members_dict(MembersDict)}). + +dec_projection_v1(Bin) -> + P = #projection_v1{dbg=Dbg, + dbg2=Dbg2, + members_dict=MembersDict} = binary_to_term(Bin), + P#projection_v1{dbg=dec_sexp(Dbg), + dbg2=dec_sexp(Dbg2), + members_dict=dec_members_dict(MembersDict)}. + +%%%%%%%%%%%%%%%%%%% + +enc_sexp(T) -> + lists:flatten(io_lib:format("~w.", [T])). + +dec_sexp(String) -> + {ok,Tks,_} = erl_scan:string(String), + {ok,E} = erl_parse:parse_exprs(Tks), + {value,Funs,_} = erl_eval:exprs(E,[]), + Funs. + +enc_members_dict(D) -> + %% Use list_to_binary() here to "flatten" the serialized #p_srvr{} + [{K, list_to_binary(enc_p_srvr(V))} || {K, V} <- orddict:to_list(D)]. + +dec_members_dict(List) -> + orddict:from_list([{K, dec_p_srvr(V)} || {K, V} <- List]). + +to_binary(X) when is_list(X) -> + list_to_binary(X); +to_binary(X) when is_integer(X) -> + list_to_binary(integer_to_list(X)); +to_binary(X) when is_atom(X) -> + erlang:atom_to_binary(X, latin1); +to_binary(X) when is_binary(X) -> + X. + +to_list(X) when is_atom(X) -> + atom_to_list(X); +to_list(X) when is_binary(X) -> + binary_to_list(X); +to_list(X) when is_integer(X) -> + integer_to_list(X); +to_list(X) when is_list(X) -> + X. + +to_atom(X) when is_list(X) -> + list_to_atom(X); +to_atom(X) when is_binary(X) -> + erlang:binary_to_atom(X, latin1); +to_atom(X) when is_atom(X) -> + X. + +to_integer(X) when is_list(X) -> + list_to_integer(X); +to_integer(X) when is_binary(X) -> + list_to_binary(binary_to_list(X)); +to_integer(X) when is_integer(X) -> + X. + +todo_enc_opaque(X) -> + erlang:term_to_binary(X). + +todo_dec_opaque(X) -> + erlang:binary_to_term(X). diff --git a/test/machi_pb_test.erl b/test/machi_pb_test.erl index 1044bf6..0712b7f 100644 --- a/test/machi_pb_test.erl +++ b/test/machi_pb_test.erl @@ -22,6 +22,7 @@ -module(machi_pb_test). -include("machi_pb.hrl"). +-include("machi_projection.hrl"). -compile(export_all). @@ -62,6 +63,22 @@ smoke_responses_test() -> ok. +smoke_p_srvr_test() -> + P1 = #p_srvr{name=a, address="localhost", port=5555, + props=[{dir,"./data.a"}]}, + P1 = machi_pb_wrap:dec_p_srvr( + list_to_binary(machi_pb_wrap:enc_p_srvr(P1))), + ok. + +smoke_projection_v1_test() -> + P1 = #p_srvr{name=a, address="localhost", port=5555, + props=[{dir,"./data.a"}]}, + D = orddict:from_list([ {P1#p_srvr.name, P1} ]), + Proj1 = machi_projection:new(a, D, [a], [], [], [{property, 42}]), + Proj1 = machi_pb_wrap:dec_projection_v1( + machi_pb_wrap:enc_projection_v1(Proj1)), + ok. + encdec_request(M) -> machi_pb:decode_mpb_request( list_to_binary(machi_pb:encode_mpb_request(M))). From 1b0cf06f1cac0a2394a1727380e1649e00f7de32 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 24 Jun 2015 14:06:17 +0900 Subject: [PATCH 02/12] Fix type problem, oops --- test/machi_pb_high_client_test.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 04ff08e..37ecffa 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -91,7 +91,7 @@ smoke_test2() -> end after exit(SupPid, normal), - [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], + [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], machi_util:wait_for_death(SupPid, 100), ok end. From 725b10ba9098362171bf1e7d225a76b143aa15d5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 24 Jun 2015 16:13:11 +0900 Subject: [PATCH 03/12] Complete PB round-trip for #projection_v1{}, bleh --- src/machi.proto | 180 ++++++++++++++++++++++++++++++++++++++++-- src/machi_pb_wrap.erl | 155 ++++++++++++++++++++++++++---------- 2 files changed, 285 insertions(+), 50 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index 831e32c..db0fbb1 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -31,7 +31,7 @@ option java_outer_classname = "MachiPB"; ////////////////////////////////////////// // -// enums +// Enums // ////////////////////////////////////////// @@ -52,9 +52,14 @@ enum Mpb_CSumType { CSUM_TAG_SERVER_REGEN_SHA = 3; } +enum Mpb_ProjType { + PRIVATE = 20; + PUBLIC = 21; +} + ////////////////////////////////////////// // -// basic data types +// Basic data types // ////////////////////////////////////////// @@ -72,7 +77,7 @@ message Mpb_ChunkCSum { } // epoch_id() type -message Mpb_EpochId { +message Mpb_EpochID { required uint32 epoch_num = 1; required bytes epoch_csum = 2; } @@ -94,7 +99,7 @@ message Mpb_P_Srvr { ////////////////////////////////////////// // -// requests & responses +// Requests & responses // ////////////////////////////////////////// @@ -108,7 +113,7 @@ message Mpb_ErrorResp { ////////////////////////////////////////// // -// High Level API requests: +// High level API requests: // // echo() : Mpb_EchoReq and Mpb_EchoResp // auth() : Mpb_AuthReq and Mpb_AuthResp @@ -214,7 +219,7 @@ message Mpb_ListFilesResp { ////////////////////////////////////////// // -// request & response wrapper +// High level API request & response wrapper // ////////////////////////////////////////// @@ -248,7 +253,7 @@ message Mpb_Response { // Generic error response, typically used when something quite // bad/unexpected happened within the server. // Clients should always check this response and, if defined, - // ignroe any request-specific response at codes 10+. + // ignore any request-specific response at codes 10+. optional Mpb_ErrorResp generic = 2; // Specific responses. @@ -260,3 +265,164 @@ message Mpb_Response { optional Mpb_ChecksumListResp checksum_list = 15; optional Mpb_ListFilesResp list_files = 16; } + +////////////////////////////////////////// +// +// Low level API data types +// +////////////////////////////////////////// + +enum Mpb_Mode { + AP_MODE = 30; + CP_MODE = 31; +} + +message Mpb_Now { + required uint64 sec = 1; + required uint32 usec = 2; +} + +message Mpb_MembersDictEntry { + required string key = 1; + required Mpb_P_Srvr val = 2; +} + +message Mpb_ProjectionV1 { + required uint32 epoch_number = 1; + required bytes epoch_csum = 2; + required string author_server = 3; + repeated string all_members = 4; + required Mpb_Now creation_time = 5; + required Mpb_Mode mode = 6; + repeated string upi = 7; + repeated string repairing = 8; + repeated string down = 9; + optional bytes opaque_flap = 10; + optional bytes opaque_inner = 11; + required bytes opaque_dbg = 12; + required bytes opaque_dbg2 = 13; + repeated Mpb_MembersDictEntry members_dict = 14; +} + +////////////////////////////////////////// +// +// Low level API requests: +// +// echo() : Mpb_EchoReq and Mpb_EchoResp (reused from high level API) +// auth() : Mpb_AuthReq and Mpb_AuthResp (reused from high level API) +// get_latest_epochid() : Mpb_GetLatestEpochIDReq and Mpb_GetLatestEpochIDResp +// +////////////////////////////////////////// + +// Low level API: get_latest_epochid() request & response + +message Mpb_LL_GetLatestEpochIDReq { + required Mpb_ProjType type = 1; +} + +message Mpb_LL_GetLatestEpochIDResp { + required uint32 Mpb_GeneralStatusCode = 1; + optional Mpb_EpochID epoch_id = 2; +} + +// Low level API: read_latest_projection() request & response + +message Mpb_LL_ReadLatestProjectionReq { + required Mpb_ProjType type = 1; +} + +message Mpb_LL_ReadLatestProjectionResp { + required uint32 Mpb_GeneralStatusCode = 1; + optional Mpb_ProjectionV1 proj = 1; +} + +// Low level API: read_projection() request & response + +message Mpb_LL_ReadProjectionReq { + required Mpb_ProjType type = 1; + required uint32 epoch_number = 2; +} + +message Mpb_LL_ReadProjectionResp { + required uint32 Mpb_GeneralStatusCode = 1; + optional Mpb_ProjectionV1 proj = 2; +} + +// Low level API: write_projection() request & response + +message Mpb_LL_WriteProjectionReq { + required Mpb_ProjType type = 1; + required Mpb_ProjectionV1 proj = 1; +} + +message Mpb_LL_WriteProjectionResp { + required uint32 Mpb_GeneralStatusCode = 1; +} + +// Low level API: get_all_projections() request & response + +message Mpb_LL_GetAllProjectionsReq { + required Mpb_ProjType type = 1; +} + +message Mpb_LL_GetAllProjectionsResp { + required uint32 Mpb_GeneralStatusCode = 1; + repeated Mpb_ProjectionV1 proj = 2; +} + +// Low level API: list_all_projections() request & response + +message Mpb_LL_ListAllProjectionsReq { + required Mpb_ProjType type = 1; +} + +message Mpb_LL_ListAllProjectionsResp { + required uint32 Mpb_GeneralStatusCode = 1; + repeated uint32 epochs = 2; +} + +////////////////////////////////////////// +// +// Low API request & response wrapper +// +////////////////////////////////////////// + +message Mpb_LL_Request { + // TODO: If we wish to support pipelined requests sometime in the + // future, this is the placeholder to do it. + required bytes req_id = 1; + + // The client should only define one request message. If the client + // includes multiple requests here, the server may pick/choose an + // arbitrary one. + // NOTE: The erlang protobuffs compiler doesn't support 'oneof'. + // But 'oneof' appears to be a very tiny memory optimization + // that not all languages might care about? (Erlang doesn't) + optional Mpb_EchoReq echo = 10; // Re-use from high level API + optional Mpb_AuthReq auth = 11; // Re-use from high level API + optional Mpb_LL_GetLatestEpochIDReq = 12; + optional Mpb_LL_ReadLatestProjectionReq = 13; + optional Mpb_LL_ReadProjectionReq = 14; + optional Mpb_LL_WriteProjectionReq = 15; + optional Mpb_LL_GetAllProjectionsReq = 16; + optional Mpb_LL_ListAllProjectionsReq = 17; +} + +message Mpb_LL_Response { + // TODO: If we wish to support pipelined requests sometime in the + // future, this is the placeholder to do it. + required bytes req_id = 1; + + // The server will define only one of the optional responses below. + + // Generic error response, typically used when something quite + // bad/unexpected happened within the server. + // Clients should always check this response and, if defined, + // ignore any request-specific response at codes 10+. + optional Mpb_ErrorResp generic = 2; + + // Specific responses. + optional Mpb_EchoResp echo = 10; // Re-use from high level API + optional Mpb_AuthResp auth = 11; // Re-use from high level API + optional Mpb_LL_GetLatestEpochIDResp = 12; +} diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index d79f3ae..c2e21bc 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -30,70 +30,131 @@ -compile(export_all). -endif. % TEST -enc_p_srvr(#p_srvr{name=Name, +enc_p_srvr(P) -> + machi_pb:encode_mpb_p_srvr(conv_from_p_srvr(P)). + +dec_p_srvr(Bin) -> + conv_to_p_srvr(machi_pb:decode_mpb_p_srvr(Bin)). + +conv_from_p_srvr(#p_srvr{name=Name, proto_mod=ProtoMod, address=Address, port=Port, props=Props}) -> - machi_pb:encode_mpb_p_srvr(#mpb_p_srvr{name=to_list(Name), - proto_mod=to_list(ProtoMod), - address=to_list(Address), - port=to_list(Port), - props=todo_enc_opaque(Props)}). + #mpb_p_srvr{name=to_list(Name), + proto_mod=to_list(ProtoMod), + address=to_list(Address), + port=to_list(Port), + props=enc_sexp(Props)}. -dec_p_srvr(Bin) -> - #mpb_p_srvr{name=Name, - proto_mod=ProtoMod, - address=Address, - port=Port, - props=Props} = machi_pb:decode_mpb_p_srvr(Bin), +conv_to_p_srvr(#mpb_p_srvr{name=Name, + proto_mod=ProtoMod, + address=Address, + port=Port, + props=Props}) -> #p_srvr{name=to_atom(Name), proto_mod=to_atom(ProtoMod), address=to_list(Address), port=to_integer(Port), - props=todo_dec_opaque(Props)}. + props=dec_sexp(to_list(Props))}. -enc_projection_v1(#projection_v1{dbg=Dbg, - dbg2=Dbg2, - members_dict=MembersDict} = P) -> - term_to_binary(P#projection_v1{dbg=enc_sexp(Dbg), - dbg2=enc_sexp(Dbg2), - members_dict=enc_members_dict(MembersDict)}). +enc_projection_v1(P) -> + %% Awww, flatten it here + list_to_binary( + machi_pb:encode_mpb_projectionv1(conv_from_projection_v1(P))). dec_projection_v1(Bin) -> - P = #projection_v1{dbg=Dbg, - dbg2=Dbg2, - members_dict=MembersDict} = binary_to_term(Bin), - P#projection_v1{dbg=dec_sexp(Dbg), - dbg2=dec_sexp(Dbg2), - members_dict=dec_members_dict(MembersDict)}. + conv_to_projection_v1(machi_pb:decode_mpb_projectionv1(Bin)). + +conv_from_projection_v1(#projection_v1{epoch_number=Epoch, + epoch_csum=CSum, + author_server=Author, + all_members=AllMembers, + creation_time=CTime, + mode=Mode, + upi=UPI, + repairing=Repairing, + down=Down, + flap=Flap, + inner=Inner, + dbg=Dbg, + dbg2=Dbg2, + members_dict=MembersDict}) -> + #mpb_projectionv1{epoch_number=Epoch, + epoch_csum=CSum, + author_server=to_list(Author), + all_members=[to_list(X) || X <- AllMembers], + creation_time=conv_from_now(CTime), + mode=conv_from_mode(Mode), + upi=[to_list(X) || X <- UPI], + repairing=[to_list(X) || X <- Repairing], + down=[to_list(X) || X <- Down], + opaque_flap=enc_optional_sexp(Flap), + opaque_inner=enc_optional_sexp(Inner), + opaque_dbg=enc_sexp(Dbg), + opaque_dbg2=enc_sexp(Dbg2), + members_dict=conv_from_members_dict(MembersDict)}. + +conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch, + epoch_csum=CSum, + author_server=Author, + all_members=AllMembers, + creation_time=CTime, + mode=Mode, + upi=UPI, + repairing=Repairing, + down=Down, + opaque_flap=Flap, + opaque_inner=Inner, + opaque_dbg=Dbg, + opaque_dbg2=Dbg2, + members_dict=MembersDict}) -> + #projection_v1{epoch_number=Epoch, + epoch_csum=CSum, + author_server=to_atom(Author), + all_members=[to_atom(X) || X <- AllMembers], + creation_time=conv_to_now(CTime), + mode=conv_to_mode(Mode), + upi=[to_atom(X) || X <- UPI], + repairing=[to_atom(X) || X <- Repairing], + down=[to_atom(X) || X <- Down], + flap=dec_optional_sexp(Flap), + inner=dec_optional_sexp(Inner), + dbg=dec_sexp(Dbg), + dbg2=dec_sexp(Dbg2), + members_dict=conv_to_members_dict(MembersDict)}. %%%%%%%%%%%%%%%%%%% enc_sexp(T) -> lists:flatten(io_lib:format("~w.", [T])). -dec_sexp(String) -> +dec_sexp(Bin) when is_binary(Bin) -> + dec_sexp(binary_to_list(Bin)); +dec_sexp(String) when is_list(String) -> {ok,Tks,_} = erl_scan:string(String), {ok,E} = erl_parse:parse_exprs(Tks), {value,Funs,_} = erl_eval:exprs(E,[]), Funs. -enc_members_dict(D) -> +enc_optional_sexp(undefined) -> + undefined; +enc_optional_sexp(T) -> + enc_sexp(T). + +dec_optional_sexp(undefined) -> + undefined; +dec_optional_sexp(T) -> + dec_sexp(T). + +conv_from_members_dict(D) -> %% Use list_to_binary() here to "flatten" the serialized #p_srvr{} - [{K, list_to_binary(enc_p_srvr(V))} || {K, V} <- orddict:to_list(D)]. + [#mpb_membersdictentry{key=to_list(K), val=conv_from_p_srvr(V)} || + {K, V} <- orddict:to_list(D)]. -dec_members_dict(List) -> - orddict:from_list([{K, dec_p_srvr(V)} || {K, V} <- List]). - -to_binary(X) when is_list(X) -> - list_to_binary(X); -to_binary(X) when is_integer(X) -> - list_to_binary(integer_to_list(X)); -to_binary(X) when is_atom(X) -> - erlang:atom_to_binary(X, latin1); -to_binary(X) when is_binary(X) -> - X. +conv_to_members_dict(List) -> + orddict:from_list([{to_atom(K), conv_to_p_srvr(V)} || + #mpb_membersdictentry{key=K, val=V} <- List]). to_list(X) when is_atom(X) -> atom_to_list(X); @@ -118,8 +179,16 @@ to_integer(X) when is_binary(X) -> to_integer(X) when is_integer(X) -> X. -todo_enc_opaque(X) -> - erlang:term_to_binary(X). +conv_from_now({A,B,C}) -> + #mpb_now{sec=(1000000 * A) + B, + usec=C}. + +conv_to_now(#mpb_now{sec=Sec, usec=USec}) -> + {Sec div 1000000, Sec rem 1000000, USec}. + +conv_from_mode(ap_mode) -> 'AP_MODE'; +conv_from_mode(cp_mode) -> 'CP_MODE'. + +conv_to_mode('AP_MODE') -> ap_mode; +conv_to_mode('CP_MODE') -> cp_mode. -todo_dec_opaque(X) -> - erlang:binary_to_term(X). From 31c5bcc0c749b4cf63a5c77d1e6107cdcb982689 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 24 Jun 2015 17:20:18 +0900 Subject: [PATCH 04/12] WIP: 1/2 of low-level projection proto finished, machi_flu1_test fails --- src/machi.proto | 33 +++++++++-------- src/machi_flu1_client.erl | 43 +++++++++++++---------- src/machi_pb_high_client.erl | 3 ++ src/machi_pb_wrap.erl | 68 +++++++++++++++++++++++++++++++++++- 4 files changed, 114 insertions(+), 33 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index db0fbb1..92b89b8 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -78,7 +78,7 @@ message Mpb_ChunkCSum { // epoch_id() type message Mpb_EpochID { - required uint32 epoch_num = 1; + required uint32 epoch_number = 1; required bytes epoch_csum = 2; } @@ -321,7 +321,7 @@ message Mpb_LL_GetLatestEpochIDReq { } message Mpb_LL_GetLatestEpochIDResp { - required uint32 Mpb_GeneralStatusCode = 1; + required Mpb_GeneralStatusCode status = 1; optional Mpb_EpochID epoch_id = 2; } @@ -332,7 +332,7 @@ message Mpb_LL_ReadLatestProjectionReq { } message Mpb_LL_ReadLatestProjectionResp { - required uint32 Mpb_GeneralStatusCode = 1; + required Mpb_GeneralStatusCode status = 1; optional Mpb_ProjectionV1 proj = 1; } @@ -344,7 +344,7 @@ message Mpb_LL_ReadProjectionReq { } message Mpb_LL_ReadProjectionResp { - required uint32 Mpb_GeneralStatusCode = 1; + required Mpb_GeneralStatusCode status = 1; optional Mpb_ProjectionV1 proj = 2; } @@ -356,7 +356,7 @@ message Mpb_LL_WriteProjectionReq { } message Mpb_LL_WriteProjectionResp { - required uint32 Mpb_GeneralStatusCode = 1; + required Mpb_GeneralStatusCode status = 1; } // Low level API: get_all_projections() request & response @@ -366,7 +366,7 @@ message Mpb_LL_GetAllProjectionsReq { } message Mpb_LL_GetAllProjectionsResp { - required uint32 Mpb_GeneralStatusCode = 1; + required Mpb_GeneralStatusCode status = 1; repeated Mpb_ProjectionV1 proj = 2; } @@ -377,7 +377,7 @@ message Mpb_LL_ListAllProjectionsReq { } message Mpb_LL_ListAllProjectionsResp { - required uint32 Mpb_GeneralStatusCode = 1; + required Mpb_GeneralStatusCode status = 1; repeated uint32 epochs = 2; } @@ -400,12 +400,12 @@ message Mpb_LL_Request { // that not all languages might care about? (Erlang doesn't) optional Mpb_EchoReq echo = 10; // Re-use from high level API optional Mpb_AuthReq auth = 11; // Re-use from high level API - optional Mpb_LL_GetLatestEpochIDReq = 12; - optional Mpb_LL_ReadLatestProjectionReq = 13; - optional Mpb_LL_ReadProjectionReq = 14; - optional Mpb_LL_WriteProjectionReq = 15; - optional Mpb_LL_GetAllProjectionsReq = 16; - optional Mpb_LL_ListAllProjectionsReq = 17; + optional Mpb_LL_GetLatestEpochIDReq proj_gl = 12; + optional Mpb_LL_ReadLatestProjectionReq proj_rl = 13; + optional Mpb_LL_ReadProjectionReq proj_rp = 14; + optional Mpb_LL_WriteProjectionReq proj_wp = 15; + optional Mpb_LL_GetAllProjectionsReq proj_ga = 16; + optional Mpb_LL_ListAllProjectionsReq proj_la = 17; } message Mpb_LL_Response { @@ -424,5 +424,10 @@ message Mpb_LL_Response { // Specific responses. optional Mpb_EchoResp echo = 10; // Re-use from high level API optional Mpb_AuthResp auth = 11; // Re-use from high level API - optional Mpb_LL_GetLatestEpochIDResp = 12; + optional Mpb_LL_GetLatestEpochIDResp proj_gl = 12; + optional Mpb_LL_ReadLatestProjectionResp proj_rl = 13; + optional Mpb_LL_ReadProjectionResp proj_rp = 14; + optional Mpb_LL_WriteProjectionResp proj_wp = 15; + optional Mpb_LL_GetAllProjectionsResp proj_ga = 16; + optional Mpb_LL_ListAllProjectionsResp proj_la = 17; } diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 4a7996b..df59d7d 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -802,38 +802,44 @@ trunc_hack2(Sock, EpochID, File) -> end. get_latest_epochid2(Sock, ProjType) -> - ProjCmd = {get_latest_epochid, ProjType}, - do_projection_common(Sock, ProjCmd). + Req = machi_pb_wrap:make_projection_req( + <<42>>, {get_latest_epochid, ProjType}), + do_projection_common(Sock, Req). read_latest_projection2(Sock, ProjType) -> - ProjCmd = {read_latest_projection, ProjType}, - do_projection_common(Sock, ProjCmd). + Req = machi_pb_wrap:make_projection_req( + <<42>>, {read_latest_projection, ProjType}), + do_projection_common(Sock, Req). read_projection2(Sock, ProjType, Epoch) -> - ProjCmd = {read_projection, ProjType, Epoch}, - do_projection_common(Sock, ProjCmd). + Req = machi_pb_wrap:make_projection_req( + <<42>>, {read_projection, ProjType, Epoch}), + do_projection_common(Sock, Req). write_projection2(Sock, ProjType, Proj) -> - ProjCmd = {write_projection, ProjType, Proj}, - do_projection_common(Sock, ProjCmd). + Req = machi_pb_wrap:make_projection_req( + <<42>>, {write_projection, ProjType, Proj}), + do_projection_common(Sock, Req). get_all_projections2(Sock, ProjType) -> - ProjCmd = {get_all_projections, ProjType}, - do_projection_common(Sock, ProjCmd). + Req = machi_pb_wrap:make_projection_req( + <<42>>, {get_all_projections, ProjType}), + do_projection_common(Sock, Req). list_all_projections2(Sock, ProjType) -> - ProjCmd = {list_all_projections, ProjType}, - do_projection_common(Sock, ProjCmd). + Req = machi_pb_wrap:make_projection_req( + <<42>>, {list_all_projections, ProjType}), + do_projection_common(Sock, Req). -do_projection_common(Sock, ProjCmd) -> +do_projection_common(Sock, Req) -> erase(bad_sock), try - ProjCmdBin = term_to_binary(ProjCmd), - Len = iolist_size(ProjCmdBin), + ReqBin = machi_pb:encode_mpb_ll_request(Req), + Len = iolist_size(ReqBin), true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), Cmd = [<<"PROJ ">>, LenHex, <<"\n">>], - ok = w_send(Sock, [Cmd, ProjCmdBin]), + ok = w_send(Sock, [Cmd, ReqBin]), ok = w_setopts(Sock, [{packet, line}]), case w_recv(Sock, 0) of {ok, Line} -> @@ -841,9 +847,10 @@ do_projection_common(Sock, ProjCmd) -> <<"OK ", ResLenHex:8/binary, "\n">> -> ResLen = machi_util:hexstr_to_int(ResLenHex), ok = w_setopts(Sock, [{packet, raw}]), - {ok, ResBin} = w_recv(Sock, ResLen), + {ok, RespBin} = w_recv(Sock, ResLen), ok = w_setopts(Sock, [{packet, line}]), - binary_to_term(ResBin); + Resp = machi_pb:decode_mbp_ll_response(RespBin), + machi_pb_wrap:unmake_projection_resp(Resp); Else -> {error, Else} end diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index adf380d..14fa9f5 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -44,6 +44,7 @@ checksum_list/2, checksum_list/3, list_files/1, list_files/2 ]). +-export([convert_general_status_code/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -354,6 +355,8 @@ convert_append_chunk_resp(#mpb_appendchunkresp{status='OK', chunk_pos=CP}) -> convert_append_chunk_resp(#mpb_appendchunkresp{status=Status}) -> convert_general_status_code(Status). +convert_general_status_code('OK') -> + ok; convert_general_status_code('BAD_ARG') -> {error, bad_arg}; convert_general_status_code('WEDGED') -> diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index c2e21bc..94c4117 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -19,13 +19,23 @@ %% under the License. %% %% ------------------------------------------------------------------- + +%% @doc Wrappers for Protocol Buffers encoding, including hacks to fix +%% impedance mismatches between Erlang terms and PB encodings. +%% +%% TODO: Any use of enc_sexp() and dec_sexp() should be eliminated, +%% except for the possibility of items where we are 100% sure +%% that a non-Erlang software component can get away with always +%% treating that item as an opaque thing. + -module(machi_pb_wrap). -include("machi_pb.hrl"). -include("machi_projection.hrl"). -export([enc_p_srvr/1, dec_p_srvr/1, - enc_projection_v1/1, dec_projection_v1/1]). + enc_projection_v1/1, dec_projection_v1/1, + make_projection_req/2, unmake_projection_resp/1]). -ifdef(TEST). -compile(export_all). -endif. % TEST @@ -124,6 +134,57 @@ conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch, dbg2=dec_sexp(Dbg2), members_dict=conv_to_members_dict(MembersDict)}. +make_projection_req(ID, {get_latest_epochid, ProjType}) -> + #mpb_ll_request{req_id=ID, + proj_gl=#mpb_ll_getlatestepochidreq{type=conv_from_type(ProjType)}}; +make_projection_req(ID, {read_latest_projection, ProjType}) -> + #mpb_ll_request{req_id=ID, + proj_rl=#mpb_ll_readlatestprojectionreq{type=conv_from_type(ProjType)}}; +make_projection_req(ID, {read_projection, ProjType, Epoch}) -> + #mpb_ll_request{req_id=ID, + proj_rp=#mpb_ll_readprojectionreq{type=conv_from_type(ProjType), + epoch_number=Epoch}}; +make_projection_req(ID, {write_projection, ProjType, Proj}) -> + ProjM = conv_from_projection_v1(Proj), + #mpb_ll_request{req_id=ID, + proj_wp=#mpb_ll_writeprojectionreq{type=conv_from_type(ProjType), + proj=ProjM}}; +make_projection_req(ID, {get_all_projections, ProjType}) -> + #mpb_ll_request{req_id=ID, + proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}}; +make_projection_req(ID, {list_all_projections, ProjType}) -> + #mpb_ll_request{req_id=ID, + proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}. + +unmake_projection_resp(#mpb_ll_response{proj_gl=#mpb_ll_getlatestepochidresp{ + status=Status, epoch_id=EID}}) -> + case Status of + 'OK' -> + #mpb_epochid{epoch_number=Epoch, epoch_csum=CSum} = EID, + {ok, {Epoch, CSum}}; + _ -> + machi_pb_high_client:convert_general_status_code(Status) + end; +unmake_projection_resp(#mpb_ll_response{proj_rl=#mpb_ll_readlatestprojectionresp{ + status=Status, proj=P}}) -> + case Status of + 'OK' -> + {ok, conv_to_projection_v1(P)}; + _ -> + machi_pb_high_client:convert_general_status_code(Status) + end; +unmake_projection_resp(#mpb_ll_response{proj_rp=#mpb_ll_readprojectionresp{ + status=Status, proj=P}}) -> + case Status of + 'OK' -> + {ok, conv_to_projection_v1(P)}; + _ -> + machi_pb_high_client:convert_general_status_code(Status) + end; +unmake_projection_resp(#mpb_ll_response{proj_wp=#mpb_ll_writeprojectionresp{ + status=Status}}) -> + machi_pb_high_client:convert_general_status_code(Status). + %%%%%%%%%%%%%%%%%%% enc_sexp(T) -> @@ -192,3 +253,8 @@ conv_from_mode(cp_mode) -> 'CP_MODE'. conv_to_mode('AP_MODE') -> ap_mode; conv_to_mode('CP_MODE') -> cp_mode. +conv_from_type(private) -> 'PRIVATE'; +conv_from_type(public) -> 'PUBLIC'. + +conv_to_type('PRIVATE') -> private; +conv_to_type('PUBLIC') -> public. From d9407b76b744a5d68540db065683292c0aadb597 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 24 Jun 2015 18:00:25 +0900 Subject: [PATCH 05/12] WIP: dinnertime, machi_flu1_test still broken --- src/machi.proto | 3 +- src/machi_flu1.erl | 9 ++- src/machi_flu1_client.erl | 2 +- src/machi_pb_high_client.erl | 2 + src/machi_pb_wrap.erl | 114 ++++++++++++++++++++++++++++++++++- 5 files changed, 123 insertions(+), 7 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index 92b89b8..7ae4c96 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -41,6 +41,7 @@ enum Mpb_GeneralStatusCode { WEDGED = 2; BAD_CHECKSUM = 3; PARTITION = 4; + NOT_WRITTEN = 5; BAD_JOSS = 255; // Only for testing by the Taipan } @@ -367,7 +368,7 @@ message Mpb_LL_GetAllProjectionsReq { message Mpb_LL_GetAllProjectionsResp { required Mpb_GeneralStatusCode status = 1; - repeated Mpb_ProjectionV1 proj = 2; + repeated Mpb_ProjectionV1 projs = 2; } // Low level API: list_all_projections() request & response diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 9dfe6a6..5f957c5 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -864,11 +864,14 @@ do_projection_command(Sock, LenHex, S) -> ok = inet:setopts(Sock, [{packet, raw}]), {ok, ProjCmdBin} = gen_tcp:recv(Sock, Len), ok = inet:setopts(Sock, [{packet, line}]), - ProjCmd = binary_to_term(ProjCmdBin), + ProjCmdM = machi_pb:decode_mpb_ll_request(ProjCmdBin), + {ID, ProjCmd} = machi_pb_wrap:unmake_projection_req(ProjCmdM), + ProjOp = element(1, ProjCmd), put(hack, ProjCmd), Res = handle_projection_command(ProjCmd, S), - ResBin = term_to_binary(Res), - ResLenHex = machi_util:int_to_hexbin(byte_size(ResBin), 32), + ResM = machi_pb_wrap:make_projection_resp(ID, ProjOp, Res), + ResBin = machi_pb:encode_mpb_ll_response(ResM), + ResLenHex = machi_util:int_to_hexbin(iolist_size(ResBin), 32), ok = gen_tcp:send(Sock, [<<"OK ">>, ResLenHex, <<"\n">>, ResBin]) catch What:Why -> diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index df59d7d..a6e19a5 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -849,7 +849,7 @@ do_projection_common(Sock, Req) -> ok = w_setopts(Sock, [{packet, raw}]), {ok, RespBin} = w_recv(Sock, ResLen), ok = w_setopts(Sock, [{packet, line}]), - Resp = machi_pb:decode_mbp_ll_response(RespBin), + Resp = machi_pb:decode_mpb_ll_response(RespBin), machi_pb_wrap:unmake_projection_resp(Resp); Else -> {error, Else} diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 14fa9f5..bd42741 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -365,6 +365,8 @@ convert_general_status_code('BAD_CHECKSUM') -> {error, bad_checksum}; convert_general_status_code('PARTITION') -> {error, partition}; +convert_general_status_code('NOT_WRITTEN') -> + {error, not_written}; convert_general_status_code('BAD_JOSS') -> throw({error, bad_joss_taipan_fixme}). diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index 94c4117..5d97186 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -35,7 +35,8 @@ -export([enc_p_srvr/1, dec_p_srvr/1, enc_projection_v1/1, dec_projection_v1/1, - make_projection_req/2, unmake_projection_resp/1]). + make_projection_req/2, unmake_projection_req/1, + make_projection_resp/3, unmake_projection_resp/1]). -ifdef(TEST). -compile(export_all). -endif. % TEST @@ -156,6 +157,83 @@ make_projection_req(ID, {list_all_projections, ProjType}) -> #mpb_ll_request{req_id=ID, proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}. +unmake_projection_req( + #mpb_ll_request{req_id=ID, + proj_gl=#mpb_ll_getlatestepochidreq{type=ProjType}}) -> + {ID, {get_latest_epochid, conv_to_type(ProjType)}}; +unmake_projection_req( + #mpb_ll_request{req_id=ID, + proj_rl=#mpb_ll_readlatestprojectionreq{type=ProjType}}) -> + {ID, {read_latest_projection, conv_to_type(ProjType)}}; +unmake_projection_req( + #mpb_ll_request{req_id=ID, + proj_rp=#mpb_ll_readprojectionreq{type=ProjType, + epoch_number=Epoch}}) -> + {ID, {read_projection, conv_to_type(ProjType), Epoch}}; +unmake_projection_req( + #mpb_ll_request{req_id=ID, + proj_wp=#mpb_ll_writeprojectionreq{type=ProjType, + proj=ProjM}}) -> + Proj = conv_to_projection_v1(ProjM), + {ID, {write_projection, conv_to_type(ProjType), Proj}}; +unmake_projection_req( + #mpb_ll_request{req_id=ID, + proj_ga=#mpb_ll_getallprojectionsreq{type=ProjType}}) -> + {ID, {get_all_projections, conv_to_type(ProjType)}}; +unmake_projection_req( + #mpb_ll_request{req_id=ID, + proj_la=#mpb_ll_listallprojectionsreq{type=ProjType}}) -> + {ID, {list_all_projections, conv_to_type(ProjType)}}. + +make_projection_resp(ID, get_latest_epochid, {ok, {Epoch, CSum}}) -> + EID = #mpb_epochid{epoch_number=Epoch, epoch_csum=CSum}, + #mpb_ll_response{req_id=ID, + proj_gl=#mpb_ll_getlatestepochidresp{ + status='OK', epoch_id=EID}}; +make_projection_resp(ID, get_latest_epochid, Status) -> + #mpb_ll_response{req_id=ID, + proj_gl=#mpb_ll_getlatestepochidresp{ + status=conv_from_status(Status)}}; +make_projection_resp(ID, read_latest_projection, {ok, Proj}) -> + ProjM = conv_from_projection_v1(Proj), + #mpb_ll_response{req_id=ID, + proj_rl=#mpb_ll_readlatestprojectionresp{ + status='OK', proj=ProjM}}; +make_projection_resp(ID, read_latest_projection, Status) -> + #mpb_ll_response{req_id=ID, + proj_rl=#mpb_ll_readlatestprojectionresp{ + status=conv_from_status(Status)}}; +make_projection_resp(ID, read_projection, {ok, Proj}) -> + ProjM = conv_from_projection_v1(Proj), + #mpb_ll_response{req_id=ID, + proj_rp=#mpb_ll_readprojectionresp{ + status='OK', proj=ProjM}}; +make_projection_resp(ID, read_projection, Status) -> + #mpb_ll_response{req_id=ID, + proj_rp=#mpb_ll_readprojectionresp{ + status=conv_from_status(Status)}}; +make_projection_resp(ID, write_projection, Status) -> + #mpb_ll_response{req_id=ID, + proj_wp=#mpb_ll_writeprojectionresp{ + status=conv_from_status(Status)}}; +make_projection_resp(ID, get_all_projections, {ok, Projs}) -> + ProjsM = [conv_from_projection_v1(Proj) || Proj <- Projs], + #mpb_ll_response{req_id=ID, + proj_ga=#mpb_ll_getallprojectionsresp{ + status='OK', projs=ProjsM}}; +make_projection_resp(ID, get_all_projections, Status) -> + #mpb_ll_response{req_id=ID, + proj_ga=#mpb_ll_getallprojectionsresp{ + status=conv_from_status(Status)}}; +make_projection_resp(ID, list_all_projections, {ok, Epochs}) -> + #mpb_ll_response{req_id=ID, + proj_la=#mpb_ll_listallprojectionsresp{ + status='OK', epochs=Epochs}}; +make_projection_resp(ID, list_all_projections, Status) -> + #mpb_ll_response{req_id=ID, + proj_la=#mpb_ll_listallprojectionsresp{ + status=conv_from_status(Status)}}. + unmake_projection_resp(#mpb_ll_response{proj_gl=#mpb_ll_getlatestepochidresp{ status=Status, epoch_id=EID}}) -> case Status of @@ -183,7 +261,23 @@ unmake_projection_resp(#mpb_ll_response{proj_rp=#mpb_ll_readprojectionresp{ end; unmake_projection_resp(#mpb_ll_response{proj_wp=#mpb_ll_writeprojectionresp{ status=Status}}) -> - machi_pb_high_client:convert_general_status_code(Status). + machi_pb_high_client:convert_general_status_code(Status); +unmake_projection_resp(#mpb_ll_response{proj_ga=#mpb_ll_getallprojectionsresp{ + status=Status, projs=ProjsM}}) -> + case Status of + 'OK' -> + {ok, [conv_to_projection_v1(ProjM) || ProjM <- ProjsM]}; + _ -> + machi_pb_high_client:convert_general_status_code(Status) + end; +unmake_projection_resp(#mpb_ll_response{proj_la=#mpb_ll_listallprojectionsresp{ + status=Status, epochs=Epochs}}) -> + case Status of + 'OK' -> + {ok, Epochs}; + _ -> + machi_pb_high_client:convert_general_status_code(Status) + end. %%%%%%%%%%%%%%%%%%% @@ -258,3 +352,19 @@ conv_from_type(public) -> 'PUBLIC'. conv_to_type('PRIVATE') -> private; conv_to_type('PUBLIC') -> public. + +conv_from_status(ok) -> + 'OK'; +conv_from_status({error, bad_arg}) -> + 'BAD_ARG'; +conv_from_status({error, wedged}) -> + 'WEDGED'; +conv_from_status({error, bad_checksum}) -> + 'BAD_CHECKSUM'; +conv_from_status({error, partition}) -> + 'PARTITION'; +conv_from_status({error, not_written}) -> + 'NOT_WRITTEN'; +conv_from_status(_OOPS) -> + io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]), + 'BAD_JOSS'. From 4fc0578a9d02993d5f17c7e6a2628ead2589bf23 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 15:08:40 +0900 Subject: [PATCH 06/12] WIP: bugfixes, machi_flu1_test still broken --- src/machi.proto | 2 +- src/machi_flu1_client.erl | 3 ++- src/machi_pb_wrap.erl | 2 +- test/machi_flu1_test.erl | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index 7ae4c96..dbf5237 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -353,7 +353,7 @@ message Mpb_LL_ReadProjectionResp { message Mpb_LL_WriteProjectionReq { required Mpb_ProjType type = 1; - required Mpb_ProjectionV1 proj = 1; + required Mpb_ProjectionV1 proj = 2; } message Mpb_LL_WriteProjectionResp { diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index a6e19a5..8f90381 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -834,7 +834,8 @@ list_all_projections2(Sock, ProjType) -> do_projection_common(Sock, Req) -> erase(bad_sock), try - ReqBin = machi_pb:encode_mpb_ll_request(Req), + ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)), +io:format(user, "\nTrying: ~p\n", [machi_pb:decode_mpb_ll_request(ReqBin)]), Len = iolist_size(ReqBin), true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index 5d97186..be089ab 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -149,7 +149,7 @@ make_projection_req(ID, {write_projection, ProjType, Proj}) -> ProjM = conv_from_projection_v1(Proj), #mpb_ll_request{req_id=ID, proj_wp=#mpb_ll_writeprojectionreq{type=conv_from_type(ProjType), - proj=ProjM}}; + proj=ProjM}}; make_projection_req(ID, {get_all_projections, ProjType}) -> #mpb_ll_request{req_id=ID, proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}}; diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 7a0045d..f14c9b6 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -173,7 +173,7 @@ flu_projection_smoke_test() -> {ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T), {ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T), - P_a = #p_srvr{name=a}, + P_a = #p_srvr{name=a, port=4321}, P1 = machi_projection:new(1, a, [P_a], [], [a], [], []), ok = ?FLU_C:write_projection(Host, TcpPort, T, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1), From 841235b3b5ff828d1a282a0abac2d97eb2d52f0c Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 15:10:24 +0900 Subject: [PATCH 07/12] WIP: bugfixes, add {error, written} --- src/machi.proto | 1 + src/machi_pb_high_client.erl | 2 ++ src/machi_pb_wrap.erl | 2 ++ 3 files changed, 5 insertions(+) diff --git a/src/machi.proto b/src/machi.proto index dbf5237..c3f40f8 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -42,6 +42,7 @@ enum Mpb_GeneralStatusCode { BAD_CHECKSUM = 3; PARTITION = 4; NOT_WRITTEN = 5; + WRITTEN = 6; BAD_JOSS = 255; // Only for testing by the Taipan } diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index bd42741..abec13c 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -367,6 +367,8 @@ convert_general_status_code('PARTITION') -> {error, partition}; convert_general_status_code('NOT_WRITTEN') -> {error, not_written}; +convert_general_status_code('WRITTEN') -> + {error, written}; convert_general_status_code('BAD_JOSS') -> throw({error, bad_joss_taipan_fixme}). diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index be089ab..9daa54c 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -365,6 +365,8 @@ conv_from_status({error, partition}) -> 'PARTITION'; conv_from_status({error, not_written}) -> 'NOT_WRITTEN'; +conv_from_status({error, written}) -> + 'WRITTEN'; conv_from_status(_OOPS) -> io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]), 'BAD_JOSS'. From 5d8b648a24593e410ddbe932bca9b425ece144f6 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 15:26:35 +0900 Subject: [PATCH 08/12] All projection store protocol operations are now using Protocol Buffers! So, there's some cheating going on, because some of the parts of the #projection_v1{} and #p_srvr{} records aren't fully specified. Those parts are being specified as "opaque" in the field names, e.g. optional bytes opaque_flap = 10; optional bytes opaque_inner = 11; required bytes opaque_dbg = 12; required bytes opaque_dbg2 = 13; The serialization that's being used is erlang term sexprs. That isn't portable. So if/when we really need to deal with a non-Erlang language, we'll have to straighten this out further. --- src/machi.proto | 2 +- src/machi_flu1_client.erl | 1 - test/machi_flu1_test.erl | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index c3f40f8..2afc2c5 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -335,7 +335,7 @@ message Mpb_LL_ReadLatestProjectionReq { message Mpb_LL_ReadLatestProjectionResp { required Mpb_GeneralStatusCode status = 1; - optional Mpb_ProjectionV1 proj = 1; + optional Mpb_ProjectionV1 proj = 2; } // Low level API: read_projection() request & response diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 8f90381..f5a39d3 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -835,7 +835,6 @@ do_projection_common(Sock, Req) -> erase(bad_sock), try ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)), -io:format(user, "\nTrying: ~p\n", [machi_pb:decode_mpb_ll_request(ReqBin)]), Len = iolist_size(ReqBin), true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index f14c9b6..fa9d05c 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -173,7 +173,7 @@ flu_projection_smoke_test() -> {ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T), {ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T), - P_a = #p_srvr{name=a, port=4321}, + P_a = #p_srvr{name=a, address="localhost", port=4321}, P1 = machi_projection:new(1, a, [P_a], [], [a], [], []), ok = ?FLU_C:write_projection(Host, TcpPort, T, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1), From 2763b16ca2b15ac16b0eee3e06bc2e5f3c4ec4aa Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 16:03:00 +0900 Subject: [PATCH 09/12] timing_pb_encoding_test_... speed factor=35.95 [2.730 s] ok So, the PB style encoding of the Mpb_LL_WriteProjectionReq message is about 35-36 times slower than using Erlang's term_to_binary() and binary_to_term(). {sigh} --- src/machi.proto | 2 +- src/machi_pb_wrap.erl | 4 ++-- test/machi_flu1_test.erl | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/machi.proto b/src/machi.proto index 2afc2c5..20aeefb 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -96,7 +96,7 @@ message Mpb_P_Srvr { required string proto_mod = 2; required string address = 3; required string port = 4; - required bytes props = 5; + required bytes opaque_props = 5; } ////////////////////////////////////////// diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index 9daa54c..ec84de3 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -56,13 +56,13 @@ conv_from_p_srvr(#p_srvr{name=Name, proto_mod=to_list(ProtoMod), address=to_list(Address), port=to_list(Port), - props=enc_sexp(Props)}. + opaque_props=enc_sexp(Props)}. conv_to_p_srvr(#mpb_p_srvr{name=Name, proto_mod=ProtoMod, address=Address, port=Port, - props=Props}) -> + opaque_props=Props}) -> #p_srvr{name=to_atom(Name), proto_mod=to_atom(ProtoMod), address=to_list(Address), diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index fa9d05c..c9c24bb 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -210,5 +210,42 @@ bad_checksum_test() -> ok = ?FLU:stop(FLU1) end. +%% The purpose of timing_pb_encoding_test_ and timing_bif_encoding_test_ is +%% to show the relative speed of the PB encoding of something like a +%% projection store command is about 35x slower than simply using the Erlang +%% BIFs term_to_binary() and binary_to_term(). We try to do enough work, at +%% least a couple of seconds, so that any dynamic CPU voltage adjustment +%% might kick into highest speed, in theory. + +timing_pb_encoding_test_() -> + {timeout, 60, fun() -> timing_pb_encoding_test2() end}. + +timing_pb_encoding_test2() -> + P_a = #p_srvr{name=a, address="localhost", port=4321}, + P1 = machi_projection:new(1, a, [P_a], [], [a], [], []), + DoIt1 = fun() -> + Req = machi_pb_wrap:make_projection_req( + <<1,2,3,4>>, {write_projection, public, P1}), + Bin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)), + ZZ = machi_pb:decode_mpb_ll_request(Bin), + _ = machi_pb_wrap:unmake_projection_req(ZZ) + end, + XX = lists:seq(1,30*1000), + erlang:garbage_collect(), + RUN1 = timer:tc(fun() -> begin [_ = DoIt1() || _ <- XX], ok end end), + erlang:garbage_collect(), + + DoIt2 = fun() -> + Req = term_to_binary({ + <<1,2,3,4>>, {write_projection, public, P1}}), + _ = binary_to_term(Req) + end, + erlang:garbage_collect(), + RUN2 = timer:tc(fun() -> begin [_ = DoIt2() || _ <- XX], ok end end), + erlang:garbage_collect(), + Factor = (element(1, RUN1) / element(1, RUN2)), + io:format(user, " speed factor=~.2f ", [Factor]), + ok. + -endif. % !PULSE -endif. % TEST From d9694a992a38279a878f0cee83734f8506cdfff9 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 16:08:04 +0900 Subject: [PATCH 10/12] Alright, use term_to_binary() for opaque/sexp-style encoding, only 15x slower. machi_flu1_test: timing_pb_encoding_test_... speed factor=15.12 [2.678 s] ok --- src/machi_pb_wrap.erl | 11 +++-------- test/machi_flu1_test.erl | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/machi_pb_wrap.erl b/src/machi_pb_wrap.erl index ec84de3..8456602 100644 --- a/src/machi_pb_wrap.erl +++ b/src/machi_pb_wrap.erl @@ -67,7 +67,7 @@ conv_to_p_srvr(#mpb_p_srvr{name=Name, proto_mod=to_atom(ProtoMod), address=to_list(Address), port=to_integer(Port), - props=dec_sexp(to_list(Props))}. + props=dec_sexp(Props)}. enc_projection_v1(P) -> %% Awww, flatten it here @@ -282,15 +282,10 @@ unmake_projection_resp(#mpb_ll_response{proj_la=#mpb_ll_listallprojectionsresp{ %%%%%%%%%%%%%%%%%%% enc_sexp(T) -> - lists:flatten(io_lib:format("~w.", [T])). + term_to_binary(T). dec_sexp(Bin) when is_binary(Bin) -> - dec_sexp(binary_to_list(Bin)); -dec_sexp(String) when is_list(String) -> - {ok,Tks,_} = erl_scan:string(String), - {ok,E} = erl_parse:parse_exprs(Tks), - {value,Funs,_} = erl_eval:exprs(E,[]), - Funs. + binary_to_term(Bin). enc_optional_sexp(undefined) -> undefined; diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index c9c24bb..3c41e3d 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -230,7 +230,7 @@ timing_pb_encoding_test2() -> ZZ = machi_pb:decode_mpb_ll_request(Bin), _ = machi_pb_wrap:unmake_projection_req(ZZ) end, - XX = lists:seq(1,30*1000), + XX = lists:seq(1,70*1000), erlang:garbage_collect(), RUN1 = timer:tc(fun() -> begin [_ = DoIt1() || _ <- XX], ok end end), erlang:garbage_collect(), From c2faf9f49989273afc37094d9a8fad9294ef87d6 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 16:36:14 +0900 Subject: [PATCH 11/12] yolo, un-do experimental type hack --- src/machi_dt.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/machi_dt.erl b/src/machi_dt.erl index 6d94e6d..b5c5980 100644 --- a/src/machi_dt.erl +++ b/src/machi_dt.erl @@ -34,7 +34,7 @@ -type epoch_num() :: -1 | non_neg_integer(). -type epoch_id() :: {epoch_num(), epoch_csum()}. -type file_info() :: {file_size(), file_name_s()}. --type file_name() :: float().%%%binary() | list(). +-type file_name() :: binary() | list(). -type file_name_s() :: binary(). % server reply -type file_offset() :: non_neg_integer(). -type file_size() :: non_neg_integer(). From 0f4d5ed775902499b77e8f6297eaa15dae5abfa5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 25 Jun 2015 16:36:29 +0900 Subject: [PATCH 12/12] Silence dialyzer unused function clause --- src/machi_flu1.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 5f957c5..d9933a7 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -899,9 +899,7 @@ handle_projection_command({get_all_projections, ProjType}, machi_projection_store:get_all_projections(ProjStore, ProjType); handle_projection_command({list_all_projections, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType); -handle_projection_command(Else, _S) -> - {error, unknown_cmd, Else}. + machi_projection_store:list_all_projections(ProjStore, ProjType). make_listener_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_listener").