Merge branch 'slf/perf-improvements1' into tmp/merge-delme

Scott Lystig Fritchie 2016-03-29 18:40:14 +09:00
commit e87bd59a97
8 changed files with 237 additions and 89 deletions

File: machi_basho_bench_driver.erl

@@ -1,6 +1,6 @@
 %% -------------------------------------------------------------------
 %%
-%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved.
 %%
 %% This file is provided to you under the Apache License,
 %% Version 2.0 (the "License"); you may not use this file
@@ -43,23 +43,25 @@
 %% could add new entries to this ETS table.
 %%
 %% Now we can use various integer-centric key generators that are
-%% already bundled with basho_bench.
+%% already bundled with basho_bench. NOTE: this scheme does not allow
+%% mixing of 'append' and 'read' operations in the same config. Basho
+%% Bench does not support different key generators for different
+%% operations, unfortunately. The work-around is to run two different
+%% Basho Bench instances: one for 'append' ops with a key generator for
+%% the desired prefix(es), and the other for 'read' ops with an
+%% integer key generator.
 %%
-%% TODO: Add CRC checking, when feasible and when supported on the
-%% server side.
-%%
-%% TODO: As an alternate idea, if we know that the chunks written are
-%% always the same size, and if we don't care about CRC checking, then
-%% all we need to know are the file names & file sizes on the server:
-%% we can then pick any valid offset within that file. That would
-%% certainly be more scalable than the zillion-row-ETS-table, which is
-%% definitely RAM-hungry.
+%% TODO: The 'read' operator will always read chunks at exactly the
+%% byte offset & size as the original append/write ops. If reads are
+%% desired at any arbitrary offset & size, then a new strategy is
+%% required.
 
 -module(machi_basho_bench_driver).
 
 -export([new/1, run/4]).
 
 -record(m, {
+          id,
           conn,
           max_key
          }).
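The NOTE above is the operative constraint for this driver: Basho Bench binds a single key_generator per config file, so mixed append+read runs need two configs. A hypothetical pair of fragments (the generator choices here are illustrative assumptions, not part of this commit):

    %% append-instance config fragment (hypothetical)
    {operations, [{append, 1}]}.
    {key_generator, {int_to_str, {uniform_int, 16}}}.  %% produces prefix-style keys
    {value_generator, {fixed_bin, 4096}}.

    %% read-instance config fragment (hypothetical)
    {operations, [{read, 1}]}.
    {key_generator, {uniform_int, 5000000}}.  %% integers index the preloaded ETS table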
@@ -81,7 +83,7 @@ new(Id) ->
                            {read_concurrency, true}]),
     ets:insert(ETS, {max_key, 0}),
     ets:insert(ETS, {total_bytes, 0}),
-    MaxKeys = load_ets_table(Conn, ETS),
+    MaxKeys = load_ets_table_maybe(Conn, ETS),
     ?INFO("Key preload: finished, ~w keys loaded", [MaxKeys]),
     Bytes = ets:lookup_element(ETS, total_bytes, 2),
     ?INFO("Key preload: finished, chunk list specifies ~s MBytes of chunks",
@@ -90,12 +92,14 @@ new(Id) ->
        true ->
            ok
     end,
-    {ok, #m{conn=Conn}}.
+    {ok, #m{id=Id, conn=Conn}}.
 
 run(append, KeyGen, ValueGen, #m{conn=Conn}=S) ->
     Prefix = KeyGen(),
     Value = ValueGen(),
-    case machi_cr_client:append_chunk(Conn, Prefix, Value, ?THE_TIMEOUT) of
+    CSum = machi_util:make_client_csum(Value),
+    AppendOpts = {append_opts,0,undefined,false}, % HACK FIXME
+    case machi_cr_client:append_chunk(Conn, undefined, Prefix, Value, CSum, AppendOpts, ?THE_TIMEOUT) of
         {ok, Pos} ->
             EtsKey = ets:update_counter(?ETS_TAB, max_key, 1),
             true = ets:insert(?ETS_TAB, {EtsKey, Pos}),
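The tuple flagged % HACK FIXME is a record literal spelled out by hand. From the shape {append_opts,0,undefined,false}, the underlying record is presumably something like the sketch below; the field names are inferred guesses, not taken from machi.hrl:

    %% assumed record shape behind {append_opts, 0, undefined, false}
    -record(append_opts, {
              chunk_extra = 0             :: non_neg_integer(),
              preferred_file_name         :: 'undefined' | binary(),
              flag_fail_preferred = false :: boolean()
             }).

With such a record in scope, the HACK line reduces to AppendOpts = #append_opts{}.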
@@ -112,9 +116,26 @@ run(read, KeyGen, _ValueGen, #m{conn=Conn, max_key=MaxKey}=S) ->
     Idx = KeyGen() rem MaxKey,
     %% {File, Offset, Size, _CSum} = ets:lookup_element(?ETS_TAB, Idx, 2),
     {File, Offset, Size} = ets:lookup_element(?ETS_TAB, Idx, 2),
-    case machi_cr_client:read_chunk(Conn, File, Offset, Size, undefined, ?THE_TIMEOUT) of
-        {ok, _Chunk} ->
-            {ok, S};
+    ReadOpts = {read_opts,false,false,false}, % HACK FIXME
+    case machi_cr_client:read_chunk(Conn, undefined, File, Offset, Size, ReadOpts, ?THE_TIMEOUT) of
+        {ok, {Chunks, _Trimmed}} ->
+            %% io:format(user, "Chunks ~P\n", [Chunks, 15]),
+            %% {ok, S};
+            case lists:all(fun({File2, Offset2, Chunk, CSum}) ->
+                                   {_Tag, CS} = machi_util:unmake_tagged_csum(CSum),
+                                   CS2 = machi_util:checksum_chunk(Chunk),
+                                   if CS == CS2 ->
+                                           true;
+                                      CS /= CS2 ->
+                                           ?ERROR("Client-side checksum error for file ~p offset ~p expected ~p got ~p\n", [File2, Offset2, CS, CS2]),
+                                           false
+                                   end
+                           end, Chunks) of
+                true ->
+                    {ok, S};
+                false ->
+                    {error, bad_checksum, S}
+            end;
         {error, _}=Err ->
             ?ERROR("read file ~p offset ~w size ~w: ~w\n",
                    [File, Offset, Size, Err]),
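ReadOpts is the same kind of hand-written record literal. A plausible shape, inferred only from the arity and the all-false defaults (again an assumption, not quoted from machi.hrl):

    %% assumed record shape behind {read_opts, false, false, false}
    -record(read_opts, {
              no_checksum   = false :: boolean(),
              no_chunk      = false :: boolean(),
              needs_trimmed = false :: boolean()
             }).

Leaving all three flags false returns both chunks and their tagged checksums, which is what allows the new lists:all/2 pass to verify every chunk client-side.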
@@ -132,21 +153,40 @@ find_server_info(_Id) ->
             Ps
     end.
 
+load_ets_table_maybe(Conn, ETS) ->
+    case basho_bench_config:get(operations, undefined) of
+        undefined ->
+            ?ERROR("The 'operations' key is missing from the config file, aborting", []),
+            exit(bad_config);
+        Ops when is_list(Ops) ->
+            case lists:keyfind(read, 1, Ops) of
+                {read,_} ->
+                    load_ets_table(Conn, ETS);
+                false ->
+                    ?INFO("No 'read' op in the 'operations' list ~p, skipping ETS table load.", [Ops]),
+                    0
+            end
+    end.
+
 load_ets_table(Conn, ETS) ->
     {ok, Fs} = machi_cr_client:list_files(Conn),
     [begin
-         {ok, InfoBin} = machi_cr_client:checksum_list(Conn, File),
+         {ok, InfoBin} = machi_cr_client:checksum_list(Conn, File, ?THE_TIMEOUT),
          PosList = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
+         ?INFO("File ~s len PosList ~p\n", [File, length(PosList)]),
          StartKey = ets:update_counter(ETS, max_key, 0),
-         %% _EndKey = lists:foldl(fun({Off,Sz,CSum}, K) ->
-         %%                               V = {File, Off, Sz, CSum},
-         {_, Bytes} = lists:foldl(fun({Off,Sz,_CSum}, {K, Bs}) ->
-                                          V = {File, Off, Sz},
-                                          ets:insert(ETS, {K, V}),
-                                          {K + 1, Bs + Sz}
-                                  end, {StartKey, 0}, PosList),
-         ets:update_counter(ETS, max_key, length(PosList)),
-         ets:update_counter(ETS, total_bytes, Bytes)
+         {_, C, Bytes} = lists:foldl(fun({_Off,0,_CSum}, {_K, _C, _Bs}=Acc) ->
+                                             Acc;
+                                        ({0,_Sz,_CSum}, {_K, _C, _Bs}=Acc) ->
+                                             Acc;
+                                        ({Off,Sz,_CSum}, {K, C, Bs}) ->
+                                             V = {File, Off, Sz},
+                                             ets:insert(ETS, {K, V}),
+                                             {K + 1, C + 1, Bs + Sz}
+                                     end, {StartKey, 0, 0}, PosList),
+         _ = ets:update_counter(ETS, max_key, C),
+         _ = ets:update_counter(ETS, total_bytes, Bytes),
+         ok
     end || {_Size, File} <- Fs],
     ets:update_counter(?ETS_TAB, max_key, 0).
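load_ets_table_maybe/2 keys entirely off the operations list, so whether the RAM-hungry preload runs is visible straight from the config. A hypothetical fragment:

    %% Preload runs: lists:keyfind(read, 1, Ops) matches {read, 10}.
    {operations, [{read, 10}]}.

    %% Preload skipped, 0 keys loaded: no 'read' tuple present.
    %% {operations, [{append, 1}]}.

    %% No 'operations' key at all -> exit(bad_config).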

File: machi_cr_client.erl

@@ -78,8 +78,8 @@
          terminate/2, code_change/3]).
 
 -define(FLU_PC, machi_proxy_flu1_client).
--define(TIMEOUT, 2*1000).
--define(DEFAULT_TIMEOUT, 10*1000).
+-define(TIMEOUT, 10*1000).
+-define(DEFAULT_TIMEOUT, ?TIMEOUT*5).
 -define(MAX_RUNTIME, 8*1000).
 -define(WORST_PROJ, #projection_v1{epoch_number=0,epoch_csum= <<>>,
                                    members_dict=[]}).
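For reference, the revised macros work out to:

    %% ?TIMEOUT         = 10*1000      = 10s  (was  2s): single proxy-level op
    %% ?DEFAULT_TIMEOUT = ?TIMEOUT * 5 = 50s  (was 10s): overall request budget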
@@ -506,7 +506,7 @@ do_read_chunk2(NSInfo, File, Offset, Size, Opts, Depth, STime, TO,
     Tail = lists:last(UPI),
     ConsistencyMode = P#projection_v1.mode,
     case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), NSInfo, EpochID,
-                            File, Offset, Size, Opts, ?TIMEOUT) of
+                            File, Offset, Size, Opts, TO) of
         {ok, {Chunks, Trimmed}} when is_list(Chunks), is_list(Trimmed) ->
             %% After partition heal, there could happen that heads may
             %% have chunk trimmed but tails may have chunk written -
@@ -690,7 +690,7 @@ read_repair2(cp_mode=ConsistencyMode,
     %% TODO WTF was I thinking here??....
     Tail = lists:last(readonly_flus(P)),
     case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), NSInfo, EpochID,
-                            File, Offset, Size, undefined, ?TIMEOUT) of
+                            File, Offset, Size, undefined, ?DEFAULT_TIMEOUT) of
         {ok, Chunks} when is_list(Chunks) ->
             %% TODO: change to {Chunks, Trimmed} and have them repaired
             ToRepair = mutation_flus(P) -- [Tail],
@@ -840,7 +840,7 @@ do_checksum_list(File, Depth, STime, TO, #state{proj=P}=S) ->
 do_checksum_list2(File, Depth, STime, TO,
                   #state{proj=P, proxies_dict=PD}=S) ->
     Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
-    case ?FLU_PC:checksum_list(Proxy, File, ?TIMEOUT) of
+    case ?FLU_PC:checksum_list(Proxy, File, TO) of
         {ok, _}=OK ->
             {reply, OK, S};
         {error, Retry}
@@ -875,7 +875,7 @@ do_list_files(Depth, STime, TO, #state{proj=P}=S) ->
 do_list_files2(Depth, STime, TO,
                #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
     Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
-    case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of
+    case ?FLU_PC:list_files(Proxy, EpochID, ?DEFAULT_TIMEOUT) of
         {ok, _}=OK ->
             {reply, OK, S};
         {error, Retry}

File: machi_csum_table.erl

@@ -1,3 +1,23 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
 -module(machi_csum_table).
 
 -export([open/2,
@@ -65,10 +85,18 @@ find(#machi_csum_table{table=T}, Offset, Size) ->
     {ok, I} = eleveldb:iterator(T, [], keys_only),
     EndKey = sext:encode({Offset+Size, 0}),
     StartKey = sext:encode({Offset, Size}),
     {ok, FirstKey} = case eleveldb:iterator_move(I, StartKey) of
                          {error, invalid_iterator} ->
-                             eleveldb:iterator_move(I, first);
+                             try
+                                 %% Assume that the invalid_iterator is because
+                                 %% we tried to move to the end via StartKey.
+                                 %% Instead, move there directly.
+                                 {ok, _} = eleveldb:iterator_move(I, last),
+                                 {ok, _} = eleveldb:iterator_move(I, prev)
+                             catch
+                                 _:_ ->
+                                     {ok, _} = eleveldb:iterator_move(I, first)
+                             end;
                          {ok, _} = R0 ->
                              case eleveldb:iterator_move(I, prev) of
                                  {error, invalid_iterator} ->
@@ -92,7 +120,6 @@ find(#machi_csum_table{table=T}, Offset, Size) ->
     end,
     lists:reverse(eleveldb_fold(T, FirstKey, EndKey, FoldFun, [])).
-
 %% @doc Updates all chunk info, by deleting existing entries if exists
 %% and putting new chunk info
 -spec write(table(),
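The try/catch added to find/3 works around a specific eleveldb behavior: seeking an iterator to a key greater than every key in the table yields {error, invalid_iterator} and leaves the iterator unpositioned. A minimal standalone sketch of the failure and the last/prev recovery (the database path and keys are hypothetical):

    {ok, Db} = eleveldb:open("/tmp/csum_demo", [{create_if_missing, true}]),
    ok = eleveldb:put(Db, sext:encode({0, 10}), <<>>, []),
    ok = eleveldb:put(Db, sext:encode({10, 20}), <<>>, []),
    {ok, I} = eleveldb:iterator(Db, [], keys_only),
    %% Seek past the largest key: nothing is >= the target.
    {error, invalid_iterator} = eleveldb:iterator_move(I, sext:encode({99, 1})),
    %% Recover by positioning explicitly, as find/3 now does.
    {ok, _LastKey} = eleveldb:iterator_move(I, last),
    {ok, _PrevKey} = eleveldb:iterator_move(I, prev).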
@@ -126,6 +153,8 @@ write(#machi_csum_table{table=T} = CsumT, Offset, Size, CSum,
     DeleteOps = lists:map(fun({O, L, _}) ->
                                   {delete, sext:encode({O, L})}
                           end, Chunks),
+    %% io:format(user, "PutOps: ~P\n", [PutOps, 20]),
+    %% io:format(user, "DelOps: ~P\n", [DeleteOps, 20]),
     eleveldb:write(T, DeleteOps ++ PutOps, [{sync, true}]).
 
 -spec find_leftneighbor(table(), non_neg_integer()) ->

File: machi_file_proxy.erl

@@ -71,7 +71,7 @@
          code_change/3
         ]).
 
--define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds
+-define(TICK, 5*1000).
 -define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
 -define(TIMEOUT, 10*1000).
 -define(TOO_MANY_ERRORS_RATIO, 50).
@@ -91,6 +91,7 @@
     csum_table :: machi_csum_table:table(),
     eof_position = 0 :: non_neg_integer(),
     max_file_size = ?DEFAULT_MAX_FILE_SIZE :: pos_integer(),
+    rollover = false :: boolean(),
     tref :: reference(), %% timer ref
     ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
     ops = 0 :: non_neg_integer(), %% sum of all ops
@@ -239,7 +240,7 @@ init({FluName, Filename, DataDir}) ->
                data_filehandle = FHd,
                csum_table = CsumTable,
                tref = Tref,
-               eof_position = Eof,
+               eof_position = erlang:max(Eof, ?MINIMUM_OFFSET),
                max_file_size = machi_config:max_file_size()},
     lager:debug("Starting file proxy ~p for filename ~p, state = ~p, Eof = ~p",
                 [self(), Filename, St, Eof]),
@@ -449,11 +450,23 @@ handle_cast(Cast, State) ->
     {noreply, State}.
 
 % @private
-handle_info(tick, State = #state{eof_position = Eof,
+handle_info(tick, State = #state{fluname = FluName,
+                                 filename = F,
+                                 eof_position = Eof,
                                  max_file_size = MaxFileSize}) when Eof >= MaxFileSize ->
-    lager:notice("Eof position ~p >= max file size ~p. Shutting down.",
-                 [Eof, MaxFileSize]),
-    {stop, file_rollover, State};
+    %% Older code halted here with {stop, file_rollover, State}.
+    %% However, there may be other requests in our mailbox already
+    %% and/or not yet delivered but in a race with the
+    %% machi_flu_metadata_mgr. So we close our eleveldb instance (to
+    %% avoid double-open attempt by a new file proxy proc), tell
+    %% machi_flu_metadata_mgr that we request a rollover, then stop.
+    %% terminate() will take care of forwarding messages that are
+    %% caught in the race.
+    lager:notice("Eof ~s position ~p >= max file size ~p. Shutting down.",
+                 [F, Eof, MaxFileSize]),
+    State2 = close_files(State),
+    machi_flu_metadata_mgr:stop_proxy_pid_rollover(FluName, {file, F}),
+    {stop, normal, State2#state{rollover = true}};
 
 %% XXX Is this a good idea? Need to think this through a bit.
 handle_info(tick, State = #state{wedged = true}) ->
@@ -467,7 +480,7 @@ handle_info(tick, State = #state{
             writes = {WT, WE},
             appends = {AT, AE}
            }) when Ops > 100 andalso
-        trunc(((RE+WE+AE) / RT+WT+AT) * 100) > ?TOO_MANY_ERRORS_RATIO ->
+        trunc(((RE+WE+AE) / (RT+WT+AT)) * 100) > ?TOO_MANY_ERRORS_RATIO ->
     Errors = RE + WE + AE,
     lager:notice("Got ~p errors. Shutting down.", [Errors]),
     {stop, too_many_errors, State};
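The one-character fix above matters because / binds tighter than +, so in the old guard the divisor was only RT. Plugging in sample counts:

    %% RT = WT = AT = 40 ops, RE = WE = AE = 2 errors:
    %% old: trunc(((2+2+2) / 40+40+40) * 100)   = trunc(80.15 * 100) = 8015 > 50  (spurious shutdown)
    %% new: trunc(((2+2+2) / (40+40+40)) * 100) = trunc(0.05 * 100)  =    5 =< 50 (true 5% error ratio)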
@@ -526,30 +539,23 @@ handle_info(Req, State) ->
     {noreply, State}.
 
 % @private
-terminate(Reason, #state{filename = F,
-                         data_filehandle = FHd,
-                         csum_table = T,
-                         reads = {RT, RE},
-                         writes = {WT, WE},
-                         appends = {AT, AE}
-                        }) ->
+terminate(Reason, State = #state{fluname = FluName,
+                                 filename = F,
+                                 rollover = Rollover_p,
+                                 reads = {RT, RE},
+                                 writes = {WT, WE},
+                                 appends = {AT, AE}
+                                }) ->
     lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]),
     lager:info("   Op    Tot/Error", []),
     lager:info("  Reads:  ~p/~p", [RT, RE]),
     lager:info(" Writes:  ~p/~p", [WT, WE]),
     lager:info("Appends:  ~p/~p", [AT, AE]),
-    case FHd of
-        undefined ->
-            noop; %% file deleted
-        _ ->
-            ok = file:sync(FHd),
-            ok = file:close(FHd)
-    end,
-    case T of
-        undefined ->
-            noop; %% file deleted
-        _ ->
-            ok = machi_csum_table:close(T)
-    end,
+    close_files(State),
+    if Rollover_p ->
+            forward_late_messages(FluName, F, 500);
+       true ->
+            ok
+    end,
     ok.
@@ -867,3 +873,36 @@ maybe_gc(Reply, S = #state{fluname=FluName,
         false ->
             {reply, Reply, S}
     end.
+
+close_files(State = #state{data_filehandle = FHd,
+                           csum_table = T}) ->
+    case FHd of
+        undefined ->
+            noop; %% file deleted
+        _ ->
+            ok = file:sync(FHd),
+            ok = file:close(FHd)
+    end,
+    case T of
+        undefined ->
+            noop; %% file deleted
+        _ ->
+            ok = machi_csum_table:close(T)
+    end,
+    State#state{data_filehandle = undefined, csum_table = undefined}.
+
+forward_late_messages(FluName, F, Timeout) ->
+    receive
+        M ->
+            case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
+                {ok, Pid} ->
+                    Pid ! M;
+                {error, trimmed} ->
+                    lager:error("TODO: FLU ~p file ~p reports trimmed status "
+                                "when forwarding ~P\n",
+                                [FluName, F, M, 20])
+            end,
+            forward_late_messages(FluName, F, Timeout)
+    after Timeout ->
+            ok
+    end.

File: machi_flu_filename_mgr.erl

@@ -157,8 +157,9 @@ handle_call({find_filename, _FluName, EpochId, NSInfo, Prefix}, _From, S = #stat
     File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix),
     {reply, {file, File}, S#state{epoch = EpochId}};
-handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir }) ->
-    ok = machi_util:increment_max_filenum(DataDir, NS, NSLocator, Prefix),
+handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir, tid=Tid }) ->
+    NSInfo = #ns_info{name=NS, locator=NSLocator},
+    _File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix),
     {reply, ok, S};
 handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) ->
     spawn(fun() ->

File: machi_flu_metadata_mgr.erl

@@ -63,6 +63,7 @@
          lookup_proxy_pid/2,
          start_proxy_pid/2,
          stop_proxy_pid/2,
+         stop_proxy_pid_rollover/2,
          build_metadata_mgr_name/2,
          trim_file/2
         ]).
@@ -100,7 +101,10 @@ start_proxy_pid(FluName, {file, Filename}) ->
     gen_server:call(get_manager_atom(FluName, Filename), {start_proxy_pid, Filename}, ?TIMEOUT).
 
 stop_proxy_pid(FluName, {file, Filename}) ->
-    gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
+    gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, false, Filename}, ?TIMEOUT).
+
+stop_proxy_pid_rollover(FluName, {file, Filename}) ->
+    gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, true, Filename}, ?TIMEOUT).
 
 trim_file(FluName, {file, Filename}) ->
     gen_server:call(get_manager_atom(FluName, Filename), {trim_file, Filename}, ?TIMEOUT).
@@ -151,7 +155,7 @@ handle_call({start_proxy_pid, Filename}, _From,
             {reply, {error, trimmed}, State}
     end;
 
-handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
+handle_call({stop_proxy_pid, Rollover_p, Filename}, _From, State = #state{ tid = Tid }) ->
     case lookup_md(Tid, Filename) of
         not_found ->
             ok;
@@ -159,8 +163,13 @@ handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
             ok;
         #md{ proxy_pid = Pid, mref = M } = R ->
             demonitor(M, [flush]),
-            machi_file_proxy:stop(Pid),
-            update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined })
+            if Rollover_p ->
+                    do_rollover(Filename, State);
+               true ->
+                    machi_file_proxy:stop(Pid),
+                    update_ets(Tid, R#md{ proxy_pid = undefined,
+                                          mref = undefined })
+            end
     end,
     {reply, ok, State};
@@ -182,27 +191,6 @@ handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) -
     clear_ets(Tid, Mref),
     {noreply, State};
 
-handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ fluname = FluName,
-                                                                         tid = Tid }) ->
-    lager:info("file proxy ~p shutdown because of file rollover", [Pid]),
-    R = get_md_record_by_mref(Tid, Mref),
-    {Prefix, NS, NSLocator, _, _} =
-        machi_util:parse_filename(R#md.filename),
-    %% We only increment the counter here. The filename will be generated on the
-    %% next append request to that prefix and since the filename will have a new
-    %% sequence number it probably will be associated with a different metadata
-    %% manager. That's why we don't want to generate a new file name immediately
-    %% and use it to start a new file proxy.
-    NSInfo = #ns_info{name=NS, locator=NSLocator},
-    ok = machi_flu_filename_mgr:increment_prefix_sequence(FluName, NSInfo, {prefix, Prefix}),
-    %% purge our ets table of this entry completely since it is likely the
-    %% new filename (whenever it comes) will be in a different manager than
-    %% us.
-    purge_ets(Tid, R),
-    {noreply, State};
-
 handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) ->
     lager:error("file proxy ~p shutdown because it's wedged", [Pid]),
     clear_ets(Tid, Mref),
@@ -275,8 +263,35 @@ get_md_record_by_mref(Tid, Mref) ->
     [R] = ets:match_object(Tid, {md, '_', '_', Mref}),
     R.
 
+get_md_record_by_filename(Tid, Filename) ->
+    [R] = ets:lookup(Tid, Filename),
+    R.
+
 get_env(Setting, Default) ->
     case application:get_env(machi, Setting) of
         undefined -> Default;
         {ok, V} -> V
     end.
+
+do_rollover(Filename, _State = #state{ fluname = FluName,
+                                       tid = Tid }) ->
+    R = get_md_record_by_filename(Tid, Filename),
+    lager:info("file ~p proxy ~p shutdown because of file rollover",
+               [Filename, R#md.proxy_pid]),
+    {Prefix, NS, NSLocator, _, _} =
+        machi_util:parse_filename(R#md.filename),
+    %% We only increment the counter here. The filename will be generated on the
+    %% next append request to that prefix and since the filename will have a new
+    %% sequence number it probably will be associated with a different metadata
+    %% manager. That's why we don't want to generate a new file name immediately
+    %% and use it to start a new file proxy.
+    NSInfo = #ns_info{name=NS, locator=NSLocator},
+    lager:warning("INCR: ~p ~p\n", [FluName, Prefix]),
+    ok = machi_flu_filename_mgr:increment_prefix_sequence(FluName, NSInfo, {prefix, Prefix}),
+    %% purge our ets table of this entry completely since it is likely the
+    %% new filename (whenever it comes) will be in a different manager than
+    %% us.
+    purge_ets(Tid, R),
+    ok.

File: machi_plist.erl

@@ -1,3 +1,23 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2016 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%%   http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
 -module(machi_plist).
 
 %%% @doc persistent list of binaries

File: machi_util.erl

@@ -25,6 +25,7 @@
 -export([
          checksum_chunk/1,
          make_tagged_csum/1, make_tagged_csum/2,
+         make_client_csum/1,
          unmake_tagged_csum/1,
          hexstr_to_bin/1, bin_to_hexstr/1,
          hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
@@ -327,6 +328,9 @@ make_tagged_csum(?CSUM_TAG_SERVER_SHA_ATOM, SHA) ->
 make_tagged_csum(?CSUM_TAG_SERVER_REGEN_SHA_ATOM, SHA) ->
     <<?CSUM_TAG_SERVER_REGEN_SHA:8, SHA/binary>>.
 
+make_client_csum(BinOrList) ->
+    make_tagged_csum(?CSUM_TAG_CLIENT_SHA_ATOM, checksum_chunk(BinOrList)).
+
 unmake_tagged_csum(<<Tag:8, Rest/binary>>) ->
     {Tag, Rest}.
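A usage sketch of the new helper. It assumes checksum_chunk/1 is a SHA-1 over the chunk, as used elsewhere in Machi, hence the 20-byte digest; the round-trip through unmake_tagged_csum/1 follows directly from the code above:

    Chunk = <<"hello, world">>,
    Tagged = machi_util:make_client_csum(Chunk),
    {_ClientShaTag, SHA} = machi_util:unmake_tagged_csum(Tagged),
    SHA = machi_util:checksum_chunk(Chunk),  %% tag strips off; digest round-trips
    20 = byte_size(SHA).                     %% 20 bytes iff SHA-1 (assumption above)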