WIP: refactoring machi_cr_client:append_chunk*
This commit is contained in:
parent
a7f53cf21a
commit
966d2edae8
1 changed files with 142 additions and 33 deletions
|
@ -66,6 +66,7 @@
|
||||||
|
|
||||||
-define(FLU_PC, machi_proxy_flu1_client).
|
-define(FLU_PC, machi_proxy_flu1_client).
|
||||||
-define(TIMEOUT, 2*1000).
|
-define(TIMEOUT, 2*1000).
|
||||||
|
-define(MAX_RUNTIME, 5*1000).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
members_dict :: p_srvr_dict(),
|
members_dict :: p_srvr_dict(),
|
||||||
|
@ -279,43 +280,134 @@ code_change(_OldVsn, S, _Extra) ->
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
|
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
|
||||||
do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S).
|
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S).
|
||||||
|
|
||||||
do_append_chunk_extra1(Prefix, Chunk, ChunkExtra,
|
do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) ->
|
||||||
#state{epoch_id=EpochID, proj=P,
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S);
|
||||||
proxies_dict=PD}=S) ->
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
|
||||||
#projection_v1{upi=[HeadFLU|RestFLUs]} = P,
|
io:format(user, "append sleep1,", []),
|
||||||
case ?FLU_PC:append_chunk_extra(orddict:fetch(HeadFLU, PD),
|
sleep_a_while(Depth),
|
||||||
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
|
if DiffMs > ?MAX_RUNTIME ->
|
||||||
|
{error, partition};
|
||||||
|
true ->
|
||||||
|
%% This is suboptimal for performance: there are some paths
|
||||||
|
%% through this point where our current projection is good
|
||||||
|
%% enough. But we're going to try to keep the code as simple
|
||||||
|
%% as we can for now.
|
||||||
|
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
||||||
|
case S2#state.proj of
|
||||||
|
P2 when P2 == undefined orelse
|
||||||
|
P2#projection_v1.upi == [] ->
|
||||||
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1,
|
||||||
|
STime, S);
|
||||||
|
_ ->
|
||||||
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1,
|
||||||
|
STime, S)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime,
|
||||||
|
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
||||||
|
[HeadFLU|RestFLUs] = mutation_flus(P),
|
||||||
|
Proxy = orddict:fetch(HeadFLU, PD),
|
||||||
|
io:format(user, "append ~w,", [Proxy]),
|
||||||
|
case ?FLU_PC:append_chunk_extra(Proxy,
|
||||||
EpochID, Prefix, Chunk, ChunkExtra,
|
EpochID, Prefix, Chunk, ChunkExtra,
|
||||||
?TIMEOUT) of
|
?TIMEOUT) of
|
||||||
{ok, {Offset, _Size, File}=_X} ->
|
{ok, {Offset, _Size, File}=_X} ->
|
||||||
do_append_chunk_extra2(RestFLUs, File, Offset, Chunk,
|
%% io:format(user, "append ~w@~p,~w,", [HeadFLU, File, Offset]),
|
||||||
HeadFLU, 1, S);
|
io:format(user, "append ~w,", [HeadFLU]),
|
||||||
{error, Change} when Change == bad_epoch; Change == wedged ->
|
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
[HeadFLU], 0, STime, S);
|
||||||
do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S2)
|
{error, Retry}
|
||||||
%% TODO return values here
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
|
||||||
|
{error, written} ->
|
||||||
|
%% Implicit sequencing + this error = we don't know where this
|
||||||
|
%% written block is. But we lost a race. Repeat, with a new
|
||||||
|
%% sequencer assignment.
|
||||||
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
|
||||||
|
{error, not_written} ->
|
||||||
|
exit({todo_should_never_happen,?MODULE,?LINE,
|
||||||
|
Prefix,iolist_size(Chunk)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_append_chunk_extra2([], File, Offset, Chunk, _OldHeadFLU, _OkCount, S) ->
|
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
{reply, {ok, {Offset, size(Chunk), File}}, S};
|
Ws, Depth, STime, S)
|
||||||
do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount,
|
when RestFLUs == [] orelse Depth == 0 ->
|
||||||
#state{epoch_id=EpochID, proj=P,
|
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
proxies_dict=PD}=S) ->
|
Ws, Depth + 1, STime, S);
|
||||||
case ?FLU_PC:write_chunk(orddict:fetch(FLU, PD),
|
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
EpochID, File, Offset, Chunk, ?TIMEOUT) of
|
Ws, Depth, STime, #state{proj=P}=S) ->
|
||||||
ok ->
|
io:format(user, "append sleep2,", []),
|
||||||
do_append_chunk_extra2(RestFLUs, File, Offset, Chunk,
|
sleep_a_while(Depth),
|
||||||
OldHeadFLU, OkCount + 1, S);
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
{error, Change} when Change == bad_epoch; Change == wedged ->
|
if DiffMs > ?MAX_RUNTIME ->
|
||||||
|
{error, partition};
|
||||||
|
true ->
|
||||||
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
||||||
#projection_v1{upi=[NewHeadFLU|NewRestFLUs]} = S2#state.proj,
|
case S2#state.proj of
|
||||||
if OkCount == 1, NewHeadFLU == OldHeadFLU ->
|
undefined ->
|
||||||
do_append_chunk_extra2(NewRestFLUs, File, Offset, Chunk,
|
{error, partition};
|
||||||
OldHeadFLU, OkCount, S2);
|
P2 ->
|
||||||
true ->
|
RestFLUs2 = mutation_flus(P2),
|
||||||
{error, partition}
|
case RestFLUs2 -- Ws of
|
||||||
|
RestFLUs ->
|
||||||
|
%% None of the writes that we have done so far
|
||||||
|
%% are to FLUs that are in the RestFLUs2 list.
|
||||||
|
%% We are pessimistic here and assume that
|
||||||
|
%% those FLUs are permanently dead. Start
|
||||||
|
%% over with a new sequencer assignment, at
|
||||||
|
%% the 2nd have of the impl (we have already
|
||||||
|
%% slept & refreshed the projection).
|
||||||
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth,
|
||||||
|
STime, S);
|
||||||
|
RestFLUs3 ->
|
||||||
|
do_append_midtail2(RestFLUs3, Prefix, File, Offset,
|
||||||
|
Chunk, ChunkExtra,
|
||||||
|
Ws, Depth + 1, STime, S)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_append_midtail2([], _Prefix, File, Offset, Chunk,
|
||||||
|
_ChunkExtra, _Ws, _Depth, _STime, S) ->
|
||||||
|
io:format(user, "ok!\n", []),
|
||||||
|
{reply, {ok, {Offset, iolist_size(Chunk), File}}, S};
|
||||||
|
do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
|
||||||
|
ChunkExtra, Ws, Depth, STime,
|
||||||
|
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
|
||||||
|
Proxy = orddict:fetch(FLU, PD),
|
||||||
|
io:format(user, "write ~w,", [Proxy]),
|
||||||
|
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
|
||||||
|
ok ->
|
||||||
|
%% io:format(user, "write ~w@~p~w,", [FLU, File, Offset]),
|
||||||
|
io:format(user, "write ~w,", [FLU]),
|
||||||
|
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
|
||||||
|
ChunkExtra, [FLU|Ws], Depth, STime, S);
|
||||||
|
{error, Retry}
|
||||||
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
|
do_append_midtail(FLUs, Prefix, File, Offset, Chunk,
|
||||||
|
ChunkExtra, Ws, Depth, STime, S);
|
||||||
|
{error, written} ->
|
||||||
|
exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)})
|
||||||
|
%% read_repair(P#projection_v1.upi ++ P#projection_v1.repairing,
|
||||||
|
%% Chunk, Depth, STime, S)
|
||||||
|
%% Chunk1 = if is_binary(Chunk) -> Chunk;
|
||||||
|
%% is_list(Chunk) -> list_to_binary(Chunk)
|
||||||
|
%% end,
|
||||||
|
%% case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset,
|
||||||
|
%% size(Chunk1), ?TIMEOUT) of
|
||||||
|
%% {ok, Chunk2} when Chunk2 == Chunk1 ->
|
||||||
|
%% %% Someone has been read-repairing this chunk.
|
||||||
|
%% %% Keep going.
|
||||||
|
%% do_append_chunk_extra2(RestFLUs, File, Offset, Chunk,
|
||||||
|
%% OldHeadFLU, OkCount + 1, S);
|
||||||
|
%% {error, not_written} ->
|
||||||
|
%% exit({todo_should_never_happen,?MODULE,?LINE,
|
||||||
|
%% File,Offset, size(Chunk1)});
|
||||||
|
%% { ->
|
||||||
%% TODO return values here
|
%% TODO return values here
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -328,7 +420,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
|
||||||
Timeout = 2*1000,
|
Timeout = 2*1000,
|
||||||
Parent = self(),
|
Parent = self(),
|
||||||
Proxies = orddict:to_list(ProxiesDict),
|
Proxies = orddict:to_list(ProxiesDict),
|
||||||
Worker = spawn(
|
MiddleWorker = spawn(
|
||||||
fun() ->
|
fun() ->
|
||||||
PidsMons =
|
PidsMons =
|
||||||
[spawn_monitor(fun() ->
|
[spawn_monitor(fun() ->
|
||||||
|
@ -339,7 +431,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
|
||||||
Parent ! {res, self(), Rs},
|
Parent ! {res, self(), Rs},
|
||||||
exit(normal)
|
exit(normal)
|
||||||
end),
|
end),
|
||||||
Rs = receive {res, Worker, Results} -> Results
|
Rs = receive {res, MiddleWorker, Results} -> Results
|
||||||
after Timeout*2 -> []
|
after Timeout*2 -> []
|
||||||
end,
|
end,
|
||||||
%% TODO: There's a possible bug here when running multiple independent
|
%% TODO: There's a possible bug here when running multiple independent
|
||||||
|
@ -352,14 +444,14 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
|
||||||
P when P >= BadProj ->
|
P when P >= BadProj ->
|
||||||
#projection_v1{epoch_number=Epoch, epoch_csum=CSum,
|
#projection_v1{epoch_number=Epoch, epoch_csum=CSum,
|
||||||
members_dict=NewMembersDict} = P,
|
members_dict=NewMembersDict} = P,
|
||||||
io:format(user, "~s: proj ~P\n", [?MODULE, P, 10]),
|
io:format(user, "~s: proj ~P\n", [?MODULE, P, 15]),
|
||||||
EpochID = {Epoch, CSum},
|
EpochID = {Epoch, CSum},
|
||||||
?FLU_PC:stop_proxies(ProxiesDict),
|
?FLU_PC:stop_proxies(ProxiesDict),
|
||||||
NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict),
|
NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict),
|
||||||
S#state{bad_proj=undefined, proj=P, epoch_id=EpochID,
|
S#state{bad_proj=undefined, proj=P, epoch_id=EpochID,
|
||||||
members_dict=NewMembersDict, proxies_dict=NewProxiesDict};
|
members_dict=NewMembersDict, proxies_dict=NewProxiesDict};
|
||||||
_ ->
|
_ ->
|
||||||
timer:sleep(10 + (Count * 20)),
|
sleep_a_while(Count),
|
||||||
update_proj2(Count + 1, S)
|
update_proj2(Count + 1, S)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -382,5 +474,22 @@ choose_best_proj(Rs) ->
|
||||||
BestEpoch
|
BestEpoch
|
||||||
end, WorstEpoch, Rs).
|
end, WorstEpoch, Rs).
|
||||||
|
|
||||||
|
mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) ->
|
||||||
|
UPI ++ Repairing;
|
||||||
|
mutation_flus(#state{proj=P}) ->
|
||||||
|
mutation_flus(P).
|
||||||
|
|
||||||
|
readonly_flus(#projection_v1{upi=UPI}) ->
|
||||||
|
UPI;
|
||||||
|
readonly_flus(#state{proj=P}) ->
|
||||||
|
readonly_flus(P).
|
||||||
|
|
||||||
|
sleep_a_while(0) ->
|
||||||
|
ok;
|
||||||
|
sleep_a_while(1) ->
|
||||||
|
ok;
|
||||||
|
sleep_a_while(Depth) ->
|
||||||
|
timer:sleep(30 + trunc(math:pow(1.9, Depth))).
|
||||||
|
|
||||||
noop() ->
|
noop() ->
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Reference in a new issue