Client API timeout and other minor bugfixes
This commit is contained in:
parent
24f8cb21a2
commit
57ba204210
2 changed files with 15 additions and 11 deletions
|
@ -97,7 +97,7 @@ run(append, KeyGen, ValueGen, #m{conn=Conn}=S) ->
|
||||||
Prefix = KeyGen(),
|
Prefix = KeyGen(),
|
||||||
Value = ValueGen(),
|
Value = ValueGen(),
|
||||||
CSum = machi_util:make_client_csum(Value),
|
CSum = machi_util:make_client_csum(Value),
|
||||||
AppendOpts = {append_opts,0,undefined,false},
|
AppendOpts = {append_opts,0,undefined,false}, % HACK FIXME
|
||||||
case machi_cr_client:append_chunk(Conn, undefined, Prefix, Value, CSum, AppendOpts, ?THE_TIMEOUT) of
|
case machi_cr_client:append_chunk(Conn, undefined, Prefix, Value, CSum, AppendOpts, ?THE_TIMEOUT) of
|
||||||
{ok, Pos} ->
|
{ok, Pos} ->
|
||||||
EtsKey = ets:update_counter(?ETS_TAB, max_key, 1),
|
EtsKey = ets:update_counter(?ETS_TAB, max_key, 1),
|
||||||
|
@ -115,7 +115,8 @@ run(read, KeyGen, _ValueGen, #m{conn=Conn, max_key=MaxKey}=S) ->
|
||||||
Idx = KeyGen() rem MaxKey,
|
Idx = KeyGen() rem MaxKey,
|
||||||
%% {File, Offset, Size, _CSum} = ets:lookup_element(?ETS_TAB, Idx, 2),
|
%% {File, Offset, Size, _CSum} = ets:lookup_element(?ETS_TAB, Idx, 2),
|
||||||
{File, Offset, Size} = ets:lookup_element(?ETS_TAB, Idx, 2),
|
{File, Offset, Size} = ets:lookup_element(?ETS_TAB, Idx, 2),
|
||||||
case machi_cr_client:read_chunk(Conn, undefined, File, Offset, Size, undefined, ?THE_TIMEOUT) of
|
ReadOpts = {read_opts,false,false,false}, % HACK FIXME
|
||||||
|
case machi_cr_client:read_chunk(Conn, undefined, File, Offset, Size, ReadOpts, ?THE_TIMEOUT) of
|
||||||
{ok, _Chunk} ->
|
{ok, _Chunk} ->
|
||||||
{ok, S};
|
{ok, S};
|
||||||
{error, _}=Err ->
|
{error, _}=Err ->
|
||||||
|
@ -138,19 +139,22 @@ find_server_info(_Id) ->
|
||||||
load_ets_table(Conn, ETS) ->
|
load_ets_table(Conn, ETS) ->
|
||||||
{ok, Fs} = machi_cr_client:list_files(Conn),
|
{ok, Fs} = machi_cr_client:list_files(Conn),
|
||||||
[begin
|
[begin
|
||||||
{ok, InfoBin} = machi_cr_client:checksum_list(Conn, File),
|
{ok, InfoBin} = machi_cr_client:checksum_list(Conn, File, ?THE_TIMEOUT),
|
||||||
PosList = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
PosList = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||||
?INFO("File ~s len PosList ~p\n", [File, length(PosList)]),
|
?INFO("File ~s len PosList ~p\n", [File, length(PosList)]),
|
||||||
StartKey = ets:update_counter(ETS, max_key, 0),
|
StartKey = ets:update_counter(ETS, max_key, 0),
|
||||||
{_, C, Bytes} = lists:foldl(fun({_Off,0,_CSum}, {_K, _C, _Bs}=Acc) ->
|
{_, C, Bytes} = lists:foldl(fun({_Off,0,_CSum}, {_K, _C, _Bs}=Acc) ->
|
||||||
Acc;
|
Acc;
|
||||||
|
({0,_Sz,_CSum}, {_K, _C, _Bs}=Acc) ->
|
||||||
|
Acc;
|
||||||
({Off,Sz,_CSum}, {K, C, Bs}) ->
|
({Off,Sz,_CSum}, {K, C, Bs}) ->
|
||||||
V = {File, Off, Sz},
|
V = {File, Off, Sz},
|
||||||
ets:insert(ETS, {K, V}),
|
ets:insert(ETS, {K, V}),
|
||||||
{K + 1, C + 1, Bs + Sz}
|
{K + 1, C + 1, Bs + Sz}
|
||||||
end, {StartKey, 0, 0}, PosList),
|
end, {StartKey, 0, 0}, PosList),
|
||||||
ets:update_counter(ETS, max_key, C),
|
_ = ets:update_counter(ETS, max_key, C),
|
||||||
ets:update_counter(ETS, total_bytes, Bytes)
|
_ = ets:update_counter(ETS, total_bytes, Bytes),
|
||||||
|
ok
|
||||||
end || {_Size, File} <- Fs],
|
end || {_Size, File} <- Fs],
|
||||||
ets:update_counter(?ETS_TAB, max_key, 0).
|
ets:update_counter(?ETS_TAB, max_key, 0).
|
||||||
|
|
||||||
|
|
|
@ -78,8 +78,8 @@
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-define(FLU_PC, machi_proxy_flu1_client).
|
-define(FLU_PC, machi_proxy_flu1_client).
|
||||||
-define(TIMEOUT, 2*1000).
|
-define(TIMEOUT, 10*1000).
|
||||||
-define(DEFAULT_TIMEOUT, 10*1000).
|
-define(DEFAULT_TIMEOUT, ?TIMEOUT*5).
|
||||||
-define(MAX_RUNTIME, 8*1000).
|
-define(MAX_RUNTIME, 8*1000).
|
||||||
-define(WORST_PROJ, #projection_v1{epoch_number=0,epoch_csum= <<>>,
|
-define(WORST_PROJ, #projection_v1{epoch_number=0,epoch_csum= <<>>,
|
||||||
members_dict=[]}).
|
members_dict=[]}).
|
||||||
|
@ -506,7 +506,7 @@ do_read_chunk2(NSInfo, File, Offset, Size, Opts, Depth, STime, TO,
|
||||||
Tail = lists:last(UPI),
|
Tail = lists:last(UPI),
|
||||||
ConsistencyMode = P#projection_v1.mode,
|
ConsistencyMode = P#projection_v1.mode,
|
||||||
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), NSInfo, EpochID,
|
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), NSInfo, EpochID,
|
||||||
File, Offset, Size, Opts, ?TIMEOUT) of
|
File, Offset, Size, Opts, TO) of
|
||||||
{ok, {Chunks, Trimmed}} when is_list(Chunks), is_list(Trimmed) ->
|
{ok, {Chunks, Trimmed}} when is_list(Chunks), is_list(Trimmed) ->
|
||||||
%% After partition heal, there could happen that heads may
|
%% After partition heal, there could happen that heads may
|
||||||
%% have chunk trimmed but tails may have chunk written -
|
%% have chunk trimmed but tails may have chunk written -
|
||||||
|
@ -690,7 +690,7 @@ read_repair2(cp_mode=ConsistencyMode,
|
||||||
%% TODO WTF was I thinking here??....
|
%% TODO WTF was I thinking here??....
|
||||||
Tail = lists:last(readonly_flus(P)),
|
Tail = lists:last(readonly_flus(P)),
|
||||||
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), NSInfo, EpochID,
|
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), NSInfo, EpochID,
|
||||||
File, Offset, Size, undefined, ?TIMEOUT) of
|
File, Offset, Size, undefined, ?DEFAULT_TIMEOUT) of
|
||||||
{ok, Chunks} when is_list(Chunks) ->
|
{ok, Chunks} when is_list(Chunks) ->
|
||||||
%% TODO: change to {Chunks, Trimmed} and have them repaired
|
%% TODO: change to {Chunks, Trimmed} and have them repaired
|
||||||
ToRepair = mutation_flus(P) -- [Tail],
|
ToRepair = mutation_flus(P) -- [Tail],
|
||||||
|
@ -840,7 +840,7 @@ do_checksum_list(File, Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
do_checksum_list2(File, Depth, STime, TO,
|
do_checksum_list2(File, Depth, STime, TO,
|
||||||
#state{proj=P, proxies_dict=PD}=S) ->
|
#state{proj=P, proxies_dict=PD}=S) ->
|
||||||
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
||||||
case ?FLU_PC:checksum_list(Proxy, File, ?TIMEOUT) of
|
case ?FLU_PC:checksum_list(Proxy, File, TO) of
|
||||||
{ok, _}=OK ->
|
{ok, _}=OK ->
|
||||||
{reply, OK, S};
|
{reply, OK, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
|
@ -875,7 +875,7 @@ do_list_files(Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
do_list_files2(Depth, STime, TO,
|
do_list_files2(Depth, STime, TO,
|
||||||
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
||||||
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
||||||
case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of
|
case ?FLU_PC:list_files(Proxy, EpochID, ?DEFAULT_TIMEOUT) of
|
||||||
{ok, _}=OK ->
|
{ok, _}=OK ->
|
||||||
{reply, OK, S};
|
{reply, OK, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
|
|
Loading…
Reference in a new issue