From 19d3c95325c105593696dbcc24ec996b0e8fb5df Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 13 May 2015 18:57:38 +0900 Subject: [PATCH] WIP: aside, damn, add missing hex encoding for epochid, derp --- src/machi_chain_repair.erl | 26 +++++++++--------- src/machi_flu1.erl | 56 +++++++++++++++++++++++--------------- src/machi_flu1_client.erl | 35 ++++++++++++++---------- 3 files changed, 68 insertions(+), 49 deletions(-) diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 0e5fc73..5ae33ce 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -48,7 +48,7 @@ -export([repair_cp/4, repair_ap/6]). -repair_cp(Src, Dst, MembersDict, Opts) -> +repair_cp(_Src, _Dst, _MembersDict, _Opts) -> %% TODO: add missing function: wipe away any trace of chunks %% are present on Dst but missing on Src. exit(todo_cp_mode). @@ -87,7 +87,7 @@ repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) -> [What, Why, Stack]), {error, {What, Why, Stack}} after - [(catch machi_proxy_flu1_client:quit(Pid, ?SHORT_TIMEOUT)) || + [(catch machi_proxy_flu1_client:quit(Pid)) || Pid <- orddict:to_list(get(proxies_dict))] end, Res. @@ -127,8 +127,8 @@ append_file_dict(Proxy, FLU_name, D) -> %% As an additional optimization, add a bit of #2 to start the next %% read while the current write is still in progress. -repair_file(ap_mode, RepairMode, - File, Size, [], Verb, Src, ProxiesDict, ETS) -> +repair_file(ap_mode, _RepairMode, + File, _Size, [], Verb, Src, _ProxiesDict, _ETS) -> ?VERB("~p: ~s: present on both: ", [Src, File]), ?VERB("TODO!\n"), ok; %%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2); @@ -165,8 +165,8 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) -> {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], CopyChunks = - fun({Offset, Size, CSum}, {ok, ETS, _, _} = Acc) -> - case ets:lookup_element(ETS, in_chunks, 2) rem 100 of + fun({Offset, Size, CSum}, {ok, ETab, _, _} = Acc) -> + case ets:lookup_element(ETab, in_chunks, 2) rem 100 of 0 -> ?VERB(".", []); _ -> ok end, @@ -184,10 +184,10 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) -> ?SHORT_TIMEOUT), _T4 = os:timestamp() end || {_FLU, DstProxy} <- MissingProxiesDict], - ets:update_counter(ETS, in_chunks, 1), - ets:update_counter(ETS, in_bytes, Size), - ets:update_counter(ETS, out_chunks, N), - ets:update_counter(ETS, out_bytes, N*Size), + ets:update_counter(ETab, in_chunks, 1), + ets:update_counter(ETab, in_bytes, Size), + ets:update_counter(ETab, out_chunks, N), + ets:update_counter(ETab, out_bytes, N*Size), Acc; CSum_now -> error_logger:error_msg( @@ -195,7 +195,7 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) -> "file ~p offset ~p size ~p: " "expected ~p got ~p\n", [File, Offset, Size, CSum, CSum_now]), - ets:update_counter(ETS, t_bad_chunks, 1), + ets:update_counter(ETab, t_bad_chunks, 1), Acc end; (_, _=Acc) -> % failure: skip rest of file @@ -234,8 +234,8 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) -> %% ok %% end. -repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) -> - verb("repair_both_present TODO\n"), +repair_both_present(_File, _Size, _RepairMode, Verb, _SrcS, _SrcS2, _DstS, _DstS2) -> + ?VERB("repair_both_present TODO\n"), ok. %% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, RepairMode]). diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 6f6e8a8..4ce10d3 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -194,7 +194,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(FluPid, AckPid, #state{flu_name=Name,dbg_props=DbgProps, +run_append_server(FluPid, AckPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), @@ -220,7 +220,6 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> {seq_append, From, Prefix, Chunk, CSum} -> spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, AppendServerPid) end), - %% DataDir, FluPid) end), append_server_loop(FluPid, S); {wedge_state_change, Boolean, EpochId} -> true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}), @@ -235,7 +234,12 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> append_server_loop(FluPid, S) end. --define(EpochIDSpace, (4+20)). +-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee! + +decode_epoch_id(EpochIDHex) -> + <> = + machi_util:hexstr_to_bin(EpochIDHex), + {EpochNum, EpochCSum}. net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), @@ -250,21 +254,25 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> case Line of %% For normal use <<"A ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, LenHex:8/binary, Prefix:PrefixLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_append(FluName, Sock, LenHex, Prefix); <<"R ", - EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:FileLenLF/binary, "\n">> -> + EpochID = decode_epoch_id(EpochIDHex), do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, - EpochIDRaw, S); - <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> -> + EpochID, S); + <<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_listing(Sock, DataDir, S); <<"C ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, File:CSumFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_checksum_listing(Sock, File, DataDir, S); <<"QUIT\n">> -> catch gen_tcp:close(Sock), @@ -274,20 +282,23 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> exit(normal); %% For "internal" replication only. <<"W-repl ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, <<"fixme1">>, false, <<"fixme2">>); %% For data migration only. <<"DEL-migration ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, File:DelFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_delete_migration_only(Sock, File, DataDir); %% For erasure coding hackityhack <<"TRUNC-hack--- ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, File:DelFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_truncate_hackityhack(Sock, File, DataDir); <<"PROJ ", LenHex:8/binary, "\n">> -> do_projection_command(Sock, LenHex, S); @@ -295,6 +306,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> do_wedge_status(FluName, Sock); _ -> machi_util:verb("Else Got: ~p\n", [Line]), + io:format(user, "TODO: Else Got: ~p\n", [Line]), gen_tcp:send(Sock, "ERROR SYNTAX\n"), catch gen_tcp:close(Sock), exit(normal) @@ -370,7 +382,7 @@ do_wedge_status(FluName, Sock) -> ok = gen_tcp:send(Sock, Reply). do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, S) -> + EpochID, S) -> {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of @@ -390,16 +402,16 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, [read, binary, raw], DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId). + EpochID, Wedged_p, CurrentEpochId). do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> case {Wedged_p, sanitize_file_string(FileBin)} of {false, ok} -> do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochID, Wedged_p, CurrentEpochId); {true, _} -> ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); {_, __} -> @@ -408,7 +420,7 @@ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> <> = machi_util:hexstr_to_bin(OffsetHex), <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), @@ -424,7 +436,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, do_net_server_readwrite_common( Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochID, Wedged_p, CurrentEpochId); _Else -> %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) @@ -432,20 +444,20 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> CSumPath = machi_util:make_checksum_filename(DataDir, FileBin), case file:open(CSumPath, [append, raw, binary, delayed_write]) of {ok, FHc} -> do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochID, Wedged_p, CurrentEpochId); {error, enoent} -> ok = filelib:ensure_dir(CSumPath), do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, Wedged_p, CurrentEpochId) + EpochID, Wedged_p, CurrentEpochId) end. do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> DoItFun = fun(FHd, Offset, Len) -> ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len), @@ -465,7 +477,7 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, [write, read, binary, raw], DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId). + EpochID, Wedged_p, CurrentEpochId). perhaps_do_net_server_ec_read(Sock, FH) -> case file:pread(FH, 0, ?MINIMUM_OFFSET) of diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 726071e..d48d522 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -405,9 +405,10 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> Len = iolist_size(Chunk0), true = (Len =< ?MAX_CHUNK_SIZE), {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, + EpochIDHex = machi_util:bin_to_hexstr( + <>), LenHex = machi_util:int_to_hexbin(Len, 32), - Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10], + Cmd = [<<"A ">>, EpochIDHex, LenHex, Prefix, 10], ok = gen_tcp:send(Sock, [Cmd, Chunk]), {ok, Line} = gen_tcp:recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, @@ -436,11 +437,12 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, + EpochIDHex = machi_util:bin_to_hexstr( + <>), File = machi_util:make_binary(File0), PrefixHex = machi_util:int_to_hexbin(Offset, 64), SizeHex = machi_util:int_to_hexbin(Size, 32), - CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10], + CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10], ok = gen_tcp:send(Sock, CmdLF), case gen_tcp:recv(Sock, 3) of {ok, <<"OK\n">>} -> @@ -485,8 +487,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> list2(Sock, EpochID) -> try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]), + EpochIDHex = machi_util:bin_to_hexstr( + <>), + ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of {ok, <<"OK\n">>} -> @@ -541,8 +544,9 @@ checksum_list2(Sock, EpochID, File) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]), + EpochIDHex = machi_util:bin_to_hexstr( + <>), + ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of {ok, <<"OK ", Rest/binary>> = Line} -> @@ -603,7 +607,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, + EpochIDHex = machi_util:bin_to_hexstr( + <>), %% TODO: add client-side checksum to the server's protocol %% _ = machi_util:checksum_chunk(Chunk), File = machi_util:make_binary(File0), @@ -613,7 +618,7 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> Len = iolist_size(Chunk0), true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), - Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex, + Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex, LenHex, File, <<"\n">>], ok = gen_tcp:send(Sock, [Cmd, Chunk]), {ok, Line} = gen_tcp:recv(Sock, 0), @@ -641,8 +646,9 @@ delete_migration2(Sock, EpochID, File) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>], + EpochIDHex = machi_util:bin_to_hexstr( + <>), + Cmd = [<<"DEL-migration ">>, EpochIDHex, File, <<"\n">>], ok = gen_tcp:send(Sock, Cmd), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of @@ -670,8 +676,9 @@ trunc_hack2(Sock, EpochID, File) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>], + EpochIDHex = machi_util:bin_to_hexstr( + <>), + Cmd = [<<"TRUNC-hack--- ">>, EpochIDHex, File, <<"\n">>], ok = gen_tcp:send(Sock, Cmd), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of