Trim command and GC prototype implementation

* maybe_gc/2 is triggered at machi_file_proxy, when chunk is deleted
  and the file is larger than `max_file_size`
* A file is deleted if all chunks except 1024 bytes header are trimmed
* If a file is going to be deleted, file_proxy notifies metadata_mgr
  to remember the filename persistently, whose filename is
  `known_files_<FluName>`
* Such trimmed filenames are stored in a machi_plist file per flu
* machi_file_proxy could not be started if the filename is in the
  manager's list. Consequently, any write, read and trim operations
  cannot happen against deleted file.
* After the file was trimmed, any read request to the file returns
  `{error, trimmed}`
* Disclaimer: no tests written yet and machi_plist does not support
  any recovery from partial writes.
* Add some thoughts as comments for repairing trims.

* State diagram of every byte is as follows:

```
state\action| write/append   | read_chunk       | trim_chunk
------------+----------------+------------------+---------------
 unwritten  |  -> written    | fail (+repair)   | -> trimmed
 written    | noop or repair | return content   | -> trimmed
 trimmed    |  fail          | fail             | noop
```
This commit is contained in:
UENISHI Kota 2015-10-23 18:19:25 +09:00
parent 61f02dfc9f
commit f7358424e4
18 changed files with 687 additions and 133 deletions

View file

@ -48,9 +48,10 @@ enum Mpb_GeneralStatusCode {
PARTITION = 4;
NOT_WRITTEN = 5;
WRITTEN = 6;
NO_SUCH_FILE = 7;
PARTIAL_READ = 8;
BAD_EPOCH = 9;
TRIMMED = 7; // The whole file was trimmed
NO_SUCH_FILE = 8;
PARTIAL_READ = 9;
BAD_EPOCH = 10;
BAD_JOSS = 255; // Only for testing by the Taipan
}
@ -355,6 +356,7 @@ message Mpb_ProjectionV1 {
// append_chunk()
// write_chunk()
// read_chunk()
// trim_chunk()
// checksum_list()
// list_files()
// wedge_status()
@ -424,6 +426,20 @@ message Mpb_LL_ReadChunkResp {
repeated Mpb_ChunkPos trimmed = 3;
}
// Low level API: trim_chunk()
message Mpb_LL_TrimChunkReq {
required Mpb_EpochID epoch_id = 1;
required string file = 2;
required uint64 offset = 3;
required uint32 size = 4;
optional uint32 trigger_gc = 5 [default=1];
}
message Mpb_LL_TrimChunkResp {
required Mpb_GeneralStatusCode status = 1;
}
// Low level API: checksum_list()
message Mpb_LL_ChecksumListReq {
@ -588,11 +604,12 @@ message Mpb_LL_Request {
optional Mpb_LL_AppendChunkReq append_chunk = 30;
optional Mpb_LL_WriteChunkReq write_chunk = 31;
optional Mpb_LL_ReadChunkReq read_chunk = 32;
optional Mpb_LL_ChecksumListReq checksum_list = 33;
optional Mpb_LL_ListFilesReq list_files = 34;
optional Mpb_LL_WedgeStatusReq wedge_status = 35;
optional Mpb_LL_DeleteMigrationReq delete_migration = 36;
optional Mpb_LL_TruncHackReq trunc_hack = 37;
optional Mpb_LL_TrimChunkReq trim_chunk = 33;
optional Mpb_LL_ChecksumListReq checksum_list = 34;
optional Mpb_LL_ListFilesReq list_files = 35;
optional Mpb_LL_WedgeStatusReq wedge_status = 36;
optional Mpb_LL_DeleteMigrationReq delete_migration = 37;
optional Mpb_LL_TruncHackReq trunc_hack = 38;
}
message Mpb_LL_Response {
@ -622,9 +639,10 @@ message Mpb_LL_Response {
optional Mpb_LL_AppendChunkResp append_chunk = 30;
optional Mpb_LL_WriteChunkResp write_chunk = 31;
optional Mpb_LL_ReadChunkResp read_chunk = 32;
optional Mpb_LL_ChecksumListResp checksum_list = 33;
optional Mpb_LL_ListFilesResp list_files = 34;
optional Mpb_LL_WedgeStatusResp wedge_status = 35;
optional Mpb_LL_DeleteMigrationResp delete_migration = 36;
optional Mpb_LL_TruncHackResp trunc_hack = 37;
optional Mpb_LL_TrimChunkResp trim_chunk = 33;
optional Mpb_LL_ChecksumListResp checksum_list = 34;
optional Mpb_LL_ListFilesResp list_files = 35;
optional Mpb_LL_WedgeStatusResp wedge_status = 36;
optional Mpb_LL_DeleteMigrationResp delete_migration = 37;
optional Mpb_LL_TruncHackResp trunc_hack = 38;
}

View file

@ -360,6 +360,9 @@ do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, TO,
%% written block is. But we lost a race. Repeat, with a new
%% sequencer assignment.
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S);
{error, trimmed} = Err ->
%% TODO: behaviour
{reply, Err, S};
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,
Prefix,iolist_size(Chunk)})
@ -406,7 +409,7 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
Ws, Depth + 1, STime, TO, S2)
end
end
end.
end.
do_append_midtail2([], _Prefix, File, Offset, Chunk,
_ChunkExtra, _Ws, _Depth, _STime, _TO, S) ->
@ -434,6 +437,9 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
Resume = {append, Offset, iolist_size(Chunk), File},
do_repair_chunk(FLUs, Resume, Chunk, [], File, Offset,
iolist_size(Chunk), Depth, STime, S);
{error, trimmed} = Err ->
%% TODO: nothing can be done
{reply, Err, S};
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
end.
@ -497,6 +503,8 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO,
do_write_head(File, Offset, Chunk, Depth, STime, TO, S);
{error, written}=Err ->
{reply, Err, S};
{error, trimmed}=Err ->
{reply, Err, S};
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,
iolist_size(Chunk)})
@ -528,18 +536,26 @@ do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, Opts, ?TIMEOUT) of
{ok, {Chunks, []}} when is_list(Chunks) ->
{reply, {ok, {Chunks, []}}, S};
{ok, {Chunks, Trimmed}} when is_list(Chunks), is_list(Trimmed) ->
%% After partition heal, there could happen that heads may
%% have chunk trimmed but tails may have chunk written -
%% such repair couldn't be triggered in read time (because
%% there's data!). In this case, repair should happen by
%% partition heal event or some background
%% hashtree-n-repair service. TODO. FIXME.
{reply, {ok, {Chunks, Trimmed}}, S};
%% {ok, BadChunk} ->
%% %% TODO cleaner handling of bad chunks
%% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
%% got, byte_size(BadChunk)});
{error, bad_arg} = BadArg ->
{error, bad_arg} = BadArg ->
{reply, BadArg, S};
{error, partial_read}=Err ->
%% TODO: maybe this case we might need another repair?
{reply, Err, S};
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
%% Maybe we need read repair here, too?
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
@ -548,12 +564,125 @@ do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
%% {reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size});
{error, trimmed}=Err ->
{reply, Err, S}
end.
do_trim_chunk(File, Offset, Size, 0=Depth, STime, TO, S) ->
do_trim_chunk(File, Offset, Size, Depth+1, STime, TO, S);
do_trim_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
{reply, {error, partition}, S};
true ->
%% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good
%% enough. But we're going to try to keep the code as simple
%% as we can for now.
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_trim_chunk(File, Offset, Size, Depth + 1,
STime, TO, S2);
_ ->
do_trim_chunk2(File, Offset, Size, Depth + 1,
STime, TO, S2)
end
end.
do_trim_chunk2(File, Offset, Size, Depth, STime, TO,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
[HeadFLU|RestFLUs] = mutation_flus(P),
Proxy = orddict:fetch(HeadFLU, PD),
case ?FLU_PC:trim_chunk(Proxy, EpochID, File, Offset, Size, ?TIMEOUT) of
ok ->
%% From this point onward, we use the same code & logic path as
%% append does.
do_trim_midtail(RestFLUs, undefined, File, Offset, Size,
[HeadFLU], 0, STime, TO, S);
{error, trimmed} ->
%% Maybe the trim had failed in the middle of the tail so re-run
%% trim accross the whole chain.
do_trim_midtail(RestFLUs, undefined, File, Offset, Size,
[HeadFLU], 0, STime, TO, S);
{error, bad_checksum}=BadCS ->
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_trim_chunk(File, Offset, Size, Depth, STime, TO, S)
end.
do_trim_midtail(RestFLUs, Prefix, File, Offset, Size,
Ws, Depth, STime, TO, S)
when RestFLUs == [] orelse Depth == 0 ->
do_trim_midtail2(RestFLUs, Prefix, File, Offset, Size,
Ws, Depth + 1, STime, TO, S);
do_trim_midtail(_RestFLUs, Prefix, File, Offset, Size,
Ws, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "midtail sleep2,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
{reply, {error, partition}, S};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
undefined ->
{reply, {error, partition}, S};
P2 ->
RestFLUs2 = mutation_flus(P2),
case RestFLUs2 -- Ws of
RestFLUs2 ->
%% None of the writes that we have done so far
%% are to FLUs that are in the RestFLUs2 list.
%% We are pessimistic here and assume that
%% those FLUs are permanently dead. Start
%% over with a new sequencer assignment, at
%% the 2nd have of the impl (we have already
%% slept & refreshed the projection).
if Prefix == undefined -> % atom! not binary()!!
{error, partition};
true ->
do_trim_chunk(Prefix, Offset, Size,
Depth, STime, TO, S2)
end;
RestFLUs3 ->
do_trim_midtail2(RestFLUs3, Prefix, File, Offset, Size,
Ws, Depth + 1, STime, TO, S2)
end
end
end.
do_trim_midtail2([], _Prefix, _File, _Offset, _Size,
_Ws, _Depth, _STime, _TO, S) ->
%% io:format(user, "ok!\n", []),
{reply, ok, S};
do_trim_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Size,
Ws, Depth, STime, TO,
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:trim_chunk(Proxy, EpochID, File, Offset, Size, ?TIMEOUT) of
ok ->
%% io:format(user, "write ~w,", [FLU]),
do_trim_midtail2(RestFLUs, Prefix, File, Offset, Size,
[FLU|Ws], Depth, STime, TO, S);
{error, trimmed} ->
do_trim_midtail2(RestFLUs, Prefix, File, Offset, Size,
[FLU|Ws], Depth, STime, TO, S);
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_trim_midtail(FLUs, Prefix, File, Offset, Size,
Ws, Depth, STime, TO, S)
end.
do_trim_chunk(_File, _Offset, _Size, _Depth, _STime, _TO, S) ->
%% This is just a stub to reach CR client from high level client
{reply, {error, bad_joss}, S}.
%% Read repair: depends on the consistency mode that we're in:
%%
@ -597,6 +726,7 @@ read_repair2(cp_mode=ConsistencyMode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, [], ?TIMEOUT) of
{ok, Chunks} when is_list(Chunks) ->
%% TODO: change to {Chunks, Trimmed} and have them repaired
ToRepair = mutation_flus(P) -- [Tail],
{Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode,
[Tail], File, Depth, STime, S, {ok, Chunks}),
@ -614,7 +744,12 @@ read_repair2(cp_mode=ConsistencyMode,
{error, not_written} ->
{reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size});
{error, trimmed} ->
%% TODO: Again, whole file was trimmed. Needs repair. How
%% do we repair trimmed file (which was already unlinked)
%% across the flu servers?
exit({todo_should_repair_unlinked_files, ?MODULE, ?LINE, File})
end;
read_repair2(ap_mode=ConsistencyMode,
ReturnMode, File, Offset, Size, Depth, STime,
@ -622,6 +757,7 @@ read_repair2(ap_mode=ConsistencyMode,
Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) ->
%% TODO: Repair trimmed chunks
ToRepair = mutation_flus(P) -- [GotItFrom],
{Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
File, Depth, STime, S, {ok, Chunks}),
@ -638,7 +774,11 @@ read_repair2(ap_mode=ConsistencyMode,
{error, not_written} ->
{reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size});
{error, trimmed} ->
%% TODO: Again, whole file was trimmed. Needs repair. How
%% do we repair trimmed file across the flu servers?
exit({todo_should_repair_unlinked_files, ?MODULE, ?LINE, File})
end.
do_repair_chunks([], _, _, _, _, _, _, S, Reply) ->
@ -703,6 +843,9 @@ do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offse
%% that it is exactly our Chunk.
do_repair_chunk2(Rest, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
{error, trimmed} = _Error ->
%% TODO
exit(todo_should_repair_trimmed);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
@ -872,7 +1015,9 @@ try_to_find_chunk(Eligible, File, Offset, Size,
[{FoundFLU, {ok, ChunkAndTrimmed}}|_] ->
{ok, ChunkAndTrimmed, FoundFLU};
[] ->
RetryErrs = [partition, bad_epoch, wedged],
RetryErrs = [partition, bad_epoch, wedged, trimmed],
%% Adding 'trimmed' to return so as to trigger repair,
%% once all other retry errors fixed
case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
[SomeErr|_] ->
{error, SomeErr};

View file

@ -177,7 +177,14 @@ all_trimmed(#machi_csum_table{table=T}, Left, Right) ->
-spec all_trimmed(table(), machi_dt:chunk_pos()) -> boolean().
all_trimmed(#machi_csum_table{table=T}, Pos) ->
runthru(ets:tab2list(T), 0, Pos).
case ets:tab2list(T) of
[{0, ?MINIMUM_OFFSET, _}|L] ->
%% tl/1 to remove header space {0, 1024, <<0>>}
runthru(L, ?MINIMUM_OFFSET, Pos);
List ->
%% In case a header is removed;
runthru(List, 0, Pos)
end.
-spec any_trimmed(table(),
pos_integer(),

View file

@ -22,20 +22,20 @@
%% controlled files. In particular, it manages the "write-once register"
%% conceit at the heart of Machi's design.
%%
%% Read, write and append requests for a single file will be managed
%% Read, write and append requests for a single file will be managed
%% through this proxy. Clients can also request syncs for specific
%% types of filehandles.
%%
%% As operations are requested, the proxy keeps track of how many
%% operations it has performed (and how many errors were generated.)
%% After a sufficient number of inactivity, the server terminates
%% After a sufficient number of inactivity, the server terminates
%% itself.
%%
%% TODO:
%% 1. Some way to transition the proxy into a wedged state that
%% 1. Some way to transition the proxy into a wedged state that
%% doesn't rely on message delivery.
%%
%% 2. Check max file size on appends. Writes we take on faith we can
%% 2. Check max file size on appends. Writes we take on faith we can
%% and should handle.
%%
%% 3. Async checksum reads on startup.
@ -47,7 +47,7 @@
%% public API
-export([
start_link/2,
start_link/3,
stop/1,
sync/1,
sync/2,
@ -55,6 +55,7 @@
read/4,
write/3,
write/4,
trim/4,
append/2,
append/4
]).
@ -74,10 +75,11 @@
-define(TIMEOUT, 10*1000).
-define(TOO_MANY_ERRORS_RATIO, 50).
-type op_stats() :: { Total :: non_neg_integer(),
-type op_stats() :: { Total :: non_neg_integer(),
Errors :: non_neg_integer() }.
-record(state, {
fluname :: atom(),
data_dir :: string() | undefined,
filename :: string() | undefined,
data_path :: string() | undefined,
@ -93,17 +95,18 @@
ops = 0 :: non_neg_integer(), %% sum of all ops
reads = {0, 0} :: op_stats(),
writes = {0, 0} :: op_stats(),
appends = {0, 0} :: op_stats()
appends = {0, 0} :: op_stats(),
trims = {0, 0} :: op_stats()
}).
%% Public API
% @doc Start a new instance of the file proxy service. Takes the filename
% @doc Start a new instance of the file proxy service. Takes the filename
% and data directory as arguments. This function is typically called by the
% `machi_file_proxy_sup:start_proxy/2' function.
-spec start_link(Filename :: string(), DataDir :: string()) -> any().
start_link(Filename, DataDir) ->
gen_server:start_link(?MODULE, {Filename, DataDir}, []).
-spec start_link(FluName :: atom(), Filename :: string(), DataDir :: string()) -> any().
start_link(FluName, Filename, DataDir) ->
gen_server:start_link(?MODULE, {FluName, Filename, DataDir}, []).
% @doc Request to stop an instance of the file proxy service.
-spec stop(Pid :: pid()) -> ok.
@ -120,7 +123,7 @@ sync(_Pid) ->
% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}.
sync(Pid, Type) when is_pid(Pid) andalso
sync(Pid, Type) when is_pid(Pid) andalso
( Type =:= all orelse Type =:= csum orelse Type =:= data ) ->
gen_server:call(Pid, {sync, Type}, ?TIMEOUT);
sync(_Pid, Type) ->
@ -147,7 +150,7 @@ read(Pid, Offset, Length) ->
{ok, [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}]} |
{error, Reason :: term()}.
read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0
andalso is_list(Opts) ->
gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT);
@ -179,6 +182,12 @@ write(_Pid, Offset, ClientMeta, _Data) ->
lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]),
{error, bad_arg}.
trim(Pid, Offset, Size, TriggerGC) when is_pid(Pid),
is_integer(Offset) andalso Offset >= 0,
is_integer(Size) andalso Size > 0,
is_boolean(TriggerGC) ->
gen_server:call(Pid, {trim ,Offset, Size, TriggerGC}, ?TIMEOUT).
% @doc Append data
-spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
|{error, term()}.
@ -194,8 +203,8 @@ append(_Pid, _Data) ->
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
|{error, term()}.
append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta)
andalso is_integer(Extra) andalso Extra >= 0
append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta)
andalso is_integer(Extra) andalso Extra >= 0
andalso is_binary(Data) ->
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT);
append(_Pid, ClientMeta, Extra, _Data) ->
@ -205,7 +214,7 @@ append(_Pid, ClientMeta, Extra, _Data) ->
%% gen_server callbacks
% @private
init({Filename, DataDir}) ->
init({FluName, Filename, DataDir}) ->
CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = filelib:ensure_dir(CsumFile),
@ -216,6 +225,7 @@ init({Filename, DataDir}) ->
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
Tref = schedule_tick(),
St = #state{
fluname = FluName,
filename = Filename,
data_dir = DataDir,
data_path = DPath,
@ -250,13 +260,13 @@ handle_call({sync, all}, _From, State = #state{filename = F,
R1 = file:sync(FHd),
Resp = case {R, R1} of
{ok, ok} -> ok;
{ok, O1} ->
lager:error("Got ~p during a data file sync on file ~p", [O1, F]),
{ok, O1} ->
lager:error("Got ~p during a data file sync on file ~p", [O1, F]),
O1;
{O2, ok} ->
lager:error("Got ~p during a csum file sync on file ~p", [O2, F]),
{O2, ok} ->
lager:error("Got ~p during a csum file sync on file ~p", [O2, F]),
O2;
{O3, O4} ->
{O3, O4} ->
lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]),
{O3, O4}
end,
@ -285,16 +295,21 @@ handle_call({read, Offset, Length, Opts}, _From,
csum_table = CsumTable,
reads = {T, Err}
}) ->
%% TODO: use these options - NoChunk prevents reading from disks
%% NoChecksum doesn't check checksums
NoChecksum = proplists:get_value(no_checksum, Opts, false),
NoChunk = proplists:get_value(no_chunk, Opts, false),
NeedsMerge = proplists:get_value(needs_trimmed, Opts, false),
{Resp, NewErr} =
case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of
case do_read(FH, F, CsumTable, Offset, Length, NoChunk, NoChecksum) of
{ok, {[], []}} ->
{{error, not_written}, Err + 1};
{ok, {Chunks0, Trimmed0}} ->
Chunks = slice_both_side(Chunks0, Offset, Offset+Length),
{{ok, {Chunks, Trimmed0}}, Err};
Trimmed = case proplists:get_value(needs_trimmed, Opts, false) of
true -> Trimmed0;
false -> []
end,
{{ok, {Chunks, Trimmed}}, Err};
Error ->
lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]),
{Error, Err + 1}
@ -338,6 +353,45 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
{reply, Resp, State#state{writes = {T+1, NewErr},
eof_position = NewEof}};
%%% TRIMS
handle_call({trim, _Offset, _ClientMeta, _Data}, _From,
State = #state{wedged = true,
writes = {T, Err}
}) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
handle_call({trim, Offset, Size, _TriggerGC}, _From,
State = #state{data_filehandle=FHd,
ops = Ops,
trims = {T, Err},
csum_table = CsumTable}) ->
case machi_csum_table:all_trimmed(CsumTable, Offset, Size) of
true ->
NewState = State#state{ops=Ops+1, trims={T, Err+1}},
%% All bytes of that range was already trimmed returns ok
%% here, not {error, trimmed}, which means the whole file
%% was trimmed
maybe_gc(ok, NewState);
false ->
LUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_leftneighbor(CsumTable, Offset)),
RUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)),
case machi_csum_table:trim(CsumTable, Offset, Size, LUpdate, RUpdate) of
ok ->
NewState = State#state{ops=Ops+1, trims={T+1, Err}},
maybe_gc(ok, NewState);
Error ->
{reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}}
end
end;
%% APPENDS
handle_call({append, _ClientMeta, _Extra, _Data}, _From,
@ -476,10 +530,20 @@ terminate(Reason, #state{filename = F,
lager:info(" Reads: ~p/~p", [RT, RE]),
lager:info(" Writes: ~p/~p", [WT, WE]),
lager:info("Appends: ~p/~p", [AT, AE]),
ok = file:sync(FHd),
ok = file:close(FHd),
ok = machi_csum_table:sync(T),
ok = machi_csum_table:close(T),
case FHd of
undefined ->
noop; %% file deleted
_ ->
ok = file:sync(FHd),
ok = file:close(FHd)
end,
case T of
undefined ->
noop; %% file deleted
_ ->
ok = machi_csum_table:sync(T),
ok = machi_csum_table:close(T)
end,
ok.
% @private
@ -512,15 +576,14 @@ check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA;
check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
lager:warning("Unknown checksum tag ~p", [OtherTag]),
{error, bad_checksum}.
-spec do_read(FHd :: file:io_device(),
Filename :: string(),
CsumTable :: machi_csum_table:table(),
Offset :: non_neg_integer(),
Size :: non_neg_integer(),
NoChecksum :: boolean(),
NoChunk :: boolean(),
NeedsTrimmed :: boolean()
NoChecksum :: boolean()
) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
{error, bad_checksum} |
{error, partial_read} |
@ -539,23 +602,23 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
% tuple is returned.</ul>
% </li>
%
do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) ->
do_read(FHd, Filename, CsumTable, Offset, Size).
do_read(FHd, Filename, CsumTable, Offset, Size) ->
do_read(FHd, Filename, CsumTable, Offset, Size, _, _) ->
%% Note that find/3 only returns overlapping chunks, both borders
%% are not aligned to original Offset and Size.
ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size),
read_all_ranges(FHd, Filename, ChunkCsums, []).
read_all_ranges(FHd, Filename, ChunkCsums, [], []).
read_all_ranges(_, _, [], ReadChunks) ->
read_all_ranges(_, _, [], ReadChunks, TrimmedChunks) ->
%% TODO: currently returns empty list of trimmed chunks
{ok, {lists:reverse(ReadChunks), []}};
{ok, {lists:reverse(ReadChunks), lists:reverse(TrimmedChunks)}};
read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
read_all_ranges(FHd, Filename, [{Offset, Size, trimmed}|T], ReadChunks, TrimmedChunks) ->
read_all_ranges(FHd, Filename, T, ReadChunks, [{Filename, Offset, Size}|TrimmedChunks]);
read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, TrimmedChunks) ->
case file:pread(FHd, Offset, Size) of
eof ->
read_all_ranges(FHd, Filename, T, ReadChunks);
read_all_ranges(FHd, Filename, T, ReadChunks, TrimmedChunks);
{ok, Bytes} when byte_size(Bytes) == Size ->
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
case check_or_make_tagged_csum(Tag, Ck, Bytes) of
@ -565,19 +628,21 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
{error, bad_checksum};
TaggedCsum ->
read_all_ranges(FHd, Filename, T,
[{Filename, Offset, Bytes, TaggedCsum}|ReadChunks]);
[{Filename, Offset, Bytes, TaggedCsum}|ReadChunks],
TrimmedChunks);
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
%% XXX FIXME: Should we return something other than
%% {ok, ....} in this case?
read_all_ranges(FHd, Filename, T,
[{Filename, Offset, Bytes, OtherCsum}|ReadChunks])
[{Filename, Offset, Bytes, OtherCsum}|ReadChunks],
TrimmedChunks)
end;
{ok, Partial} ->
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
[Filename, Offset, Size, byte_size(Partial)]),
{error, partial_read};
Other ->
lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
lager:error("While reading file ~p, offset ~p, length ~p, got ~p",
[Filename, Offset, Size, Other]),
{error, Other}
end.
@ -616,7 +681,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
{error, Reason}
end;
[{Offset, Size, TaggedCsum}] ->
case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of
case do_read(FHd, Filename, CsumTable, Offset, Size, false, false) of
{error, _} = E ->
lager:warning("This should never happen: got ~p while reading"
" at offset ~p in file ~p that's supposedly written",
@ -693,6 +758,8 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
%% Dialyzer 'can never match': slice_both_side([], _, _) ->
%% [];
slice_both_side([], _, _) ->
[];
slice_both_side([{F, Offset, Chunk, _Csum}|L], LeftPos, RightPos)
when Offset < LeftPos andalso LeftPos < RightPos ->
TrashLen = 8 * (LeftPos - Offset),
@ -729,3 +796,43 @@ maybe_regenerate_checksum(FHd, {Offset, Size, _Csum}) ->
Error ->
throw(Error)
end.
%% GC: make sure unwritte bytes = [{Eof, infinity}] and Eof is > max file size
%% walk through the checksum table and make sure all chunks trimmed
%% Then unlink the file
-spec maybe_gc(term(), #state{}) ->
{reply, term(), #state{}} | {stop, normal, term(), #state{}}.
maybe_gc(Reply, S = #state{eof_position = Eof,
max_file_size = MaxFileSize}) when Eof < MaxFileSize ->
lager:debug("The file is still small; not trying GC (Eof, MaxFileSize) = (~p, ~p)~n",
[Eof, MaxFileSize]),
{reply, Reply, S};
maybe_gc(Reply, S = #state{fluname=FluName,
data_filehandle = FHd,
data_dir = DataDir,
filename = Filename,
eof_position = Eof,
csum_table=CsumTable}) ->
case machi_csum_table:all_trimmed(CsumTable, Eof) of
true ->
lager:debug("GC? Let's do it: ~p.~n", [Filename]),
%% Before unlinking a file, it should inform
%% machi_flu_filename_mgr that this file is
%% deleted and mark it as "trimmed" to avoid
%% filename reuse and resurrection. Maybe garbage
%% will remain if a process crashed but it also
%% should be recovered at filename_mgr startup.
%% Also, this should be informed *before* file proxy
%% deletes files.
ok = machi_flu_metadata_mgr:trim_file(FluName, {file, Filename}),
ok = file:close(FHd),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = file:delete(DPath),
machi_csum_table:delete(CsumTable),
{stop, normal, Reply,
S#state{data_filehandle=undefined,
csum_table=undefined}};
false ->
{reply, Reply, S}
end.

View file

@ -44,7 +44,8 @@ start_link(FluName) ->
supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []).
start_proxy(FluName, DataDir, Filename) ->
supervisor:start_child(make_proxy_name(FluName), [Filename, DataDir]).
supervisor:start_child(make_proxy_name(FluName),
[FluName, Filename, DataDir]).
init([]) ->
SupFlags = {simple_one_for_one, 1000, 10},

View file

@ -75,6 +75,7 @@
epoch_id :: 'undefined' | machi_dt:epoch_id(),
pb_mode = undefined :: 'undefined' | 'high' | 'low',
high_clnt :: 'undefined' | pid(),
trim_table :: ets:tid(),
props = [] :: list() % proplist
}).
@ -148,6 +149,7 @@ main2(FluName, TcpPort, DataDir, Props) ->
{true, undefined}
end,
Witness_p = proplists:get_value(witness_mode, Props, false),
S0 = #state{flu_name=FluName,
proj_store=ProjectionPid,
tcp_port=TcpPort,
@ -409,8 +411,11 @@ do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
#state{witness=false}=S) ->
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts},
#state{witness=false}=S) ->
#state{witness=false} = S) ->
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC},
#state{witness=false}=S) ->
{do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S};
do_pb_ll_request3({low_checksum_list, _EpochID, File},
#state{witness=false}=S) ->
{do_server_checksum_listing(File, S), S};
@ -541,21 +546,47 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) ->
case sanitize_file_string(File) of
ok ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
machi_file_proxy:write(Pid, Offset, Meta, Chunk);
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
{ok, Pid} ->
Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
machi_file_proxy:write(Pid, Offset, Meta, Chunk);
{error, trimmed} = Error ->
Error
end;
_ ->
{error, bad_arg}
end.
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
%% TODO: Look inside Opts someday.
case sanitize_file_string(File) of
ok ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
Other -> Other
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
{ok, Pid} ->
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
%% XXX FIXME
%% For now we are omiting the checksum data because it blows up
%% protobufs.
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
Other -> Other
end;
{error, trimmed} = Error ->
Error
end;
_ ->
{error, bad_arg}
end.
do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n",
[File, Offset, Size, TriggerGC]),
case sanitize_file_string(File) of
ok ->
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
{ok, Pid} ->
machi_file_proxy:trim(Pid, Offset, Size, TriggerGC);
{error, trimmed} = Trimmed ->
%% Should be returned back to (maybe) trigger repair
Trimmed
end;
_ ->
{error, bad_arg}
@ -662,10 +693,14 @@ handle_append(Prefix, Chunk, Csum, Extra, FluName, EpochId) ->
Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}),
case Res of
{file, F} ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}),
{Tag, CS} = machi_util:unmake_tagged_csum(Csum),
Meta = [{client_csum_tag, Tag}, {client_csum, CS}],
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
{ok, Pid} ->
{Tag, CS} = machi_util:unmake_tagged_csum(Csum),
Meta = [{client_csum_tag, Tag}, {client_csum, CS}],
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
{error, trimmed} = E ->
E
end;
Error ->
Error
end.

View file

@ -80,6 +80,7 @@
%% For "internal" replication only.
-export([
write_chunk/5, write_chunk/6,
trim_chunk/5, trim_chunk/6,
delete_migration/3, delete_migration/4,
trunc_hack/3, trunc_hack/4
]).
@ -474,6 +475,31 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
disconnect(Sock)
end.
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec trim_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
ok | {error, machi_dt:error_general()} | {error, term()}.
trim_chunk(Sock, EpochID, File0, Offset, Size)
when Offset >= ?MINIMUM_OFFSET ->
ReqID = <<"id">>,
File = machi_util:make_binary(File0),
true = (Offset >= ?MINIMUM_OFFSET),
Req = machi_pb_translate:to_pb_request(
ReqID,
{low_trim_chunk, EpochID, File, Offset, Size, 0}),
do_pb_request_common(Sock, ReqID, Req).
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec trim_chunk(machi_dt:inet_host(), machi_dt:inet_port(),
machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
ok | {error, machi_dt:error_general()} | {error, term()}.
trim_chunk(_Host, _TcpPort, _EpochID, _File, _Offset, _Size) ->
not_used.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.

View file

@ -17,8 +17,8 @@
%% under the License.
%%
%% -------------------------------------------------------------------
%%
%% @doc This process is responsible for managing filenames assigned to
%%
%% @doc This process is responsible for managing filenames assigned to
%% prefixes. It's started out of `machi_flu_psup'.
%%
%% Supported operations include finding the "current" filename assigned to
@ -32,7 +32,7 @@
%% First it looks up the sequence number from the prefix name. If
%% no sequence file is found, it uses 0 as the sequence number and searches
%% for a matching file with the prefix and 0 as the sequence number.
%% If no file is found, the it generates a new filename by incorporating
%% If no file is found, the it generates a new filename by incorporating
%% the given prefix, a randomly generated (v4) UUID and 0 as the
%% sequence number.
%%
@ -79,7 +79,7 @@
child_spec(FluName, DataDir) ->
Name = make_filename_mgr_name(FluName),
{Name,
{Name,
{?MODULE, start_link, [FluName, DataDir]},
permanent, 5000, worker, [?MODULE]}.
@ -87,8 +87,8 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) ->
N = make_filename_mgr_name(FluName),
gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []).
-spec find_or_make_filename_from_prefix( FluName :: atom(),
EpochId :: pv1_epoch_n(),
-spec find_or_make_filename_from_prefix( FluName :: atom(),
EpochId :: pv1_epoch_n(),
Prefix :: {prefix, string()} ) ->
{file, Filename :: string()} | {error, Reason :: term() } | timeout.
% @doc Find the latest available or make a filename from a prefix. A prefix
@ -96,7 +96,7 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) ->
% tuple in the form of `{file, F}' or an `{error, Reason}'
find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}) when is_atom(FluName) ->
N = make_filename_mgr_name(FluName),
gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT);
gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT);
find_or_make_filename_from_prefix(_FluName, _EpochId, Other) ->
lager:error("~p is not a valid prefix.", [Other]),
error(badarg).
@ -104,9 +104,9 @@ find_or_make_filename_from_prefix(_FluName, _EpochId, Other) ->
-spec increment_prefix_sequence( FluName :: atom(), Prefix :: {prefix, string()} ) ->
ok | {error, Reason :: term() } | timeout.
% @doc Increment the sequence counter for a given prefix. Prefix should
% be in the form of `{prefix, P}'.
% be in the form of `{prefix, P}'.
increment_prefix_sequence(FluName, {prefix, Prefix}) when is_atom(FluName) ->
gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, Prefix}, ?TIMEOUT);
gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, Prefix}, ?TIMEOUT);
increment_prefix_sequence(_FluName, Other) ->
lager:error("~p is not a valid prefix.", [Other]),
error(badarg).
@ -117,7 +117,7 @@ increment_prefix_sequence(_FluName, Other) ->
% all the data files associated with that prefix. Returns
% a list.
list_files_by_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) ->
gen_server:call(make_filename_mgr_name(FluName), {list_files, Prefix}, ?TIMEOUT);
gen_server:call(make_filename_mgr_name(FluName), {list_files, Prefix}, ?TIMEOUT);
list_files_by_prefix(_FluName, Other) ->
lager:error("~p is not a valid prefix.", [Other]),
error(badarg).
@ -125,7 +125,10 @@ list_files_by_prefix(_FluName, Other) ->
%% gen_server API
init([FluName, DataDir]) ->
Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]),
{ok, #state{ fluname = FluName, epoch = 0, datadir = DataDir, tid = Tid }}.
{ok, #state{fluname = FluName,
epoch = 0,
datadir = DataDir,
tid = Tid}}.
handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]),
@ -135,9 +138,9 @@ handle_cast(Req, State) ->
%% the FLU has already validated that the caller's epoch id and the FLU's epoch id
%% are the same. So we *assume* that remains the case here - that is to say, we
%% are not wedged.
handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir,
epoch = EpochId,
tid = Tid }) ->
handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir,
epoch = EpochId,
tid = Tid}) ->
%% Our state and the caller's epoch ids are the same. Business as usual.
File = handle_find_file(Tid, Prefix, DataDir),
{reply, {file, File}, S};
@ -154,7 +157,7 @@ handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }
ok = machi_util:increment_max_filenum(DataDir, Prefix),
{reply, ok, S};
handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) ->
spawn(fun() ->
spawn(fun() ->
L = list_files(DataDir, Prefix),
gen_server:reply(From, L)
end),
@ -181,7 +184,7 @@ code_change(_OldVsn, State, _Extra) ->
%% MIT License
generate_uuid_v4_str() ->
<<A:32, B:16, C:16, D:16, E:48>> = crypto:strong_rand_bytes(16),
io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).
find_file(DataDir, Prefix, N) ->
@ -201,7 +204,7 @@ handle_find_file(Tid, Prefix, DataDir) ->
[] ->
{find_or_make_filename(Tid, DataDir, Prefix, N), false};
[H] -> {H, true};
[Fn | _ ] = L ->
[Fn | _ ] = L ->
lager:warning(
"Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
[Prefix, N, L]),
@ -245,4 +248,3 @@ increment_and_cache_filename(Tid, DataDir, Prefix) ->
-ifdef(TEST).
-endif.

View file

@ -39,10 +39,13 @@
-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use
-define(TIMEOUT, 10 * 1000). %% 10 second timeout
-define(KNOWN_FILES_LIST_PREFIX, "known_files_").
-record(state, {fluname :: atom(),
datadir :: string(),
tid :: ets:tid(),
cnt :: non_neg_integer()
cnt :: non_neg_integer(),
trimmed_files :: machi_plist:plist()
}).
%% This record goes in the ets table where filename is the key
@ -59,7 +62,8 @@
lookup_proxy_pid/2,
start_proxy_pid/2,
stop_proxy_pid/2,
build_metadata_mgr_name/2
build_metadata_mgr_name/2,
trim_file/2
]).
%% gen_server callbacks
@ -97,10 +101,24 @@ start_proxy_pid(FluName, {file, Filename}) ->
stop_proxy_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
trim_file(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {trim_file, Filename}, ?TIMEOUT).
%% gen_server callbacks
init([FluName, Name, DataDir, Num]) ->
%% important: we'll need another persistent storage to
%% remember deleted (trimmed) file, to prevent resurrection after
%% flu restart and append.
FileListFileName =
filename:join([DataDir, ?KNOWN_FILES_LIST_PREFIX ++ atom_to_list(FluName)]),
{ok, PList} = machi_plist:open(FileListFileName, []),
%% TODO make sure all files non-existent, if any remaining files
%% here, just delete it. They're in the list *because* they're all
%% trimmed.
Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
{ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num}}.
{ok, #state{fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num,
trimmed_files=PList}}.
handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]),
@ -113,17 +131,25 @@ handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
end,
{reply, Reply, State};
handle_call({start_proxy_pid, Filename}, _From, State = #state{ fluname = N, tid = Tid, datadir = D }) ->
NewR = case lookup_md(Tid, Filename) of
not_found ->
start_file_proxy(N, D, Filename);
#md{ proxy_pid = undefined } = R0 ->
start_file_proxy(N, D, R0);
#md{ proxy_pid = _Pid } = R1 ->
R1
end,
update_ets(Tid, NewR),
{reply, {ok, NewR#md.proxy_pid}, State};
handle_call({start_proxy_pid, Filename}, _From,
State = #state{ fluname = N, tid = Tid, datadir = D,
trimmed_files=TrimmedFiles}) ->
case machi_plist:find(TrimmedFiles, Filename) of
false ->
NewR = case lookup_md(Tid, Filename) of
not_found ->
start_file_proxy(N, D, Filename);
#md{ proxy_pid = undefined } = R0 ->
start_file_proxy(N, D, R0);
#md{ proxy_pid = _Pid } = R1 ->
R1
end,
update_ets(Tid, NewR),
{reply, {ok, NewR#md.proxy_pid}, State};
true ->
{reply, {error, trimmed}, State}
end;
handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
case lookup_md(Tid, Filename) of
not_found ->
@ -137,6 +163,15 @@ handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
end,
{reply, ok, State};
handle_call({trim_file, Filename}, _,
S = #state{trimmed_files = TrimmedFiles }) ->
case machi_plist:add(TrimmedFiles, Filename) of
{ok, TrimmedFiles2} ->
{reply, ok, S#state{trimmed_files=TrimmedFiles2}};
Error ->
{reply, Error, S}
end;
handle_call(Req, From, State) ->
lager:warning("Got unknown call ~p from ~p", [Req, From]),
{reply, hoge, State}.
@ -169,18 +204,21 @@ handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) -
lager:error("file proxy ~p shutdown because it's wedged", [Pid]),
clear_ets(Tid, Mref),
{noreply, State};
handle_info({'DOWN', _Mref, process, Pid, trimmed}, State = #state{ tid = _Tid }) ->
lager:debug("file proxy ~p shutdown because the file was trimmed", [Pid]),
{noreply, State};
handle_info({'DOWN', Mref, process, Pid, Error}, State = #state{ tid = Tid }) ->
lager:error("file proxy ~p shutdown because ~p", [Pid, Error]),
clear_ets(Tid, Mref),
{noreply, State};
handle_info(Info, State) ->
lager:warning("Got unknown info ~p", [Info]),
{noreply, State}.
terminate(Reason, _State) ->
terminate(Reason, _State = #state{trimmed_files=TrimmedFiles}) ->
lager:info("Shutting down because ~p", [Reason]),
machi_plist:close(TrimmedFiles),
ok.
code_change(_OldVsn, State, _Extra) ->

View file

@ -94,6 +94,9 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
%% @doc Tries to read a chunk of a specified file. It returns `{ok,
%% {Chunks, TrimmedChunks}}' for live file while it returns `{error,
%% trimmed}' if all bytes of the file was trimmed.
-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
[{flag_no_checksum | flag_no_chunk | needs_trimmed, boolean()}]) ->
{ok, {list(), list()}} | {error, term()}.
@ -107,6 +110,10 @@ read_chunk(PidSpec, File, Offset, Size, Options) ->
read_chunk(PidSpec, File, Offset, Size, Options, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout).
%% @doc Trims arbitrary binary range of any file. TODO: Add option
%% specifying whether to trigger GC.
-spec trim_chunk(pid(), string(), non_neg_integer(), machi_dt:chunk_size()) ->
ok | {error, term()}.
trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
@ -415,6 +422,8 @@ convert_general_status_code('NOT_WRITTEN') ->
{error, not_written};
convert_general_status_code('WRITTEN') ->
{error, written};
convert_general_status_code('TRIMMED') ->
{error, trimmed};
convert_general_status_code('NO_SUCH_FILE') ->
{error, no_such_file};
convert_general_status_code('PARTIAL_READ') ->

View file

@ -88,6 +88,17 @@ from_pb_request(#mpb_ll_request{
offset=Offset,
chunk_size=Size} = ChunkPos,
{ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}};
from_pb_request(#mpb_ll_request{
req_id=ReqID,
trim_chunk=#mpb_ll_trimchunkreq{
epoch_id=PB_EpochID,
file=File,
offset=Offset,
size=Size,
trigger_gc=PB_TriggerGC}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
TriggerGC = conv_to_boolean(PB_TriggerGC),
{ReqID, {low_trim_chunk, EpochID, File, Offset, Size, TriggerGC}};
from_pb_request(#mpb_ll_request{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistreq{
@ -262,6 +273,10 @@ from_pb_response(#mpb_ll_response{
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
from_pb_response(#mpb_ll_response{
req_id=ReqID,
trim_chunk=#mpb_ll_trimchunkresp{status=Status}}) ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)};
from_pb_response(#mpb_ll_response{
req_id=ReqID,
checksum_list=#mpb_ll_checksumlistresp{
@ -398,11 +413,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
chunk=Chunk,
csum=PB_CSum}}};
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) ->
%% TODO: stop ignoring Opts ^_^
PB_EpochID = conv_from_epoch_id(EpochID),
FNChecksum = proplists:get_value(no_checksum, Opts, false),
FNChunk = proplists:get_value(no_chunk, Opts, false),
NeedsTrimmed = proplists:get_value(needs_merge, Opts, false),
NeedsTrimmed = proplists:get_value(needs_trimmed, Opts, false),
#mpb_ll_request{
req_id=ReqID, do_not_alter=2,
read_chunk=#mpb_ll_readchunkreq{
@ -414,6 +428,15 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) ->
flag_no_checksum=machi_util:bool2int(FNChecksum),
flag_no_chunk=machi_util:bool2int(FNChunk),
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}};
to_pb_request(ReqID, {low_trim_chunk, EpochID, File, Offset, Size, TriggerGC}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
trim_chunk=#mpb_ll_trimchunkreq{
epoch_id=PB_EpochID,
file=File,
offset=Offset,
size=Size,
trigger_gc=TriggerGC}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
@ -524,6 +547,18 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
_Else ->
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {low_trim_chunk, _, _, _, _, _}, Resp) ->
case Resp of
ok ->
#mpb_ll_response{req_id=ReqID,
trim_chunk=#mpb_ll_trimchunkresp{status='OK'}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID,
read_chunk=#mpb_ll_trimchunkresp{status=Status}};
_Else ->
make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {low_checksum_list, _EpochID, _File}, Resp) ->
case Resp of
{ok, Chunk} ->
@ -909,6 +944,8 @@ conv_from_status({error, not_written}) ->
'NOT_WRITTEN';
conv_from_status({error, written}) ->
'WRITTEN';
conv_from_status({error, trimmed}) ->
'TRIMMED';
conv_from_status({error, no_such_file}) ->
'NO_SUCH_FILE';
conv_from_status({error, partial_read}) ->

63
src/machi_plist.erl Normal file
View file

@ -0,0 +1,63 @@
-module(machi_plist).
%%% @doc persistent list of binaries that support mutual exclusion
-export([open/2, close/1, find/2, add/2]).
-ifdef(TEST).
-export([all/1]).
-endif.
-record(machi_plist,
{filename :: string(),
fd :: file:descriptor(),
list}).
-type plist() :: #machi_plist{}.
-export_type([plist/0]).
-spec open(filename:filename(), proplists:proplist()) ->
{ok, plist()} | {error, file:posix()}.
open(Filename, _Opt) ->
List = case file:read_file(Filename) of
{ok, <<>>} -> [];
{ok, Bin} -> binary_to_term(Bin);
{error, enoent} -> []
end,
case file:open(Filename, [read, write, raw, binary, sync]) of
{ok, Fd} ->
{ok, #machi_plist{filename=Filename,
fd=Fd,
list=List}};
Error ->
Error
end.
-spec close(plist()) -> ok.
close(#machi_plist{fd=Fd}) ->
_ = file:close(Fd).
-spec find(plist(), string()) -> boolean().
find(#machi_plist{list=List}, Name) ->
lists:member(Name, List).
-spec add(plist(), string()) -> {ok, plist()} | {error, file:posix()}.
add(Plist = #machi_plist{list=List0, fd=Fd}, Name) ->
case find(Plist, Name) of
true ->
{ok, Plist};
false ->
List = lists:append(List0, [Name]),
case file:pwrite(Fd, 0, term_to_binary(List)) of
ok ->
{ok, Plist#machi_plist{list=List}};
Error ->
Error
end
end.
-ifdef(TEST).
-spec all(plist()) -> [file:filename()].
all(#machi_plist{list=List}) ->
List.
-endif.

View file

@ -79,6 +79,7 @@
%% Internal API
write_chunk/5, write_chunk/6,
trim_chunk/5, trim_chunk/6,
%% Helpers
stop_proxies/1, start_proxies/1
@ -310,6 +311,18 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
Else
end.
trim_chunk(PidSpec, EpochID, File, Offset, Size) ->
trim_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
trim_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
gen_server:call(PidSpec,
{req, {trim_chunk, EpochID, File, Offset, Chunk}},
Timeout).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([I]) ->
@ -383,6 +396,9 @@ make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts},
make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
make_req_fun({trim_chunk, EpochID, File, Offset, Size},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:trim_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({checksum_list, EpochID, File},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:checksum_list(Sock, EpochID, File) end;

View file

@ -76,13 +76,14 @@ smoke3_test() ->
{?LINE, trim, {0, 1024, <<>>}, undefined, undefined}
],
[ begin
%% ?debugVal({Line, Chunk}),
%% ?debugVal({_Line, Chunk}),
{Offset, Size, Csum} = Chunk,
?assertEqual(LeftN0,
machi_csum_table:find_leftneighbor(MC, Offset)),
?assertEqual(RightN0,
machi_csum_table:find_rightneighbor(MC, Offset+Size)),
LeftN = case LeftN0 of
{OffsL, SizeL, trimmed} -> {OffsL, SizeL, trimmed};
{OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>};
OtherL -> OtherL
end,

View file

@ -170,7 +170,7 @@ start_command(S) ->
start(_S) ->
{_, _, MS} = os:timestamp(),
File = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS),
{ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link(some_flu, File, ?TESTDIR),
unlink(Pid),
Pid.

View file

@ -78,7 +78,7 @@ random_binary(Start, End) ->
machi_file_proxy_test_() ->
clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR),
[
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
@ -100,7 +100,7 @@ machi_file_proxy_test_() ->
multiple_chunks_read_test_() ->
clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR),
[
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)),

View file

@ -38,6 +38,7 @@ smoke_test2() ->
Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"}
],
D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]),
ok = application:set_env(machi, max_file_size, 1024*1024),
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
@ -90,23 +91,54 @@ smoke_test2() ->
{ok, [{File1Size,File1}]} = ?C:list_files(Clnt),
true = is_integer(File1Size),
File1Bin = binary_to_list(File1),
[begin
%% ok = ?C:trim_chunk(Clnt, Fl, Off, Sz)
%% This gets an error as trim API is still a stub
?assertMatch({bummer,
{throw,
{error, bad_joss_taipan_fixme},
_Boring_stack_trace}},
?C:trim_chunk(Clnt, Fl, Off, Sz))
end || {Ch, Fl, Off, Sz} <- Reads],
#p_srvr{name=Name, port=Port, props=Dir} = P,
?assertEqual({ok, [File1Bin]},
file:list_dir(filename:join([Dir, "data"]))),
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
{ok, Plist} = machi_plist:open(FileListFileName, []),
?assertEqual([], machi_plist:all(Plist))
end || P <- Ps],
[begin
ok = ?C:trim_chunk(Clnt, Fl, Off, Sz)
end || {_Ch, Fl, Off, Sz} <- Reads],
[begin
{ok, {[], Trimmed}} =
?C:read_chunk(Clnt, Fl, Off, Sz, [{needs_trimmed, true}]),
Filename = binary_to_list(Fl),
?assertEqual([{Filename, Off, Sz}], Trimmed)
end || {_Ch, Fl, Off, Sz} <- Reads],
LargeBytes = binary:copy(<<"x">>, 1024*1024),
LBCsum = {client_sha, machi_util:checksum_chunk(LargeBytes)},
{ok, {Offx, Sizex, Filex}} =
?C:append_chunk(Clnt, PK, Prefix, LargeBytes, LBCsum, 0),
ok = ?C:trim_chunk(Clnt, Filex, Offx, Sizex),
%% Make sure everything was trimmed
File = binary_to_list(Filex),
[begin
#p_srvr{name=Name, port=_Port, props=Dir} = P,
?assertEqual({ok, []},
file:list_dir(filename:join([Dir, "data"]))),
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
{ok, Plist} = machi_plist:open(FileListFileName, []),
?assertEqual([File], machi_plist:all(Plist))
end || P <- Ps],
[begin
{error, trimmed} =
?C:read_chunk(Clnt, Fl, Off, Sz, [])
end || {_Ch, Fl, Off, Sz} <- Reads],
ok
after
(catch ?C:quit(Clnt))
end
after
exit(SupPid, normal),
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
machi_util:wait_for_death(SupPid, 100),
ok
end.

17
test/machi_plist_test.erl Normal file
View file

@ -0,0 +1,17 @@
-module(machi_plist_test).
-include_lib("eunit/include/eunit.hrl").
open_close_test() ->
FileName = "bark-bark-one",
file:delete(FileName),
{ok, PList0} = machi_plist:open(FileName, []),
{ok, PList1} = machi_plist:add(PList0, "boomar"),
?assertEqual(["boomar"], machi_plist:all(PList1)),
ok = machi_plist:close(PList1),
{ok, PList2} = machi_plist:open(FileName, []),
?assertEqual(["boomar"], machi_plist:all(PList2)),
ok = machi_plist:close(PList2),
file:delete(FileName),
ok.