WIP: aside, damn, add missing hex encoding for epochid, derp

This commit is contained in:
Scott Lystig Fritchie 2015-05-13 18:57:38 +09:00
parent eec029b08f
commit 19d3c95325
3 changed files with 68 additions and 49 deletions

View file

@ -48,7 +48,7 @@
-export([repair_cp/4, repair_ap/6]).
repair_cp(Src, Dst, MembersDict, Opts) ->
repair_cp(_Src, _Dst, _MembersDict, _Opts) ->
%% TODO: add missing function: wipe away any trace of chunks
%% are present on Dst but missing on Src.
exit(todo_cp_mode).
@ -87,7 +87,7 @@ repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
[What, Why, Stack]),
{error, {What, Why, Stack}}
after
[(catch machi_proxy_flu1_client:quit(Pid, ?SHORT_TIMEOUT)) ||
[(catch machi_proxy_flu1_client:quit(Pid)) ||
Pid <- orddict:to_list(get(proxies_dict))]
end,
Res.
@ -127,8 +127,8 @@ append_file_dict(Proxy, FLU_name, D) ->
%% As an additional optimization, add a bit of #2 to start the next
%% read while the current write is still in progress.
repair_file(ap_mode, RepairMode,
File, Size, [], Verb, Src, ProxiesDict, ETS) ->
repair_file(ap_mode, _RepairMode,
File, _Size, [], Verb, Src, _ProxiesDict, _ETS) ->
?VERB("~p: ~s: present on both: ", [Src, File]),
?VERB("TODO!\n"), ok;
%%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2);
@ -165,8 +165,8 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
CopyChunks =
fun({Offset, Size, CSum}, {ok, ETS, _, _} = Acc) ->
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
fun({Offset, Size, CSum}, {ok, ETab, _, _} = Acc) ->
case ets:lookup_element(ETab, in_chunks, 2) rem 100 of
0 -> ?VERB(".", []);
_ -> ok
end,
@ -184,10 +184,10 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
?SHORT_TIMEOUT),
_T4 = os:timestamp()
end || {_FLU, DstProxy} <- MissingProxiesDict],
ets:update_counter(ETS, in_chunks, 1),
ets:update_counter(ETS, in_bytes, Size),
ets:update_counter(ETS, out_chunks, N),
ets:update_counter(ETS, out_bytes, N*Size),
ets:update_counter(ETab, in_chunks, 1),
ets:update_counter(ETab, in_bytes, Size),
ets:update_counter(ETab, out_chunks, N),
ets:update_counter(ETab, out_bytes, N*Size),
Acc;
CSum_now ->
error_logger:error_msg(
@ -195,7 +195,7 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
"file ~p offset ~p size ~p: "
"expected ~p got ~p\n",
[File, Offset, Size, CSum, CSum_now]),
ets:update_counter(ETS, t_bad_chunks, 1),
ets:update_counter(ETab, t_bad_chunks, 1),
Acc
end;
(_, _=Acc) -> % failure: skip rest of file
@ -234,8 +234,8 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
%% ok
%% end.
repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) ->
verb("repair_both_present TODO\n"),
repair_both_present(_File, _Size, _RepairMode, Verb, _SrcS, _SrcS2, _DstS, _DstS2) ->
?VERB("repair_both_present TODO\n"),
ok.
%% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, RepairMode]).

View file

@ -194,7 +194,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
run_append_server(FluPid, AckPid, #state{flu_name=Name,dbg_props=DbgProps,
run_append_server(FluPid, AckPid, #state{flu_name=Name,
wedged=Wedged_p,epoch_id=EpochId}=S) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
register(Name, self()),
@ -220,7 +220,6 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir, AppendServerPid) end),
%% DataDir, FluPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean, EpochId} ->
true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}),
@ -235,7 +234,12 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
append_server_loop(FluPid, S)
end.
-define(EpochIDSpace, (4+20)).
-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee!
decode_epoch_id(EpochIDHex) ->
<<EpochNum:(4*8)/big, EpochCSum/binary>> =
machi_util:hexstr_to_bin(EpochIDHex),
{EpochNum, EpochCSum}.
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
@ -250,21 +254,25 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
case Line of
%% For normal use
<<"A ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
LenHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_append(FluName, Sock, LenHex, Prefix);
<<"R ",
EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:FileLenLF/binary, "\n">> ->
EpochID = decode_epoch_id(EpochIDHex),
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir,
EpochIDRaw, S);
<<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> ->
EpochID, S);
<<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_listing(Sock, DataDir, S);
<<"C ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
File:CSumFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_checksum_listing(Sock, File, DataDir, S);
<<"QUIT\n">> ->
catch gen_tcp:close(Sock),
@ -274,20 +282,23 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
exit(normal);
%% For "internal" replication only.
<<"W-repl ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
<<"fixme1">>, false, <<"fixme2">>);
%% For data migration only.
<<"DEL-migration ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_delete_migration_only(Sock, File, DataDir);
%% For erasure coding hackityhack
<<"TRUNC-hack--- ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDHex:(?EpochIDSpace)/binary,
File:DelFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_truncate_hackityhack(Sock, File, DataDir);
<<"PROJ ", LenHex:8/binary, "\n">> ->
do_projection_command(Sock, LenHex, S);
@ -295,6 +306,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
do_wedge_status(FluName, Sock);
_ ->
machi_util:verb("Else Got: ~p\n", [Line]),
io:format(user, "TODO: Else Got: ~p\n", [Line]),
gen_tcp:send(Sock, "ERROR SYNTAX\n"),
catch gen_tcp:close(Sock),
exit(normal)
@ -370,7 +382,7 @@ do_wedge_status(FluName, Sock) ->
ok = gen_tcp:send(Sock, Reply).
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, S) ->
EpochID, S) ->
{Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
DoItFun = fun(FH, Offset, Len) ->
case file:pread(FH, Offset, Len) of
@ -390,16 +402,16 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir,
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
EpochID, Wedged_p, CurrentEpochId).
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
case {Wedged_p, sanitize_file_string(FileBin)} of
{false, ok} ->
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
DataDir, FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
EpochID, Wedged_p, CurrentEpochId);
{true, _} ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
{_, __} ->
@ -408,7 +420,7 @@ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
<<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
@ -424,7 +436,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
EpochID, Wedged_p, CurrentEpochId);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
@ -432,20 +444,20 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId);
EpochID, Wedged_p, CurrentEpochId);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId)
EpochID, Wedged_p, CurrentEpochId)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
EpochID, Wedged_p, CurrentEpochId) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
@ -465,7 +477,7 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[write, read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
EpochID, Wedged_p, CurrentEpochId).
perhaps_do_net_server_ec_read(Sock, FH) ->
case file:pread(FH, 0, ?MINIMUM_OFFSET) of

View file

@ -405,9 +405,10 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10],
Cmd = [<<"A ">>, EpochIDHex, LenHex, Prefix, 10],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
@ -436,11 +437,12 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
File = machi_util:make_binary(File0),
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10],
CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
{ok, <<"OK\n">>} ->
@ -485,8 +487,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
list2(Sock, EpochID) ->
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]),
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK\n">>} ->
@ -541,8 +544,9 @@ checksum_list2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]),
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->
@ -603,7 +607,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
%% TODO: add client-side checksum to the server's protocol
%% _ = machi_util:checksum_chunk(Chunk),
File = machi_util:make_binary(File0),
@ -613,7 +618,7 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex,
Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex,
LenHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
@ -641,8 +646,9 @@ delete_migration2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>],
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
Cmd = [<<"DEL-migration ">>, EpochIDHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
@ -670,8 +676,9 @@ trunc_hack2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>],
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
Cmd = [<<"TRUNC-hack--- ">>, EpochIDHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of