diff --git a/src/machi.proto b/src/machi.proto index e5d77d9..e583ba5 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -48,9 +48,10 @@ enum Mpb_GeneralStatusCode { PARTITION = 4; NOT_WRITTEN = 5; WRITTEN = 6; - NO_SUCH_FILE = 7; - PARTIAL_READ = 8; - BAD_EPOCH = 9; + TRIMMED = 7; // The whole file was trimmed + NO_SUCH_FILE = 8; + PARTIAL_READ = 9; + BAD_EPOCH = 10; BAD_JOSS = 255; // Only for testing by the Taipan } @@ -355,6 +356,7 @@ message Mpb_ProjectionV1 { // append_chunk() // write_chunk() // read_chunk() +// trim_chunk() // checksum_list() // list_files() // wedge_status() @@ -424,6 +426,20 @@ message Mpb_LL_ReadChunkResp { repeated Mpb_ChunkPos trimmed = 3; } +// Low level API: trim_chunk() + +message Mpb_LL_TrimChunkReq { + required Mpb_EpochID epoch_id = 1; + required string file = 2; + required uint64 offset = 3; + required uint32 size = 4; + optional uint32 trigger_gc = 5 [default=1]; +} + +message Mpb_LL_TrimChunkResp { + required Mpb_GeneralStatusCode status = 1; +} + // Low level API: checksum_list() message Mpb_LL_ChecksumListReq { @@ -588,11 +604,12 @@ message Mpb_LL_Request { optional Mpb_LL_AppendChunkReq append_chunk = 30; optional Mpb_LL_WriteChunkReq write_chunk = 31; optional Mpb_LL_ReadChunkReq read_chunk = 32; - optional Mpb_LL_ChecksumListReq checksum_list = 33; - optional Mpb_LL_ListFilesReq list_files = 34; - optional Mpb_LL_WedgeStatusReq wedge_status = 35; - optional Mpb_LL_DeleteMigrationReq delete_migration = 36; - optional Mpb_LL_TruncHackReq trunc_hack = 37; + optional Mpb_LL_TrimChunkReq trim_chunk = 33; + optional Mpb_LL_ChecksumListReq checksum_list = 34; + optional Mpb_LL_ListFilesReq list_files = 35; + optional Mpb_LL_WedgeStatusReq wedge_status = 36; + optional Mpb_LL_DeleteMigrationReq delete_migration = 37; + optional Mpb_LL_TruncHackReq trunc_hack = 38; } message Mpb_LL_Response { @@ -622,9 +639,10 @@ message Mpb_LL_Response { optional Mpb_LL_AppendChunkResp append_chunk = 30; optional Mpb_LL_WriteChunkResp write_chunk = 31; optional Mpb_LL_ReadChunkResp read_chunk = 32; - optional Mpb_LL_ChecksumListResp checksum_list = 33; - optional Mpb_LL_ListFilesResp list_files = 34; - optional Mpb_LL_WedgeStatusResp wedge_status = 35; - optional Mpb_LL_DeleteMigrationResp delete_migration = 36; - optional Mpb_LL_TruncHackResp trunc_hack = 37; + optional Mpb_LL_TrimChunkResp trim_chunk = 33; + optional Mpb_LL_ChecksumListResp checksum_list = 34; + optional Mpb_LL_ListFilesResp list_files = 35; + optional Mpb_LL_WedgeStatusResp wedge_status = 36; + optional Mpb_LL_DeleteMigrationResp delete_migration = 37; + optional Mpb_LL_TruncHackResp trunc_hack = 38; } diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 7b88f45..e198c40 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -360,6 +360,9 @@ do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, TO, %% written block is. But we lost a race. Repeat, with a new %% sequencer assignment. do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S); + {error, trimmed} = Err -> + %% TODO: behaviour + {reply, Err, S}; {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE, Prefix,iolist_size(Chunk)}) @@ -406,7 +409,7 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth + 1, STime, TO, S2) end end - end. + end. do_append_midtail2([], _Prefix, File, Offset, Chunk, _ChunkExtra, _Ws, _Depth, _STime, _TO, S) -> @@ -434,6 +437,9 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, Resume = {append, Offset, iolist_size(Chunk), File}, do_repair_chunk(FLUs, Resume, Chunk, [], File, Offset, iolist_size(Chunk), Depth, STime, S); + {error, trimmed} = Err -> + %% TODO: nothing can be done + {reply, Err, S}; {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) end. @@ -497,6 +503,8 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO, do_write_head(File, Offset, Chunk, Depth, STime, TO, S); {error, written}=Err -> {reply, Err, S}; + {error, trimmed}=Err -> + {reply, Err, S}; {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE, iolist_size(Chunk)}) @@ -528,18 +536,26 @@ do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO, ConsistencyMode = P#projection_v1.mode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, Opts, ?TIMEOUT) of - {ok, {Chunks, []}} when is_list(Chunks) -> - {reply, {ok, {Chunks, []}}, S}; + {ok, {Chunks, Trimmed}} when is_list(Chunks), is_list(Trimmed) -> + %% After partition heal, there could happen that heads may + %% have chunk trimmed but tails may have chunk written - + %% such repair couldn't be triggered in read time (because + %% there's data!). In this case, repair should happen by + %% partition heal event or some background + %% hashtree-n-repair service. TODO. FIXME. + {reply, {ok, {Chunks, Trimmed}}, S}; %% {ok, BadChunk} -> %% %% TODO cleaner handling of bad chunks %% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, %% got, byte_size(BadChunk)}); - {error, bad_arg} = BadArg -> + {error, bad_arg} = BadArg -> {reply, BadArg, S}; {error, partial_read}=Err -> + %% TODO: maybe this case we might need another repair? {reply, Err, S}; {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? + %% Maybe we need read repair here, too? {reply, BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> @@ -548,12 +564,125 @@ do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO, read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S); %% {reply, {error, not_written}, S}; {error, written} -> - exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}); + {error, trimmed}=Err -> + {reply, Err, S} + end. + +do_trim_chunk(File, Offset, Size, 0=Depth, STime, TO, S) -> + do_trim_chunk(File, Offset, Size, Depth+1, STime, TO, S); + +do_trim_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) -> + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > TO -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_trim_chunk(File, Offset, Size, Depth + 1, + STime, TO, S2); + _ -> + do_trim_chunk2(File, Offset, Size, Depth + 1, + STime, TO, S2) + end + end. + +do_trim_chunk2(File, Offset, Size, Depth, STime, TO, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + [HeadFLU|RestFLUs] = mutation_flus(P), + Proxy = orddict:fetch(HeadFLU, PD), + case ?FLU_PC:trim_chunk(Proxy, EpochID, File, Offset, Size, ?TIMEOUT) of + ok -> + %% From this point onward, we use the same code & logic path as + %% append does. + do_trim_midtail(RestFLUs, undefined, File, Offset, Size, + [HeadFLU], 0, STime, TO, S); + {error, trimmed} -> + %% Maybe the trim had failed in the middle of the tail so re-run + %% trim accross the whole chain. + do_trim_midtail(RestFLUs, undefined, File, Offset, Size, + [HeadFLU], 0, STime, TO, S); + {error, bad_checksum}=BadCS -> + {reply, BadCS, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_trim_chunk(File, Offset, Size, Depth, STime, TO, S) + end. + +do_trim_midtail(RestFLUs, Prefix, File, Offset, Size, + Ws, Depth, STime, TO, S) + when RestFLUs == [] orelse Depth == 0 -> + do_trim_midtail2(RestFLUs, Prefix, File, Offset, Size, + Ws, Depth + 1, STime, TO, S); +do_trim_midtail(_RestFLUs, Prefix, File, Offset, Size, + Ws, Depth, STime, TO, #state{proj=P}=S) -> + %% io:format(user, "midtail sleep2,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > TO -> + {reply, {error, partition}, S}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + undefined -> + {reply, {error, partition}, S}; + P2 -> + RestFLUs2 = mutation_flus(P2), + case RestFLUs2 -- Ws of + RestFLUs2 -> + %% None of the writes that we have done so far + %% are to FLUs that are in the RestFLUs2 list. + %% We are pessimistic here and assume that + %% those FLUs are permanently dead. Start + %% over with a new sequencer assignment, at + %% the 2nd have of the impl (we have already + %% slept & refreshed the projection). + + if Prefix == undefined -> % atom! not binary()!! + {error, partition}; + true -> + do_trim_chunk(Prefix, Offset, Size, + Depth, STime, TO, S2) + end; + RestFLUs3 -> + do_trim_midtail2(RestFLUs3, Prefix, File, Offset, Size, + Ws, Depth + 1, STime, TO, S2) + end + end + end. + +do_trim_midtail2([], _Prefix, _File, _Offset, _Size, + _Ws, _Depth, _STime, _TO, S) -> + %% io:format(user, "ok!\n", []), + {reply, ok, S}; +do_trim_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Size, + Ws, Depth, STime, TO, + #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(FLU, PD), + case ?FLU_PC:trim_chunk(Proxy, EpochID, File, Offset, Size, ?TIMEOUT) of + ok -> + %% io:format(user, "write ~w,", [FLU]), + do_trim_midtail2(RestFLUs, Prefix, File, Offset, Size, + [FLU|Ws], Depth, STime, TO, S); + {error, trimmed} -> + do_trim_midtail2(RestFLUs, Prefix, File, Offset, Size, + [FLU|Ws], Depth, STime, TO, S); + {error, bad_checksum}=BadCS -> + %% TODO: alternate strategy? + {reply, BadCS, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_trim_midtail(FLUs, Prefix, File, Offset, Size, + Ws, Depth, STime, TO, S) end. -do_trim_chunk(_File, _Offset, _Size, _Depth, _STime, _TO, S) -> - %% This is just a stub to reach CR client from high level client - {reply, {error, bad_joss}, S}. %% Read repair: depends on the consistency mode that we're in: %% @@ -597,6 +726,7 @@ read_repair2(cp_mode=ConsistencyMode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, [], ?TIMEOUT) of {ok, Chunks} when is_list(Chunks) -> + %% TODO: change to {Chunks, Trimmed} and have them repaired ToRepair = mutation_flus(P) -- [Tail], {Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [Tail], File, Depth, STime, S, {ok, Chunks}), @@ -614,7 +744,12 @@ read_repair2(cp_mode=ConsistencyMode, {error, not_written} -> {reply, {error, not_written}, S}; {error, written} -> - exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}); + {error, trimmed} -> + %% TODO: Again, whole file was trimmed. Needs repair. How + %% do we repair trimmed file (which was already unlinked) + %% across the flu servers? + exit({todo_should_repair_unlinked_files, ?MODULE, ?LINE, File}) end; read_repair2(ap_mode=ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, @@ -622,6 +757,7 @@ read_repair2(ap_mode=ConsistencyMode, Eligible = mutation_flus(P), case try_to_find_chunk(Eligible, File, Offset, Size, S) of {ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) -> + %% TODO: Repair trimmed chunks ToRepair = mutation_flus(P) -- [GotItFrom], {Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom], File, Depth, STime, S, {ok, Chunks}), @@ -638,7 +774,11 @@ read_repair2(ap_mode=ConsistencyMode, {error, not_written} -> {reply, {error, not_written}, S}; {error, written} -> - exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}); + {error, trimmed} -> + %% TODO: Again, whole file was trimmed. Needs repair. How + %% do we repair trimmed file across the flu servers? + exit({todo_should_repair_unlinked_files, ?MODULE, ?LINE, File}) end. do_repair_chunks([], _, _, _, _, _, _, S, Reply) -> @@ -703,6 +843,9 @@ do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offse %% that it is exactly our Chunk. do_repair_chunk2(Rest, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); + {error, trimmed} = _Error -> + %% TODO + exit(todo_should_repair_trimmed); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. @@ -872,7 +1015,9 @@ try_to_find_chunk(Eligible, File, Offset, Size, [{FoundFLU, {ok, ChunkAndTrimmed}}|_] -> {ok, ChunkAndTrimmed, FoundFLU}; [] -> - RetryErrs = [partition, bad_epoch, wedged], + RetryErrs = [partition, bad_epoch, wedged, trimmed], + %% Adding 'trimmed' to return so as to trigger repair, + %% once all other retry errors fixed case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of [SomeErr|_] -> {error, SomeErr}; diff --git a/src/machi_csum_table.erl b/src/machi_csum_table.erl index 80f1765..7921df3 100644 --- a/src/machi_csum_table.erl +++ b/src/machi_csum_table.erl @@ -177,7 +177,14 @@ all_trimmed(#machi_csum_table{table=T}, Left, Right) -> -spec all_trimmed(table(), machi_dt:chunk_pos()) -> boolean(). all_trimmed(#machi_csum_table{table=T}, Pos) -> - runthru(ets:tab2list(T), 0, Pos). + case ets:tab2list(T) of + [{0, ?MINIMUM_OFFSET, _}|L] -> + %% tl/1 to remove header space {0, 1024, <<0>>} + runthru(L, ?MINIMUM_OFFSET, Pos); + List -> + %% In case a header is removed; + runthru(List, 0, Pos) + end. -spec any_trimmed(table(), pos_integer(), diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index ed9a933..a7bad0f 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -22,20 +22,20 @@ %% controlled files. In particular, it manages the "write-once register" %% conceit at the heart of Machi's design. %% -%% Read, write and append requests for a single file will be managed +%% Read, write and append requests for a single file will be managed %% through this proxy. Clients can also request syncs for specific %% types of filehandles. %% %% As operations are requested, the proxy keeps track of how many %% operations it has performed (and how many errors were generated.) -%% After a sufficient number of inactivity, the server terminates +%% After a sufficient number of inactivity, the server terminates %% itself. %% %% TODO: -%% 1. Some way to transition the proxy into a wedged state that +%% 1. Some way to transition the proxy into a wedged state that %% doesn't rely on message delivery. %% -%% 2. Check max file size on appends. Writes we take on faith we can +%% 2. Check max file size on appends. Writes we take on faith we can %% and should handle. %% %% 3. Async checksum reads on startup. @@ -47,7 +47,7 @@ %% public API -export([ - start_link/2, + start_link/3, stop/1, sync/1, sync/2, @@ -55,6 +55,7 @@ read/4, write/3, write/4, + trim/4, append/2, append/4 ]). @@ -74,10 +75,11 @@ -define(TIMEOUT, 10*1000). -define(TOO_MANY_ERRORS_RATIO, 50). --type op_stats() :: { Total :: non_neg_integer(), +-type op_stats() :: { Total :: non_neg_integer(), Errors :: non_neg_integer() }. -record(state, { + fluname :: atom(), data_dir :: string() | undefined, filename :: string() | undefined, data_path :: string() | undefined, @@ -93,17 +95,18 @@ ops = 0 :: non_neg_integer(), %% sum of all ops reads = {0, 0} :: op_stats(), writes = {0, 0} :: op_stats(), - appends = {0, 0} :: op_stats() + appends = {0, 0} :: op_stats(), + trims = {0, 0} :: op_stats() }). %% Public API -% @doc Start a new instance of the file proxy service. Takes the filename +% @doc Start a new instance of the file proxy service. Takes the filename % and data directory as arguments. This function is typically called by the % `machi_file_proxy_sup:start_proxy/2' function. --spec start_link(Filename :: string(), DataDir :: string()) -> any(). -start_link(Filename, DataDir) -> - gen_server:start_link(?MODULE, {Filename, DataDir}, []). +-spec start_link(FluName :: atom(), Filename :: string(), DataDir :: string()) -> any(). +start_link(FluName, Filename, DataDir) -> + gen_server:start_link(?MODULE, {FluName, Filename, DataDir}, []). % @doc Request to stop an instance of the file proxy service. -spec stop(Pid :: pid()) -> ok. @@ -120,7 +123,7 @@ sync(_Pid) -> % @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'. -spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}. -sync(Pid, Type) when is_pid(Pid) andalso +sync(Pid, Type) when is_pid(Pid) andalso ( Type =:= all orelse Type =:= csum orelse Type =:= data ) -> gen_server:call(Pid, {sync, Type}, ?TIMEOUT); sync(_Pid, Type) -> @@ -147,7 +150,7 @@ read(Pid, Offset, Length) -> {ok, [{Filename::string(), Offset :: non_neg_integer(), Data :: binary(), Checksum :: binary()}]} | {error, Reason :: term()}. -read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 +read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 andalso is_integer(Length) andalso Length > 0 andalso is_list(Opts) -> gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT); @@ -179,6 +182,12 @@ write(_Pid, Offset, ClientMeta, _Data) -> lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]), {error, bad_arg}. +trim(Pid, Offset, Size, TriggerGC) when is_pid(Pid), + is_integer(Offset) andalso Offset >= 0, + is_integer(Size) andalso Size > 0, + is_boolean(TriggerGC) -> + gen_server:call(Pid, {trim ,Offset, Size, TriggerGC}, ?TIMEOUT). + % @doc Append data -spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()} |{error, term()}. @@ -194,8 +203,8 @@ append(_Pid, _Data) -> -spec append(Pid :: pid(), ClientMeta :: proplists:proplist(), Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()} |{error, term()}. -append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta) - andalso is_integer(Extra) andalso Extra >= 0 +append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta) + andalso is_integer(Extra) andalso Extra >= 0 andalso is_binary(Data) -> gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT); append(_Pid, ClientMeta, Extra, _Data) -> @@ -205,7 +214,7 @@ append(_Pid, ClientMeta, Extra, _Data) -> %% gen_server callbacks % @private -init({Filename, DataDir}) -> +init({FluName, Filename, DataDir}) -> CsumFile = machi_util:make_checksum_filename(DataDir, Filename), {_, DPath} = machi_util:make_data_filename(DataDir, Filename), ok = filelib:ensure_dir(CsumFile), @@ -216,6 +225,7 @@ init({Filename, DataDir}) -> {ok, FHd} = file:open(DPath, [read, write, binary, raw]), Tref = schedule_tick(), St = #state{ + fluname = FluName, filename = Filename, data_dir = DataDir, data_path = DPath, @@ -250,13 +260,13 @@ handle_call({sync, all}, _From, State = #state{filename = F, R1 = file:sync(FHd), Resp = case {R, R1} of {ok, ok} -> ok; - {ok, O1} -> - lager:error("Got ~p during a data file sync on file ~p", [O1, F]), + {ok, O1} -> + lager:error("Got ~p during a data file sync on file ~p", [O1, F]), O1; - {O2, ok} -> - lager:error("Got ~p during a csum file sync on file ~p", [O2, F]), + {O2, ok} -> + lager:error("Got ~p during a csum file sync on file ~p", [O2, F]), O2; - {O3, O4} -> + {O3, O4} -> lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]), {O3, O4} end, @@ -285,16 +295,21 @@ handle_call({read, Offset, Length, Opts}, _From, csum_table = CsumTable, reads = {T, Err} }) -> + %% TODO: use these options - NoChunk prevents reading from disks + %% NoChecksum doesn't check checksums NoChecksum = proplists:get_value(no_checksum, Opts, false), NoChunk = proplists:get_value(no_chunk, Opts, false), - NeedsMerge = proplists:get_value(needs_trimmed, Opts, false), {Resp, NewErr} = - case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of + case do_read(FH, F, CsumTable, Offset, Length, NoChunk, NoChecksum) of {ok, {[], []}} -> {{error, not_written}, Err + 1}; {ok, {Chunks0, Trimmed0}} -> Chunks = slice_both_side(Chunks0, Offset, Offset+Length), - {{ok, {Chunks, Trimmed0}}, Err}; + Trimmed = case proplists:get_value(needs_trimmed, Opts, false) of + true -> Trimmed0; + false -> [] + end, + {{ok, {Chunks, Trimmed}}, Err}; Error -> lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]), {Error, Err + 1} @@ -338,6 +353,45 @@ handle_call({write, Offset, ClientMeta, Data}, _From, {reply, Resp, State#state{writes = {T+1, NewErr}, eof_position = NewEof}}; + +%%% TRIMS + +handle_call({trim, _Offset, _ClientMeta, _Data}, _From, + State = #state{wedged = true, + writes = {T, Err} + }) -> + {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; + +handle_call({trim, Offset, Size, _TriggerGC}, _From, + State = #state{data_filehandle=FHd, + ops = Ops, + trims = {T, Err}, + csum_table = CsumTable}) -> + + case machi_csum_table:all_trimmed(CsumTable, Offset, Size) of + true -> + NewState = State#state{ops=Ops+1, trims={T, Err+1}}, + %% All bytes of that range was already trimmed returns ok + %% here, not {error, trimmed}, which means the whole file + %% was trimmed + maybe_gc(ok, NewState); + false -> + LUpdate = maybe_regenerate_checksum( + FHd, + machi_csum_table:find_leftneighbor(CsumTable, Offset)), + RUpdate = maybe_regenerate_checksum( + FHd, + machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)), + + case machi_csum_table:trim(CsumTable, Offset, Size, LUpdate, RUpdate) of + ok -> + NewState = State#state{ops=Ops+1, trims={T+1, Err}}, + maybe_gc(ok, NewState); + Error -> + {reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}} + end + end; + %% APPENDS handle_call({append, _ClientMeta, _Extra, _Data}, _From, @@ -476,10 +530,20 @@ terminate(Reason, #state{filename = F, lager:info(" Reads: ~p/~p", [RT, RE]), lager:info(" Writes: ~p/~p", [WT, WE]), lager:info("Appends: ~p/~p", [AT, AE]), - ok = file:sync(FHd), - ok = file:close(FHd), - ok = machi_csum_table:sync(T), - ok = machi_csum_table:close(T), + case FHd of + undefined -> + noop; %% file deleted + _ -> + ok = file:sync(FHd), + ok = file:close(FHd) + end, + case T of + undefined -> + noop; %% file deleted + _ -> + ok = machi_csum_table:sync(T), + ok = machi_csum_table:close(T) + end, ok. % @private @@ -512,15 +576,14 @@ check_or_make_tagged_csum(Tag, InCsum, Data) when Tag == ?CSUM_TAG_CLIENT_SHA; check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) -> lager:warning("Unknown checksum tag ~p", [OtherTag]), {error, bad_checksum}. - + -spec do_read(FHd :: file:io_device(), Filename :: string(), CsumTable :: machi_csum_table:table(), Offset :: non_neg_integer(), Size :: non_neg_integer(), - NoChecksum :: boolean(), NoChunk :: boolean(), - NeedsTrimmed :: boolean() + NoChecksum :: boolean() ) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} | {error, bad_checksum} | {error, partial_read} | @@ -539,23 +602,23 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) -> % tuple is returned. % % -do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) -> - do_read(FHd, Filename, CsumTable, Offset, Size). - -do_read(FHd, Filename, CsumTable, Offset, Size) -> +do_read(FHd, Filename, CsumTable, Offset, Size, _, _) -> %% Note that find/3 only returns overlapping chunks, both borders %% are not aligned to original Offset and Size. ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size), - read_all_ranges(FHd, Filename, ChunkCsums, []). + read_all_ranges(FHd, Filename, ChunkCsums, [], []). -read_all_ranges(_, _, [], ReadChunks) -> +read_all_ranges(_, _, [], ReadChunks, TrimmedChunks) -> %% TODO: currently returns empty list of trimmed chunks - {ok, {lists:reverse(ReadChunks), []}}; + {ok, {lists:reverse(ReadChunks), lists:reverse(TrimmedChunks)}}; -read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) -> +read_all_ranges(FHd, Filename, [{Offset, Size, trimmed}|T], ReadChunks, TrimmedChunks) -> + read_all_ranges(FHd, Filename, T, ReadChunks, [{Filename, Offset, Size}|TrimmedChunks]); + +read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, TrimmedChunks) -> case file:pread(FHd, Offset, Size) of eof -> - read_all_ranges(FHd, Filename, T, ReadChunks); + read_all_ranges(FHd, Filename, T, ReadChunks, TrimmedChunks); {ok, Bytes} when byte_size(Bytes) == Size -> {Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum), case check_or_make_tagged_csum(Tag, Ck, Bytes) of @@ -565,19 +628,21 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) -> {error, bad_checksum}; TaggedCsum -> read_all_ranges(FHd, Filename, T, - [{Filename, Offset, Bytes, TaggedCsum}|ReadChunks]); + [{Filename, Offset, Bytes, TaggedCsum}|ReadChunks], + TrimmedChunks); OtherCsum when Tag =:= ?CSUM_TAG_NONE -> %% XXX FIXME: Should we return something other than %% {ok, ....} in this case? read_all_ranges(FHd, Filename, T, - [{Filename, Offset, Bytes, OtherCsum}|ReadChunks]) + [{Filename, Offset, Bytes, OtherCsum}|ReadChunks], + TrimmedChunks) end; {ok, Partial} -> - lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p", + lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p", [Filename, Offset, Size, byte_size(Partial)]), {error, partial_read}; Other -> - lager:error("While reading file ~p, offset ~p, length ~p, got ~p", + lager:error("While reading file ~p, offset ~p, length ~p, got ~p", [Filename, Offset, Size, Other]), {error, Other} end. @@ -616,7 +681,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) -> {error, Reason} end; [{Offset, Size, TaggedCsum}] -> - case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of + case do_read(FHd, Filename, CsumTable, Offset, Size, false, false) of {error, _} = E -> lager:warning("This should never happen: got ~p while reading" " at offset ~p in file ~p that's supposedly written", @@ -693,6 +758,8 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) -> %% Dialyzer 'can never match': slice_both_side([], _, _) -> %% []; +slice_both_side([], _, _) -> + []; slice_both_side([{F, Offset, Chunk, _Csum}|L], LeftPos, RightPos) when Offset < LeftPos andalso LeftPos < RightPos -> TrashLen = 8 * (LeftPos - Offset), @@ -729,3 +796,43 @@ maybe_regenerate_checksum(FHd, {Offset, Size, _Csum}) -> Error -> throw(Error) end. + +%% GC: make sure unwritte bytes = [{Eof, infinity}] and Eof is > max file size +%% walk through the checksum table and make sure all chunks trimmed +%% Then unlink the file +-spec maybe_gc(term(), #state{}) -> + {reply, term(), #state{}} | {stop, normal, term(), #state{}}. +maybe_gc(Reply, S = #state{eof_position = Eof, + max_file_size = MaxFileSize}) when Eof < MaxFileSize -> + lager:debug("The file is still small; not trying GC (Eof, MaxFileSize) = (~p, ~p)~n", + [Eof, MaxFileSize]), + {reply, Reply, S}; +maybe_gc(Reply, S = #state{fluname=FluName, + data_filehandle = FHd, + data_dir = DataDir, + filename = Filename, + eof_position = Eof, + csum_table=CsumTable}) -> + case machi_csum_table:all_trimmed(CsumTable, Eof) of + true -> + lager:debug("GC? Let's do it: ~p.~n", [Filename]), + %% Before unlinking a file, it should inform + %% machi_flu_filename_mgr that this file is + %% deleted and mark it as "trimmed" to avoid + %% filename reuse and resurrection. Maybe garbage + %% will remain if a process crashed but it also + %% should be recovered at filename_mgr startup. + + %% Also, this should be informed *before* file proxy + %% deletes files. + ok = machi_flu_metadata_mgr:trim_file(FluName, {file, Filename}), + ok = file:close(FHd), + {_, DPath} = machi_util:make_data_filename(DataDir, Filename), + ok = file:delete(DPath), + machi_csum_table:delete(CsumTable), + {stop, normal, Reply, + S#state{data_filehandle=undefined, + csum_table=undefined}}; + false -> + {reply, Reply, S} + end. diff --git a/src/machi_file_proxy_sup.erl b/src/machi_file_proxy_sup.erl index dbb0fa6..a165a68 100644 --- a/src/machi_file_proxy_sup.erl +++ b/src/machi_file_proxy_sup.erl @@ -44,7 +44,8 @@ start_link(FluName) -> supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []). start_proxy(FluName, DataDir, Filename) -> - supervisor:start_child(make_proxy_name(FluName), [Filename, DataDir]). + supervisor:start_child(make_proxy_name(FluName), + [FluName, Filename, DataDir]). init([]) -> SupFlags = {simple_one_for_one, 1000, 10}, diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index cb269f2..042eaed 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -75,6 +75,7 @@ epoch_id :: 'undefined' | machi_dt:epoch_id(), pb_mode = undefined :: 'undefined' | 'high' | 'low', high_clnt :: 'undefined' | pid(), + trim_table :: ets:tid(), props = [] :: list() % proplist }). @@ -148,6 +149,7 @@ main2(FluName, TcpPort, DataDir, Props) -> {true, undefined} end, Witness_p = proplists:get_value(witness_mode, Props, false), + S0 = #state{flu_name=FluName, proj_store=ProjectionPid, tcp_port=TcpPort, @@ -409,8 +411,11 @@ do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, #state{witness=false}=S) -> {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, - #state{witness=false}=S) -> + #state{witness=false} = S) -> {do_server_read_chunk(File, Offset, Size, Opts, S), S}; +do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC}, + #state{witness=false}=S) -> + {do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S}; do_pb_ll_request3({low_checksum_list, _EpochID, File}, #state{witness=false}=S) -> {do_server_checksum_listing(File, S), S}; @@ -541,21 +546,47 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) -> case sanitize_file_string(File) of ok -> - {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), - Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], - machi_file_proxy:write(Pid, Offset, Meta, Chunk); + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], + machi_file_proxy:write(Pid, Offset, Meta, Chunk); + {error, trimmed} = Error -> + Error + end; _ -> {error, bad_arg} end. do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})-> - %% TODO: Look inside Opts someday. case sanitize_file_string(File) of ok -> - {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), - case machi_file_proxy:read(Pid, Offset, Size, Opts) of - {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; - Other -> Other + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + case machi_file_proxy:read(Pid, Offset, Size, Opts) of + %% XXX FIXME + %% For now we are omiting the checksum data because it blows up + %% protobufs. + {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; + Other -> Other + end; + {error, trimmed} = Error -> + Error + end; + _ -> + {error, bad_arg} + end. + +do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) -> + lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n", + [File, Offset, Size, TriggerGC]), + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + machi_file_proxy:trim(Pid, Offset, Size, TriggerGC); + {error, trimmed} = Trimmed -> + %% Should be returned back to (maybe) trigger repair + Trimmed end; _ -> {error, bad_arg} @@ -662,10 +693,14 @@ handle_append(Prefix, Chunk, Csum, Extra, FluName, EpochId) -> Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}), case Res of {file, F} -> - {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}), - {Tag, CS} = machi_util:unmake_tagged_csum(Csum), - Meta = [{client_csum_tag, Tag}, {client_csum, CS}], - machi_file_proxy:append(Pid, Meta, Extra, Chunk); + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of + {ok, Pid} -> + {Tag, CS} = machi_util:unmake_tagged_csum(Csum), + Meta = [{client_csum_tag, Tag}, {client_csum, CS}], + machi_file_proxy:append(Pid, Meta, Extra, Chunk); + {error, trimmed} = E -> + E + end; Error -> Error end. diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index da90618..ed8808c 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -80,6 +80,7 @@ %% For "internal" replication only. -export([ write_chunk/5, write_chunk/6, + trim_chunk/5, trim_chunk/6, delete_migration/3, delete_migration/4, trunc_hack/3, trunc_hack/4 ]). @@ -474,6 +475,31 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) disconnect(Sock) end. +%% @doc Restricted API: Write a chunk of already-sequenced data to +%% `File' at `Offset'. + +-spec trim_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + ok | {error, machi_dt:error_general()} | {error, term()}. +trim_chunk(Sock, EpochID, File0, Offset, Size) + when Offset >= ?MINIMUM_OFFSET -> + ReqID = <<"id">>, + File = machi_util:make_binary(File0), + true = (Offset >= ?MINIMUM_OFFSET), + Req = machi_pb_translate:to_pb_request( + ReqID, + {low_trim_chunk, EpochID, File, Offset, Size, 0}), + do_pb_request_common(Sock, ReqID, Req). + +%% @doc Restricted API: Write a chunk of already-sequenced data to +%% `File' at `Offset'. + +-spec trim_chunk(machi_dt:inet_host(), machi_dt:inet_port(), + machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> + ok | {error, machi_dt:error_general()} | {error, term()}. +trim_chunk(_Host, _TcpPort, _EpochID, _File, _Offset, _Size) -> + not_used. + + %% @doc Restricted API: Delete a file after it has been successfully %% migrated. diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index ac505a9..54fdcfe 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -17,8 +17,8 @@ %% under the License. %% %% ------------------------------------------------------------------- -%% -%% @doc This process is responsible for managing filenames assigned to +%% +%% @doc This process is responsible for managing filenames assigned to %% prefixes. It's started out of `machi_flu_psup'. %% %% Supported operations include finding the "current" filename assigned to @@ -32,7 +32,7 @@ %% First it looks up the sequence number from the prefix name. If %% no sequence file is found, it uses 0 as the sequence number and searches %% for a matching file with the prefix and 0 as the sequence number. -%% If no file is found, the it generates a new filename by incorporating +%% If no file is found, the it generates a new filename by incorporating %% the given prefix, a randomly generated (v4) UUID and 0 as the %% sequence number. %% @@ -79,7 +79,7 @@ child_spec(FluName, DataDir) -> Name = make_filename_mgr_name(FluName), - {Name, + {Name, {?MODULE, start_link, [FluName, DataDir]}, permanent, 5000, worker, [?MODULE]}. @@ -87,8 +87,8 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> N = make_filename_mgr_name(FluName), gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []). --spec find_or_make_filename_from_prefix( FluName :: atom(), - EpochId :: pv1_epoch_n(), +-spec find_or_make_filename_from_prefix( FluName :: atom(), + EpochId :: pv1_epoch_n(), Prefix :: {prefix, string()} ) -> {file, Filename :: string()} | {error, Reason :: term() } | timeout. % @doc Find the latest available or make a filename from a prefix. A prefix @@ -96,7 +96,7 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> % tuple in the form of `{file, F}' or an `{error, Reason}' find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}) when is_atom(FluName) -> N = make_filename_mgr_name(FluName), - gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT); + gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT); find_or_make_filename_from_prefix(_FluName, _EpochId, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). @@ -104,9 +104,9 @@ find_or_make_filename_from_prefix(_FluName, _EpochId, Other) -> -spec increment_prefix_sequence( FluName :: atom(), Prefix :: {prefix, string()} ) -> ok | {error, Reason :: term() } | timeout. % @doc Increment the sequence counter for a given prefix. Prefix should -% be in the form of `{prefix, P}'. +% be in the form of `{prefix, P}'. increment_prefix_sequence(FluName, {prefix, Prefix}) when is_atom(FluName) -> - gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, Prefix}, ?TIMEOUT); + gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, Prefix}, ?TIMEOUT); increment_prefix_sequence(_FluName, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). @@ -117,7 +117,7 @@ increment_prefix_sequence(_FluName, Other) -> % all the data files associated with that prefix. Returns % a list. list_files_by_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) -> - gen_server:call(make_filename_mgr_name(FluName), {list_files, Prefix}, ?TIMEOUT); + gen_server:call(make_filename_mgr_name(FluName), {list_files, Prefix}, ?TIMEOUT); list_files_by_prefix(_FluName, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). @@ -125,7 +125,10 @@ list_files_by_prefix(_FluName, Other) -> %% gen_server API init([FluName, DataDir]) -> Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]), - {ok, #state{ fluname = FluName, epoch = 0, datadir = DataDir, tid = Tid }}. + {ok, #state{fluname = FluName, + epoch = 0, + datadir = DataDir, + tid = Tid}}. handle_cast(Req, State) -> lager:warning("Got unknown cast ~p", [Req]), @@ -135,9 +138,9 @@ handle_cast(Req, State) -> %% the FLU has already validated that the caller's epoch id and the FLU's epoch id %% are the same. So we *assume* that remains the case here - that is to say, we %% are not wedged. -handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, - epoch = EpochId, - tid = Tid }) -> +handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, + epoch = EpochId, + tid = Tid}) -> %% Our state and the caller's epoch ids are the same. Business as usual. File = handle_find_file(Tid, Prefix, DataDir), {reply, {file, File}, S}; @@ -154,7 +157,7 @@ handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir } ok = machi_util:increment_max_filenum(DataDir, Prefix), {reply, ok, S}; handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) -> - spawn(fun() -> + spawn(fun() -> L = list_files(DataDir, Prefix), gen_server:reply(From, L) end), @@ -181,7 +184,7 @@ code_change(_OldVsn, State, _Extra) -> %% MIT License generate_uuid_v4_str() -> <> = crypto:strong_rand_bytes(16), - io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", + io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]). find_file(DataDir, Prefix, N) -> @@ -201,7 +204,7 @@ handle_find_file(Tid, Prefix, DataDir) -> [] -> {find_or_make_filename(Tid, DataDir, Prefix, N), false}; [H] -> {H, true}; - [Fn | _ ] = L -> + [Fn | _ ] = L -> lager:warning( "Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p", [Prefix, N, L]), @@ -245,4 +248,3 @@ increment_and_cache_filename(Tid, DataDir, Prefix) -> -ifdef(TEST). -endif. - diff --git a/src/machi_flu_metadata_mgr.erl b/src/machi_flu_metadata_mgr.erl index c851f84..d4447ae 100644 --- a/src/machi_flu_metadata_mgr.erl +++ b/src/machi_flu_metadata_mgr.erl @@ -39,10 +39,13 @@ -define(HASH(X), erlang:phash2(X)). %% hash algorithm to use -define(TIMEOUT, 10 * 1000). %% 10 second timeout +-define(KNOWN_FILES_LIST_PREFIX, "known_files_"). + -record(state, {fluname :: atom(), datadir :: string(), tid :: ets:tid(), - cnt :: non_neg_integer() + cnt :: non_neg_integer(), + trimmed_files :: machi_plist:plist() }). %% This record goes in the ets table where filename is the key @@ -59,7 +62,8 @@ lookup_proxy_pid/2, start_proxy_pid/2, stop_proxy_pid/2, - build_metadata_mgr_name/2 + build_metadata_mgr_name/2, + trim_file/2 ]). %% gen_server callbacks @@ -97,10 +101,24 @@ start_proxy_pid(FluName, {file, Filename}) -> stop_proxy_pid(FluName, {file, Filename}) -> gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). +trim_file(FluName, {file, Filename}) -> + gen_server:call(get_manager_atom(FluName, Filename), {trim_file, Filename}, ?TIMEOUT). + %% gen_server callbacks init([FluName, Name, DataDir, Num]) -> + %% important: we'll need another persistent storage to + %% remember deleted (trimmed) file, to prevent resurrection after + %% flu restart and append. + FileListFileName = + filename:join([DataDir, ?KNOWN_FILES_LIST_PREFIX ++ atom_to_list(FluName)]), + {ok, PList} = machi_plist:open(FileListFileName, []), + %% TODO make sure all files non-existent, if any remaining files + %% here, just delete it. They're in the list *because* they're all + %% trimmed. + Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]), - {ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num}}. + {ok, #state{fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num, + trimmed_files=PList}}. handle_cast(Req, State) -> lager:warning("Got unknown cast ~p", [Req]), @@ -113,17 +131,25 @@ handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> end, {reply, Reply, State}; -handle_call({start_proxy_pid, Filename}, _From, State = #state{ fluname = N, tid = Tid, datadir = D }) -> - NewR = case lookup_md(Tid, Filename) of - not_found -> - start_file_proxy(N, D, Filename); - #md{ proxy_pid = undefined } = R0 -> - start_file_proxy(N, D, R0); - #md{ proxy_pid = _Pid } = R1 -> - R1 - end, - update_ets(Tid, NewR), - {reply, {ok, NewR#md.proxy_pid}, State}; +handle_call({start_proxy_pid, Filename}, _From, + State = #state{ fluname = N, tid = Tid, datadir = D, + trimmed_files=TrimmedFiles}) -> + case machi_plist:find(TrimmedFiles, Filename) of + false -> + NewR = case lookup_md(Tid, Filename) of + not_found -> + start_file_proxy(N, D, Filename); + #md{ proxy_pid = undefined } = R0 -> + start_file_proxy(N, D, R0); + #md{ proxy_pid = _Pid } = R1 -> + R1 + end, + update_ets(Tid, NewR), + {reply, {ok, NewR#md.proxy_pid}, State}; + true -> + {reply, {error, trimmed}, State} + end; + handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> case lookup_md(Tid, Filename) of not_found -> @@ -137,6 +163,15 @@ handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> end, {reply, ok, State}; +handle_call({trim_file, Filename}, _, + S = #state{trimmed_files = TrimmedFiles }) -> + case machi_plist:add(TrimmedFiles, Filename) of + {ok, TrimmedFiles2} -> + {reply, ok, S#state{trimmed_files=TrimmedFiles2}}; + Error -> + {reply, Error, S} + end; + handle_call(Req, From, State) -> lager:warning("Got unknown call ~p from ~p", [Req, From]), {reply, hoge, State}. @@ -169,18 +204,21 @@ handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) - lager:error("file proxy ~p shutdown because it's wedged", [Pid]), clear_ets(Tid, Mref), {noreply, State}; +handle_info({'DOWN', _Mref, process, Pid, trimmed}, State = #state{ tid = _Tid }) -> + lager:debug("file proxy ~p shutdown because the file was trimmed", [Pid]), + {noreply, State}; handle_info({'DOWN', Mref, process, Pid, Error}, State = #state{ tid = Tid }) -> lager:error("file proxy ~p shutdown because ~p", [Pid, Error]), clear_ets(Tid, Mref), {noreply, State}; - handle_info(Info, State) -> lager:warning("Got unknown info ~p", [Info]), {noreply, State}. -terminate(Reason, _State) -> +terminate(Reason, _State = #state{trimmed_files=TrimmedFiles}) -> lager:info("Shutting down because ~p", [Reason]), + machi_plist:close(TrimmedFiles), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl index 5509803..ec2dfc6 100644 --- a/src/machi_pb_high_client.erl +++ b/src/machi_pb_high_client.erl @@ -94,6 +94,9 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) -> write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). +%% @doc Tries to read a chunk of a specified file. It returns `{ok, +%% {Chunks, TrimmedChunks}}' for live file while it returns `{error, +%% trimmed}' if all bytes of the file was trimmed. -spec read_chunk(pid(), string(), pos_integer(), pos_integer(), [{flag_no_checksum | flag_no_chunk | needs_trimmed, boolean()}]) -> {ok, {list(), list()}} | {error, term()}. @@ -107,6 +110,10 @@ read_chunk(PidSpec, File, Offset, Size, Options) -> read_chunk(PidSpec, File, Offset, Size, Options, Timeout) -> send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout). +%% @doc Trims arbitrary binary range of any file. TODO: Add option +%% specifying whether to trigger GC. +-spec trim_chunk(pid(), string(), non_neg_integer(), machi_dt:chunk_size()) -> + ok | {error, term()}. trim_chunk(PidSpec, File, Offset, Size) -> trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). @@ -415,6 +422,8 @@ convert_general_status_code('NOT_WRITTEN') -> {error, not_written}; convert_general_status_code('WRITTEN') -> {error, written}; +convert_general_status_code('TRIMMED') -> + {error, trimmed}; convert_general_status_code('NO_SUCH_FILE') -> {error, no_such_file}; convert_general_status_code('PARTIAL_READ') -> diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index cc26766..0b49908 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -88,6 +88,17 @@ from_pb_request(#mpb_ll_request{ offset=Offset, chunk_size=Size} = ChunkPos, {ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}}; +from_pb_request(#mpb_ll_request{ + req_id=ReqID, + trim_chunk=#mpb_ll_trimchunkreq{ + epoch_id=PB_EpochID, + file=File, + offset=Offset, + size=Size, + trigger_gc=PB_TriggerGC}}) -> + EpochID = conv_to_epoch_id(PB_EpochID), + TriggerGC = conv_to_boolean(PB_TriggerGC), + {ReqID, {low_trim_chunk, EpochID, File, Offset, Size, TriggerGC}}; from_pb_request(#mpb_ll_request{ req_id=ReqID, checksum_list=#mpb_ll_checksumlistreq{ @@ -262,6 +273,10 @@ from_pb_response(#mpb_ll_response{ _ -> {ReqID, machi_pb_high_client:convert_general_status_code(Status)} end; +from_pb_response(#mpb_ll_response{ + req_id=ReqID, + trim_chunk=#mpb_ll_trimchunkresp{status=Status}}) -> + {ReqID, machi_pb_high_client:convert_general_status_code(Status)}; from_pb_response(#mpb_ll_response{ req_id=ReqID, checksum_list=#mpb_ll_checksumlistresp{ @@ -398,11 +413,10 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C chunk=Chunk, csum=PB_CSum}}}; to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) -> - %% TODO: stop ignoring Opts ^_^ PB_EpochID = conv_from_epoch_id(EpochID), FNChecksum = proplists:get_value(no_checksum, Opts, false), FNChunk = proplists:get_value(no_chunk, Opts, false), - NeedsTrimmed = proplists:get_value(needs_merge, Opts, false), + NeedsTrimmed = proplists:get_value(needs_trimmed, Opts, false), #mpb_ll_request{ req_id=ReqID, do_not_alter=2, read_chunk=#mpb_ll_readchunkreq{ @@ -414,6 +428,15 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) -> flag_no_checksum=machi_util:bool2int(FNChecksum), flag_no_chunk=machi_util:bool2int(FNChunk), flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}}; +to_pb_request(ReqID, {low_trim_chunk, EpochID, File, Offset, Size, TriggerGC}) -> + PB_EpochID = conv_from_epoch_id(EpochID), + #mpb_ll_request{req_id=ReqID, do_not_alter=2, + trim_chunk=#mpb_ll_trimchunkreq{ + epoch_id=PB_EpochID, + file=File, + offset=Offset, + size=Size, + trigger_gc=TriggerGC}}; to_pb_request(ReqID, {low_checksum_list, EpochID, File}) -> PB_EpochID = conv_from_epoch_id(EpochID), #mpb_ll_request{req_id=ReqID, do_not_alter=2, @@ -524,6 +547,18 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)-> _Else -> make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) end; +to_pb_response(ReqID, {low_trim_chunk, _, _, _, _, _}, Resp) -> + case Resp of + ok -> + #mpb_ll_response{req_id=ReqID, + trim_chunk=#mpb_ll_trimchunkresp{status='OK'}}; + {error, _}=Error -> + Status = conv_from_status(Error), + #mpb_ll_response{req_id=ReqID, + read_chunk=#mpb_ll_trimchunkresp{status=Status}}; + _Else -> + make_ll_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) + end; to_pb_response(ReqID, {low_checksum_list, _EpochID, _File}, Resp) -> case Resp of {ok, Chunk} -> @@ -909,6 +944,8 @@ conv_from_status({error, not_written}) -> 'NOT_WRITTEN'; conv_from_status({error, written}) -> 'WRITTEN'; +conv_from_status({error, trimmed}) -> + 'TRIMMED'; conv_from_status({error, no_such_file}) -> 'NO_SUCH_FILE'; conv_from_status({error, partial_read}) -> diff --git a/src/machi_plist.erl b/src/machi_plist.erl new file mode 100644 index 0000000..4bf745e --- /dev/null +++ b/src/machi_plist.erl @@ -0,0 +1,63 @@ +-module(machi_plist). + +%%% @doc persistent list of binaries that support mutual exclusion + +-export([open/2, close/1, find/2, add/2]). + +-ifdef(TEST). +-export([all/1]). +-endif. + +-record(machi_plist, + {filename :: string(), + fd :: file:descriptor(), + list}). + +-type plist() :: #machi_plist{}. +-export_type([plist/0]). + +-spec open(filename:filename(), proplists:proplist()) -> + {ok, plist()} | {error, file:posix()}. +open(Filename, _Opt) -> + List = case file:read_file(Filename) of + {ok, <<>>} -> []; + {ok, Bin} -> binary_to_term(Bin); + {error, enoent} -> [] + end, + case file:open(Filename, [read, write, raw, binary, sync]) of + {ok, Fd} -> + {ok, #machi_plist{filename=Filename, + fd=Fd, + list=List}}; + Error -> + Error + end. + +-spec close(plist()) -> ok. +close(#machi_plist{fd=Fd}) -> + _ = file:close(Fd). + +-spec find(plist(), string()) -> boolean(). +find(#machi_plist{list=List}, Name) -> + lists:member(Name, List). + +-spec add(plist(), string()) -> {ok, plist()} | {error, file:posix()}. +add(Plist = #machi_plist{list=List0, fd=Fd}, Name) -> + case find(Plist, Name) of + true -> + {ok, Plist}; + false -> + List = lists:append(List0, [Name]), + case file:pwrite(Fd, 0, term_to_binary(List)) of + ok -> + {ok, Plist#machi_plist{list=List}}; + Error -> + Error + end + end. + +-ifdef(TEST). +-spec all(plist()) -> [file:filename()]. +all(#machi_plist{list=List}) -> + List. +-endif. diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 93f3b95..2cbaabd 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -79,6 +79,7 @@ %% Internal API write_chunk/5, write_chunk/6, + trim_chunk/5, trim_chunk/6, %% Helpers stop_proxies/1, start_proxies/1 @@ -310,6 +311,18 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) -> Else end. + +trim_chunk(PidSpec, EpochID, File, Offset, Size) -> + trim_chunk(PidSpec, EpochID, File, Offset, Size, infinity). + +%% @doc Write a chunk (binary- or iolist-style) of data to a file +%% with `Prefix' at `Offset'. + +trim_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) -> + gen_server:call(PidSpec, + {req, {trim_chunk, EpochID, File, Offset, Chunk}}, + Timeout). + %%%%%%%%%%%%%%%%%%%%%%%%%%% init([I]) -> @@ -383,6 +396,9 @@ make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts}, make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end; +make_req_fun({trim_chunk, EpochID, File, Offset, Size}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:trim_chunk(Sock, EpochID, File, Offset, Size) end; make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:checksum_list(Sock, EpochID, File) end; diff --git a/test/machi_csum_table_test.erl b/test/machi_csum_table_test.erl index f34d955..683d512 100644 --- a/test/machi_csum_table_test.erl +++ b/test/machi_csum_table_test.erl @@ -76,13 +76,14 @@ smoke3_test() -> {?LINE, trim, {0, 1024, <<>>}, undefined, undefined} ], [ begin - %% ?debugVal({Line, Chunk}), + %% ?debugVal({_Line, Chunk}), {Offset, Size, Csum} = Chunk, ?assertEqual(LeftN0, machi_csum_table:find_leftneighbor(MC, Offset)), ?assertEqual(RightN0, machi_csum_table:find_rightneighbor(MC, Offset+Size)), LeftN = case LeftN0 of + {OffsL, SizeL, trimmed} -> {OffsL, SizeL, trimmed}; {OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>}; OtherL -> OtherL end, diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl index bf57043..e2eb955 100644 --- a/test/machi_file_proxy_eqc.erl +++ b/test/machi_file_proxy_eqc.erl @@ -170,7 +170,7 @@ start_command(S) -> start(_S) -> {_, _, MS} = os:timestamp(), File = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS), - {ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR), + {ok, Pid} = machi_file_proxy:start_link(some_flu, File, ?TESTDIR), unlink(Pid), Pid. diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl index 8269483..cbf2014 100644 --- a/test/machi_file_proxy_test.erl +++ b/test/machi_file_proxy_test.erl @@ -78,7 +78,7 @@ random_binary(Start, End) -> machi_file_proxy_test_() -> clean_up_data_dir(?TESTDIR), - {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), + {ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR), [ ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), @@ -100,7 +100,7 @@ machi_file_proxy_test_() -> multiple_chunks_read_test_() -> clean_up_data_dir(?TESTDIR), - {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), + {ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR), [ ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), ?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)), diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 25c79fd..361eb55 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -38,6 +38,7 @@ smoke_test2() -> Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"} ], D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]), + ok = application:set_env(machi, max_file_size, 1024*1024), [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), @@ -90,23 +91,54 @@ smoke_test2() -> {ok, [{File1Size,File1}]} = ?C:list_files(Clnt), true = is_integer(File1Size), + File1Bin = binary_to_list(File1), [begin - %% ok = ?C:trim_chunk(Clnt, Fl, Off, Sz) - %% This gets an error as trim API is still a stub - ?assertMatch({bummer, - {throw, - {error, bad_joss_taipan_fixme}, - _Boring_stack_trace}}, - ?C:trim_chunk(Clnt, Fl, Off, Sz)) - end || {Ch, Fl, Off, Sz} <- Reads], + #p_srvr{name=Name, port=Port, props=Dir} = P, + ?assertEqual({ok, [File1Bin]}, + file:list_dir(filename:join([Dir, "data"]))), + FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]), + {ok, Plist} = machi_plist:open(FileListFileName, []), + ?assertEqual([], machi_plist:all(Plist)) + end || P <- Ps], + [begin + ok = ?C:trim_chunk(Clnt, Fl, Off, Sz) + end || {_Ch, Fl, Off, Sz} <- Reads], + [begin + {ok, {[], Trimmed}} = + ?C:read_chunk(Clnt, Fl, Off, Sz, [{needs_trimmed, true}]), + Filename = binary_to_list(Fl), + ?assertEqual([{Filename, Off, Sz}], Trimmed) + end || {_Ch, Fl, Off, Sz} <- Reads], + + LargeBytes = binary:copy(<<"x">>, 1024*1024), + LBCsum = {client_sha, machi_util:checksum_chunk(LargeBytes)}, + {ok, {Offx, Sizex, Filex}} = + ?C:append_chunk(Clnt, PK, Prefix, LargeBytes, LBCsum, 0), + ok = ?C:trim_chunk(Clnt, Filex, Offx, Sizex), + + %% Make sure everything was trimmed + File = binary_to_list(Filex), + [begin + #p_srvr{name=Name, port=_Port, props=Dir} = P, + ?assertEqual({ok, []}, + file:list_dir(filename:join([Dir, "data"]))), + FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]), + {ok, Plist} = machi_plist:open(FileListFileName, []), + ?assertEqual([File], machi_plist:all(Plist)) + end || P <- Ps], + + [begin + {error, trimmed} = + ?C:read_chunk(Clnt, Fl, Off, Sz, []) + end || {_Ch, Fl, Off, Sz} <- Reads], ok after (catch ?C:quit(Clnt)) end after exit(SupPid, normal), - [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], + [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], machi_util:wait_for_death(SupPid, 100), ok end. diff --git a/test/machi_plist_test.erl b/test/machi_plist_test.erl new file mode 100644 index 0000000..a796c1b --- /dev/null +++ b/test/machi_plist_test.erl @@ -0,0 +1,17 @@ +-module(machi_plist_test). + +-include_lib("eunit/include/eunit.hrl"). + +open_close_test() -> + FileName = "bark-bark-one", + file:delete(FileName), + {ok, PList0} = machi_plist:open(FileName, []), + {ok, PList1} = machi_plist:add(PList0, "boomar"), + ?assertEqual(["boomar"], machi_plist:all(PList1)), + ok = machi_plist:close(PList1), + + {ok, PList2} = machi_plist:open(FileName, []), + ?assertEqual(["boomar"], machi_plist:all(PList2)), + ok = machi_plist:close(PList2), + file:delete(FileName), + ok.