WIP: Whole file repair is 95% complete, yay!
This commit is contained in:
parent f48720e4dc
commit 8807f954ff
4 changed files with 433 additions and 39 deletions
@@ -1925,7 +1925,10 @@ perhaps_start_repair(
                  proj=#projection_v1{creation_time=Start,
                                      upi=[_|_]=UPI,
                                      repairing=[_|_]}}=S) ->
    RepairFun = fun() -> do_repair_ap_mode(S) end,
    RepairId = {MyName, os:timestamp()},
    RepairOpts = [{mode, repair}, verbose, {repair_id, RepairId}],
    %% RepairOpts = [{mode, check}, verbose],
    RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
    LastUPI = lists:last(UPI),
    case timer:now_diff(os:timestamp(), Start) div 1000000 of
        N when MyName == LastUPI,
@@ -1954,34 +1957,21 @@ sanitize_repair_state(#ch_mgr{name=MyName,
sanitize_repair_state(S) ->
    S.

do_repair_ap_mode(
do_repair(
  #ch_mgr{name=MyName,
          members_dict=MembersDict,
          proj=#projection_v1{upi=UPI,
                              repairing=[Dst|_]=Repairing}}=_S_copy) ->
    error_logger:info_msg("Chain tail ~p of ~p starting repair of ~p\n",
                          [MyName, UPI, Repairing]),
    {ok, ProxyA} = machi_proxy_flu1_client:start_link(
                     orddict:fetch(MyName, MembersDict)),
    {ok, ProxyB} = machi_proxy_flu1_client:start_link(
                     orddict:fetch(Dst, MembersDict)),
    Res = try
              QQQ1 = todo_compare_servers(ProxyA, ProxyB),
              QQQ2 = todo_compare_servers(ProxyB, ProxyA),
              yo_no_error
          catch
              What:Why ->
                  io:format(user, "What Why ~p ~p @\n\t~p\n",
                            [What, Why, erlang:get_stacktrace()]),
                  yo_error
          after
              (catch machi_proxy_flu1_client:quit(ProxyA)),
              (catch machi_proxy_flu1_client:quit(ProxyB))
          end,
    exit({todo, Res}).

todo_compare_servers(ProxyA, ProxyB) ->
    io:format(user, "TODO compare ~p ~p\n", [ProxyA, ProxyB]).
                              repairing=[Dst|_]=Repairing,
                              members_dict=MembersDict}}=_S_copy,
  Opts, ap_mode=_RepairMode) ->
    RepairId = proplists:get_value(repair_id, Opts, id1),
    error_logger:info_msg("Chain tail ~p of ~p starting repair ~p of ~p\n",
                          [MyName, UPI, RepairId, Repairing]),
    {ok, Info} = machi_chain_repair:repair_ap(MyName, Repairing, UPI,
                                              MembersDict, Opts),
    error_logger:info_msg("Chain tail ~p of ~p finished repair ~p: ~p\n",
                          [MyName, UPI, RepairId, Info]),
    timer:sleep(12345),
    exit({todo, Info}).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

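The new code above builds its repair options as a plain proplist and measures elapsed time in whole seconds. A minimal standalone sketch of those two idioms (the module name and example values are invented for illustration and are not part of this commit):

%% Hypothetical illustration only; Machi itself does not contain this module.
-module(repair_opts_demo).
-export([demo/0]).

demo() ->
    Start = os:timestamp(),
    RepairId = {my_flu, Start},
    RepairOpts = [{mode, repair}, verbose, {repair_id, RepairId}],
    %% proplists:get_value/3 returns the default (here 'id1') when the key is
    %% absent, which is how do_repair/3 above falls back if no repair_id is given.
    RepairId = proplists:get_value(repair_id, RepairOpts, id1),
    %% timer:now_diff/2 returns microseconds; 'div 1000000' yields whole seconds.
    ElapsedSecs = timer:now_diff(os:timestamp(), Start) div 1000000,
    {RepairId, ElapsedSecs}.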
356 src/machi_chain_repair.erl Normal file
@@ -0,0 +1,356 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc Erlang API for the Machi FLU TCP protocol version 1, with a
%% proxy-process style API for hiding messy details such as TCP
%% connection/disconnection with the remote Machi server.
%%
%% Machi is intentionally avoiding using distributed Erlang for
%% Machi's communication. This design decision makes Erlang-side code
%% more difficult & complex, but it's the price to pay for some
%% language independence. Later in Machi's life cycle, we need to
%% (re-)implement some components in a non-Erlang/BEAM-based language.
%%
%% This module implements a "man in the middle" proxy between the
%% Erlang client and Machi server (which is on the "far side" of a TCP
%% connection to somewhere). This proxy process will always execute
%% on the same Erlang node as the Erlang client that uses it. The
%% proxy is intended to be a stable, long-lived process that survives
%% TCP communication problems with the remote server.

-module(machi_chain_repair).

-include("machi_projection.hrl").

-define(SHORT_TIMEOUT, 5*1000).
-define(LONG_TIMEOUT, 60*1000).

%% These macros assume there's a bound variable called Verb.
-define(VERB(Fmt), if Verb -> io:format(Fmt, []); true -> ok end).
-define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end).

-export([repair_cp/4, repair_ap/5]).

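The two ?VERB macros above expand in place, so every call site must have a variable named Verb already bound. A small hypothetical usage sketch, not part of this file, assuming the same macro definitions:

%% Hypothetical standalone module for illustration only.
-module(verb_demo).
-export([report/1]).

-define(VERB(Fmt), if Verb -> io:format(Fmt, []); true -> ok end).
-define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end).

report(Verb) when is_boolean(Verb) ->
    %% Each ?VERB expands to an 'if Verb -> io:format(...); true -> ok end'.
    ?VERB("starting repair pass\n"),
    ?VERB("copied ~w chunks\n", [3]),
    ok.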
repair_cp(Src, Dst, MembersDict, Opts) ->
    %% TODO: add missing function: wipe away any trace of chunks that
    %% are present on Dst but missing on Src.
    exit(todo_cp_mode).

repair_ap(Src, Repairing, UPI, MembersDict, Opts) ->
    %% Use process dict so that 'after' clause can always quit all
    %% proxy pids.
    put(proxies_dict, orddict:new()),
    Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end,
    OurFLUs = lists:usort([Src] ++ Repairing ++ UPI),
    Res = try
              [begin
                   {ok, Proxy} = machi_proxy_flu1_client:start_link(P),
                   Add(FLU, Proxy)
               end || {FLU,P} <- MembersDict, lists:member(FLU, OurFLUs)],
              ProxiesDict = get(proxies_dict),

              D = dict:new(),
              D2 = lists:foldl(fun({FLU, Proxy}, Dict) ->
                                       append_file_dict(Proxy, FLU, Dict)
                               end, D, ProxiesDict),
              MissingFileSummary = make_missing_file_summary(D2, OurFLUs),
              io:format(user, "MissingFileSummary ~p\n", [MissingFileSummary]),

              %% Repair files from perspective of Src, i.e. tail(UPI).
              [ok = repair(ap_mode, File, Size, MissingList,
                           proplists:get_value(verbose, Opts, true),
                           Src, ProxiesDict) ||
                  {File, {Size, MissingList}} <- MissingFileSummary],
              {ok, [yo_no_error, todo_stats_here]}
          catch
              What:Why ->
                  io:format(user, "What Why ~p ~p @\n\t~p\n",
                            [What, Why, erlang:get_stacktrace()]),
                  {error, yo_error}
          after
              [(catch machi_proxy_flu1_client:quit(Pid)) ||
                  {_FLU, Pid} <- orddict:to_list(get(proxies_dict))]
          end,
    Res.

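repair_ap/5 stashes its proxy pids in the process dictionary so that the try ... after clause can always reach them for cleanup, even when the body throws. A stripped-down sketch of that pattern, with spawned stub processes standing in for the proxy clients and invented names (not part of this commit):

%% Hypothetical standalone module for illustration only.
-module(pd_cleanup_demo).
-export([run/1]).

run(Names) ->
    put(pids, orddict:new()),
    try
        [begin
             Pid = spawn(fun() -> receive stop -> ok end end),
             put(pids, orddict:store(Name, Pid, get(pids)))
         end || Name <- Names],
        {ok, orddict:size(get(pids))}
    after
        %% orddict:to_list/1 yields {Name, Pid} pairs, so match the pair here.
        [Pid ! stop || {_Name, Pid} <- orddict:to_list(get(pids))]
    end.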
make_missing_file_summary(Dict, AllFLUs) ->
    %% FileFilterFun = fun(_) -> true end,
    FoldRes = lists:sort(dict:to_list(Dict)),
    %% NOTE: MissingFileSummary = [{File, {FileSize, ServersMissingFrom}}]
    MissingFileSummary =
        [begin
             {GotIt, Sizes} = lists:unzip(GotSizes),
             Size = lists:max(Sizes),
             Missing = {File, {Size, AllFLUs -- GotIt}},
             Missing
         end || {File, GotSizes} <- FoldRes %% , FileFilterFun(File)
        ],
    MissingFileSummary.

append_file_dict(Proxy, FLU_name, D) ->
    {ok, Res} = machi_proxy_flu1_client:list_files(Proxy, ?DUMMY_PV1_EPOCH),
    lists:foldl(fun({Size, File}, Dict) ->
                        dict:append(File, {FLU_name, Size}, Dict)
                end, D, Res).

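To make the summary shape concrete: given the per-file {FLU, Size} entries collected by append_file_dict/3, the comprehension above produces one {File, {Size, ServersMissingFrom}} tuple per file. A hypothetical worked example with literal data in place of live list_files results:

%% Hypothetical standalone module for illustration only.
-module(summary_demo).
-export([demo/0]).

%% Returns [{<<"f1">>,{100,[c]}}, {<<"f2">>,{50,[b,c]}}].
demo() ->
    AllFLUs = [a, b, c],
    D0 = dict:append(<<"f1">>, {a, 100}, dict:new()),
    D1 = dict:append(<<"f1">>, {b, 100}, D0),
    D2 = dict:append(<<"f2">>, {a, 50}, D1),
    [begin
         {GotIt, Sizes} = lists:unzip(GotSizes),
         {File, {lists:max(Sizes), AllFLUs -- GotIt}}
     end || {File, GotSizes} <- lists:sort(dict:to_list(D2))].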
repair(Mode, File, Size, [], Verb, Src, ProxiesDict) ->
    ?VERB("~p: ~s: present on both: ", [Src, File]),
    ?VERB("TODO!\n"), ok;
    %%TODO: repair_both_present(File, Size, Mode, V, SrcS, SrcS2, DstS, DstS2);
repair(Mode, File, Size, MissingList, Verb, Src, ProxiesDict) ->
    case lists:member(Src, MissingList) of
        true ->
            ?VERB("~p: ~s -> ~p, skipping: not on source server\n",
                  [Src, File, MissingList]);
        false when Mode == check ->
            ?VERB("~p: ~s -> ~p, copy ~s MB (skipped)\n",
                  [Src, File, MissingList, mbytes(Size)]);
        false ->
            ?VERB("~p: ~s -> ~p, copy ~s MB, ",
                  [Src, File, MissingList, mbytes(Size)]),
            MissingProxiesDict =
                orddict:filter(fun(K, _V) -> lists:member(K, MissingList) end,
                               ProxiesDict),
            SrcProxy = orddict:fetch(Src, ProxiesDict),
            ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb),
            ?VERB("done\n", [])
    end.

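The orddict:filter/2 call above narrows the full proxy dictionary down to just the servers that are missing the file, so copy_file/4 writes to exactly those destinations. A tiny hypothetical example of that filtering step, with invented names:

%% Hypothetical standalone module for illustration only.
-module(filter_demo).
-export([demo/0]).

%% Returns [{b,pid_b},{c,pid_c}].
demo() ->
    ProxiesDict = orddict:from_list([{a, pid_a}, {b, pid_b}, {c, pid_c}]),
    MissingList = [b, c],
    orddict:filter(fun(K, _V) -> lists:member(K, MissingList) end, ProxiesDict).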
copy_file(File, SrcProxy, MissingProxiesDict, Verb) ->
    %% Use the first source socket to enumerate the chunks & checksums.
    %% Use the second source socket to copy each chunk.
    {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy,
                                                         ?SHORT_TIMEOUT),
    {ok, CheckSums} = machi_proxy_flu1_client:checksum_list(
                        SrcProxy, EpochID, File, ?LONG_TIMEOUT),
    CopyChunks =
        fun({Offset, Size, CSum}, {In_Cs, In_Bs, Out_Cs, Out_Bs}) ->
                if In_Cs rem 100 == 0 -> ?VERB(".", []);
                   true -> ok end,
                {ok, Chunk} = machi_proxy_flu1_client:read_chunk(
                                SrcProxy, EpochID, File, Offset, Size),
                case machi_util:checksum_chunk(Chunk) of
                    CSum_now when CSum_now == CSum ->
                        [begin
                             ok = machi_proxy_flu1_client:write_chunk(
                                    DstProxy, EpochID, File, Offset, Chunk)
                         end || {_FLU, DstProxy} <- MissingProxiesDict],
                        N = length(MissingProxiesDict),
                        {In_Cs + 1, In_Bs + Size, Out_Cs + N, Out_Bs + (N*Size)};
                    CSum_now ->
                        error_logger:error_msg(
                          "TODO: Checksum failure: "
                          "file ~p offset ~p size ~p: "
                          "expected ~p got ~p\n",
                          [File, Offset, Size, CSum, CSum_now]),
                        exit({todo_csum_error,
                              {File, Offset, Size, CSum, CSum_now}})
                end
        end,
    {In_Cs, _In_Bs, Out_Cs, _Out_Bs} = lists:foldl(CopyChunks,
                                                   {0,0,0,0}, CheckSums),
    ?VERB("copied ~w chunks to ~w replicas, ", [In_Cs, Out_Cs]),
    ok.

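copy_file/4 threads a {ChunksIn, BytesIn, ChunksOut, BytesOut} accumulator through the fold, where the "out" counters are multiplied by the number of destination replicas that receive each chunk. A hypothetical arithmetic-only sketch of that accounting, with made-up chunk sizes (not part of this commit):

%% Hypothetical standalone module for illustration only.
-module(count_demo).
-export([demo/0]).

%% Returns {3, 250, 6, 500}: 3 chunks, 250 bytes read, copied to 2 replicas each.
demo() ->
    Chunks = [{0, 100}, {100, 100}, {200, 50}],   % {Offset, Size} pairs
    NumReplicas = 2,
    lists:foldl(fun({_Offset, Size}, {InC, InB, OutC, OutB}) ->
                        {InC + 1, InB + Size,
                         OutC + NumReplicas, OutB + NumReplicas * Size}
                end, {0, 0, 0, 0}, Chunks).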
%% copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) ->
%%     fun(<<OffsetHex:16/binary, " ", LenHex:8/binary, " ",
%%           CSumHex:32/binary, "\n">>) ->
%%             <<Len:32/big>> = hexstr_to_bin(LenHex),
%%             DownloadChunkBin = <<OffsetHex/binary, " ", LenHex/binary, " ",
%%                                  File/binary, "\n">>,
%%             [Chunk] = escript_download_chunks(SrcS, {{{DownloadChunkBin}}},
%%                                               fun(_) -> ok end),
%%             CSum = hexstr_to_bin(CSumHex),
%%             CSum2 = checksum(Chunk),
%%             if Len == byte_size(Chunk), CSum == CSum2 ->
%%                     {_,_,_} = upload_chunk_write(DstS, OffsetHex, File, Chunk),
%%                     ok;
%%                true ->
%%                     io:format("ERROR: ~s ~s ~s csum/size error\n",
%%                               [File, OffsetHex, LenHex]),
%%                     error
%%             end;
%%        (_Else) ->
%%             ok
%%     end.

repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) ->
    verb("repair_both_present TODO\n"),
    ok.
    %% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, Mode]).

%% repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) ->
%%     Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))),
%%     Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))),
%%     J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))),
%%     J_SrcOnly = lists:flatten(io_lib:format("/tmp/join.4-src-only.~w.~w.~w", tuple_to_list(now()))),
%%     J_DstOnly = lists:flatten(io_lib:format("/tmp/join.5-dst-only.~w.~w.~w", tuple_to_list(now()))),
%%     S_Identical = lists:flatten(io_lib:format("/tmp/join.6-sort-identical.~w.~w.~w", tuple_to_list(now()))),
%%     {ok, FH1} = file:open(Tmp1, [write, raw, binary]),
%%     {ok, FH2} = file:open(Tmp2, [write, raw, binary]),
%%     try
%%         K = md5_ctx,
%%         MD5_it = fun(Bin) ->
%%                          {FH, MD5ctx1} = get(K),
%%                          file:write(FH, Bin),
%%                          MD5ctx2 = crypto:hash_update(MD5ctx1, Bin),
%%                          put(K, {FH, MD5ctx2})
%%                  end,
%%         put(K, {FH1, crypto:hash_init(md5)}),
%%         ok = escript_checksum_list(SrcS, File, fast, MD5_it),
%%         {_, MD5_1} = get(K),
%%         SrcMD5 = crypto:hash_final(MD5_1),
%%         put(K, {FH2, crypto:hash_init(md5)}),
%%         ok = escript_checksum_list(DstS, File, fast, MD5_it),
%%         {_, MD5_2} = get(K),
%%         DstMD5 = crypto:hash_final(MD5_2),
%%         if SrcMD5 == DstMD5 ->
%%                 verb("identical\n", []);
%%            true ->
%%                 ok = file:close(FH1),
%%                 ok = file:close(FH2),
%%                 _Q1 = os:cmd("./REPAIR-SORT-JOIN.sh " ++ Tmp1 ++ " " ++ Tmp2 ++ " " ++ J_Both ++ " " ++ J_SrcOnly ++ " " ++ J_DstOnly ++ " " ++ S_Identical),
%%                 case file:read_file_info(S_Identical) of
%%                     {ok, _} ->
%%                         verb("identical (secondary sort)\n", []);
%%                     {error, enoent} ->
%%                         io:format("differences found:"),
%%                         repair_both(File, Size, V, Mode,
%%                                     J_Both, J_SrcOnly, J_DstOnly,
%%                                     SrcS, DstS)
%%                 end
%%         end
%%     after
%%         catch file:close(FH1),
%%         catch file:close(FH2),
%%         [(catch file:delete(FF)) || FF <- [Tmp1,Tmp2,J_Both,J_SrcOnly,J_DstOnly,
%%                                            S_Identical]]
%%     end.

%% repair_both(File, _Size, V, Mode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) ->
%%     AccFun = if Mode == check ->
%%                      fun(_X, List) -> List end;
%%                 Mode == repair ->
%%                      fun( X, List) -> [X|List] end
%%              end,
%%     BothFun = fun(<<_OffsetSrcHex:16/binary, " ",
%%                     LenSrcHex:8/binary, " ", CSumSrcHex:32/binary, " ",
%%                     LenDstHex:8/binary, " ", CSumDstHex:32/binary, "\n">> =Line,
%%                   {SameB, SameC, DiffB, DiffC, Ds}) ->
%%                       <<Len:32/big>> = hexstr_to_bin(LenSrcHex),
%%                       if LenSrcHex == LenDstHex,
%%                          CSumSrcHex == CSumDstHex ->
%%                              {SameB + Len, SameC + 1, DiffB, DiffC, Ds};
%%                          true ->
%%                              %% D = {OffsetSrcHex, LenSrcHex, ........
%%                              {SameB, SameC, DiffB + Len, DiffC + 1,
%%                               AccFun(Line, Ds)}
%%                       end;
%%                  (_Else, Acc) ->
%%                       Acc
%%               end,
%%     OnlyFun = fun(<<_OffsetSrcHex:16/binary, " ", LenSrcHex:8/binary, " ",
%%                     _CSumHex:32/binary, "\n">> = Line,
%%                   {DiffB, DiffC, Ds}) ->
%%                       <<Len:32/big>> = hexstr_to_bin(LenSrcHex),
%%                       {DiffB + Len, DiffC + 1, AccFun(Line, Ds)};
%%                  (_Else, Acc) ->
%%                       Acc
%%               end,
%%     {SameBx, SameCx, DiffBy, DiffCy, BothDiffs} =
%%         file_folder(BothFun, {0,0,0,0,[]}, J_Both),
%%     {DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly),
%%     {DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly),
%%     if Mode == check orelse V == true ->
%%             io:format("\n\t"),
%%             io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]),
%%             io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]),
%%             io:format("DstR ~p", [{DiffB_dst, DiffC_dst}]),
%%             io:format("\n");
%%        true -> ok
%%     end,
%%     if Mode == repair ->
%%             ok = repair_both_both(File, V, BothDiffs, SrcS, DstS),
%%             ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src,
%%                                     SrcS, DstS),
%%             ok = repair_copy_chunks(File, V, Ds_dst, DiffB_dst, DiffC_dst,
%%                                     DstS, SrcS);
%%        true ->
%%             ok
%%     end.

%% repair_both_both(_File, _V, [_|_], _SrcS, _DstS) ->
%%     %% TODO: fetch both, check checksums, hopefully only exactly one
%%     %% is correct, then use that one to repair the other. And if the
%%     %% sizes are different, hrm, there may be an extra corner case(s)
%%     %% hiding there.
%%     io:format("WHOA! We have differing checksums or sizes here, TODO not implemented, but there's trouble in the little village on the river....\n"),
%%     timer:sleep(3*1000),
%%     ok;
%% repair_both_both(_File, _V, [], _SrcS, _DstS) ->
%%     ok.

%% repair_copy_chunks(_File, _V, [], _DiffBytes, _DiffCount, _SrcS, _DstS) ->
%%     ok;
%% repair_copy_chunks(File, V, ToBeCopied, DiffBytes, DiffCount, SrcS, DstS) ->
%%     verb("\n", []),
%%     verb("Starting copy of ~p chunks/~s MBytes to \n    ~s: ",
%%          [DiffCount, mbytes(DiffBytes), File]),
%%     InnerCopyFun = copy_file_proc_checksum_fun(File, SrcS, DstS, V),
%%     FoldFun = fun(Line, ok) ->
%%                       ok = InnerCopyFun(Line) % Strong sanity check
%%               end,
%%     ok = lists:foldl(FoldFun, ok, ToBeCopied),
%%     verb(" done\n", []),
%%     ok.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

file_folder(Fun, Acc, Path) ->
    {ok, FH} = file:open(Path, [read, raw, binary]),
    try
        file_folder2(Fun, Acc, FH)
    after
        file:close(FH)
    end.

file_folder2(Fun, Acc, FH) ->
    file_folder2(file:read_line(FH), Fun, Acc, FH).

file_folder2({ok, Line}, Fun, Acc, FH) ->
    Acc2 = Fun(Line, Acc),
    file_folder2(Fun, Acc2, FH);
file_folder2(eof, _Fun, Acc, _FH) ->
    Acc.

verb(Fmt) ->
    verb(Fmt, []).

verb(Fmt, Args) ->
    case {ok, true} of % application:get_env(kernel, verbose) of
        {ok, true} -> io:format(Fmt, Args);
        _ -> ok
    end.

mbytes(0) ->
    "0.0";
mbytes(Size) ->
    lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])).

@@ -559,7 +559,11 @@ checksum_list2(Sock, EpochID, File) ->
            {ok, <<"ERROR WEDGED", _/binary>>} ->
                {error, wedged};
            {ok, Else} ->
                throw({server_protocol_error, Else})
                throw({server_protocol_error, Else});
            {error, closed} ->
                throw({error, closed});
            Else ->
                throw(Else)
        end
    catch
        throw:Error ->
@@ -725,16 +729,20 @@ do_projection_common(Sock, ProjCmd) ->
        Cmd = [<<"PROJ ">>, LenHex, <<"\n">>],
        ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]),
        ok = inet:setopts(Sock, [{packet, line}]),
        {ok, Line} = gen_tcp:recv(Sock, 0),
        case Line of
            <<"OK ", ResLenHex:8/binary, "\n">> ->
                ResLen = machi_util:hexstr_to_int(ResLenHex),
                ok = inet:setopts(Sock, [{packet, raw}]),
                {ok, ResBin} = gen_tcp:recv(Sock, ResLen),
                ok = inet:setopts(Sock, [{packet, line}]),
                binary_to_term(ResBin);
            Else ->
                {error, Else}
        case gen_tcp:recv(Sock, 0) of
            {ok, Line} ->
                case Line of
                    <<"OK ", ResLenHex:8/binary, "\n">> ->
                        ResLen = machi_util:hexstr_to_int(ResLenHex),
                        ok = inet:setopts(Sock, [{packet, raw}]),
                        {ok, ResBin} = gen_tcp:recv(Sock, ResLen),
                        ok = inet:setopts(Sock, [{packet, line}]),
                        binary_to_term(ResBin);
                    Else ->
                        {error, Else}
                end;
            {error, _} = Bad ->
                throw(Bad)
        end
    catch
        throw:Error ->
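The change above wraps the response read in an outer case so that a closed socket surfaces as a thrown error tuple instead of a badmatch on {ok, Line}. A condensed, hypothetical restatement of the new receive path (the function name is invented; the calls are the same ones used in the diff):

%% Hypothetical helper illustrating the framing: an "OK <hexlen>\n" line read
%% in {packet, line} mode, followed by a raw term_to_binary payload.
recv_framed_term(Sock) ->
    ok = inet:setopts(Sock, [{packet, line}]),
    case gen_tcp:recv(Sock, 0) of
        {ok, <<"OK ", ResLenHex:8/binary, "\n">>} ->
            ResLen = machi_util:hexstr_to_int(ResLenHex),
            ok = inet:setopts(Sock, [{packet, raw}]),
            {ok, ResBin} = gen_tcp:recv(Sock, ResLen),
            binary_to_term(ResBin);
        {ok, Else} ->
            {error, Else};
        {error, _} = Bad ->
            throw(Bad)
    end.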
@@ -57,6 +57,7 @@
          wedge_status/1, wedge_status/2,

          %% %% Projection API
          get_epoch_id/1, get_epoch_id/2,
          get_latest_epoch/2, get_latest_epoch/3,
          read_latest_projection/2, read_latest_projection/3,
          read_projection/3, read_projection/4,
@@ -65,7 +66,10 @@
          list_all_projections/2, list_all_projections/3,

          %% Common API
          quit/1
          quit/1,

          %% Internal API
          write_chunk/5, write_chunk/6
         ]).

%% gen_server callbacks
@@ -143,6 +147,16 @@ wedge_status(PidSpec, Timeout) ->
    gen_server:call(PidSpec, {req, {wedge_status}},
                    Timeout).

%% @doc Get the `epoch_id()' of the FLU's current/latest projection.

get_epoch_id(PidSpec) ->
    get_epoch_id(PidSpec, infinity).

%% @doc Get the `epoch_id()' of the FLU's current/latest projection.

get_epoch_id(PidSpec, Timeout) ->
    gen_server:call(PidSpec, {req, {get_epoch_id}}, Timeout).

%% @doc Get the latest epoch number + checksum from the FLU's projection store.

get_latest_epoch(PidSpec, ProjType) ->
@@ -215,6 +229,19 @@ list_all_projections(PidSpec, ProjType, Timeout) ->
quit(PidSpec) ->
    gen_server:call(PidSpec, quit, infinity).

%% @doc Write a chunk (binary- or iolist-style) of data to
%% `File' at `Offset'.

write_chunk(PidSpec, EpochID, File, Offset, Chunk) ->
    write_chunk(PidSpec, EpochID, File, Offset, Chunk, infinity).

%% @doc Write a chunk (binary- or iolist-style) of data to
%% `File' at `Offset'.

write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
    gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}},
                    Timeout).

%%%%%%%%%%%%%%%%%%%%%%%%%%%

init([I]) ->
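With write_chunk/5,6 now exported, a repair process can read a chunk through one proxy and write it through another, which is how machi_chain_repair:copy_file/4 uses this API. A hypothetical one-chunk sketch (the helper name and arguments are invented for illustration):

%% Hypothetical helper; assumes SrcProxy and DstProxy are proxy pids from
%% machi_proxy_flu1_client:start_link/1 and EpochID matches the current epoch.
copy_one_chunk(SrcProxy, DstProxy, EpochID, File, Offset, Size) ->
    {ok, Chunk} = machi_proxy_flu1_client:read_chunk(SrcProxy, EpochID,
                                                     File, Offset, Size),
    ok = machi_proxy_flu1_client:write_chunk(DstProxy, EpochID,
                                             File, Offset, Chunk).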
@@ -269,12 +296,25 @@ make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:list_files(Sock, EpochID) end;
make_req_fun({wedge_status}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:wedge_status(Sock) end;
make_req_fun({get_epoch_id}, #state{sock=Sock}) ->
    fun() -> case ?FLU_C:read_latest_projection(Sock, private) of
                 {ok, P} ->
                     #projection_v1{epoch_number=Epoch,
                                    epoch_csum=CSum} =
                         machi_chain_manager1:inner_projection_or_self(P),
                     {ok, {Epoch, CSum}};
                 Error ->
                     Error
             end
    end;
make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) ->
    fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) ->