Compare commits

...

2 commits

Author SHA1 Message Date
UENISHI Kota
8103b424c0 WIP 2015-12-14 18:27:17 +09:00
UENISHI Kota
d482902509 Unify LevelDB usage to single instance
* Per-file LevelDB instance usage is changed to use a single instance
  per FLU server.
* Not only chunk checksums, but the list of trimmed files are also
  stored in LevelDB.
* Remove 1024 bytes file header; instead put any metadata into
  LevelDB if needed.
* LevelDB `db_ref()` lifecycle is same as that of `machi_metadata_mgr`
* `machi_file_proxy` just uses the instance that is passed to it at process startup
* Several optimization opportunities are still left as-is
2015-12-14 16:38:12 +09:00
10 changed files with 342 additions and 309 deletions

View file

@ -21,7 +21,7 @@
%% @doc Now 4GiBytes, could be up to 64bit due to PB message limit of %% @doc Now 4GiBytes, could be up to 64bit due to PB message limit of
%% chunk size %% chunk size
-define(DEFAULT_MAX_FILE_SIZE, ((1 bsl 32) - 1)). -define(DEFAULT_MAX_FILE_SIZE, ((1 bsl 32) - 1)).
-define(MINIMUM_OFFSET, 1024). -define(MINIMUM_OFFSET, 0).
%% 0th draft of checksum typing with 1st byte. %% 0th draft of checksum typing with 1st byte.
-define(CSUM_TAG_NONE, 0). % No csum provided by client -define(CSUM_TAG_NONE, 0). % No csum provided by client

View file

@ -1,16 +1,19 @@
-module(machi_csum_table). -module(machi_csum_table).
%% @doc Object Database mapper that translates
%% (file, checksum, offset, size)|(trimmed-file) <-> LevelDB key and value
%% Keys and values are both encoded with sext.
-export([open/2, -export([open/2,
find/3, find/4,
write/6, write/4, trim/5, write/7, write/5, trim/6,
find_leftneighbor/2, find_rightneighbor/2, find_leftneighbor/3, find_rightneighbor/3,
all_trimmed/3, any_trimmed/3, all_trimmed/4, any_trimmed/4,
all_trimmed/2, calc_unwritten_bytes/2,
calc_unwritten_bytes/1,
split_checksum_list_blob_decode/1, split_checksum_list_blob_decode/1,
all/1, all/2,
close/1, delete/1, close/1, maybe_trim_file/3,
foldl_chunks/3]). foldl_file_chunks/4, foldl_chunks/3]).
-include("machi.hrl"). -include("machi.hrl").
@ -40,12 +43,8 @@ open(CSumFilename, _Opts) ->
%% operating system's file cache, which is for %% operating system's file cache, which is for
%% Machi's main read efficiency %% Machi's main read efficiency
{total_leveldb_mem_percent, 10}], {total_leveldb_mem_percent, 10}],
ok = filelib:ensure_dir(CSumFilename),
{ok, T} = eleveldb:open(CSumFilename, LevelDBOptions), {ok, T} = eleveldb:open(CSumFilename, LevelDBOptions),
%% Dummy entry for reserved headers
ok = eleveldb:put(T,
sext:encode({0, ?MINIMUM_OFFSET}),
sext:encode(?CSUM_TAG_NONE_ATOM),
[{sync, true}]),
C0 = #machi_csum_table{ C0 = #machi_csum_table{
file=CSumFilename, file=CSumFilename,
table=T}, table=T},
@ -55,61 +54,51 @@ open(CSumFilename, _Opts) ->
split_checksum_list_blob_decode(Bin) -> split_checksum_list_blob_decode(Bin) ->
erlang:binary_to_term(Bin). erlang:binary_to_term(Bin).
-define(has_overlap(LeftOffset, LeftSize, RightOffset, RightSize), -define(has_overlap(LeftOffset, LeftSize, RightOffset, RightSize),
((LeftOffset - (RightOffset+RightSize)) * (LeftOffset+LeftSize - RightOffset) < 0)). ((LeftOffset - (RightOffset+RightSize)) * (LeftOffset+LeftSize - RightOffset) < 0)).
-spec find(table(), machi_dt:file_offset(), machi_dt:chunk_size()) -spec find(table(), binary(), machi_dt:file_offset(), machi_dt:chunk_size())
-> [chunk()]. -> [chunk()].
find(#machi_csum_table{table=T}, Offset, Size) -> find(#machi_csum_table{table=T}, Filename, Offset, Size) when is_binary(Filename) ->
{ok, I} = eleveldb:iterator(T, [], keys_only), EndKey = sext:encode({Filename, Offset+Size, 0}),
EndKey = sext:encode({Offset+Size, 0}),
StartKey = sext:encode({Offset, Size}),
{ok, FirstKey} = case eleveldb:iterator_move(I, StartKey) of case search_for_start_key(T, Filename, Offset, Size) of
{error, invalid_iterator} -> undefined -> [];
eleveldb:iterator_move(I, first); FirstKey ->
{ok, _} = R0 ->
case eleveldb:iterator_move(I, prev) of FoldFun = fun({K, V}, Acc) ->
{error, invalid_iterator} -> {Filename, TargetOffset, TargetSize} = sext:decode(K),
R0; case ?has_overlap(TargetOffset, TargetSize, Offset, Size) of
{ok, _} = R1 -> true ->
R1 [{TargetOffset, TargetSize, sext:decode(V)}|Acc];
end false ->
end, Acc
_ = eleveldb:iterator_close(I), end;
FoldFun = fun({K, V}, Acc) -> (_K, Acc) ->
{TargetOffset, TargetSize} = sext:decode(K), lager:error("~p wrong option", [_K]),
case ?has_overlap(TargetOffset, TargetSize, Offset, Size) of
true ->
[{TargetOffset, TargetSize, sext:decode(V)}|Acc];
false ->
Acc Acc
end; end,
(_K, Acc) -> lists:reverse(eleveldb_fold(T, FirstKey, EndKey, FoldFun, []))
lager:error("~p wrong option", [_K]), end.
Acc
end,
lists:reverse(eleveldb_fold(T, FirstKey, EndKey, FoldFun, [])).
%% @doc Updates all chunk info, by deleting existing entries if exists %% @doc Updates all chunk info, by deleting existing entries if exists
%% and putting new chunk info %% and putting new chunk info
-spec write(table(), -spec write(table(), binary(),
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:file_offset(), machi_dt:chunk_size(),
machi_dt:chunk_csum()|'none'|'trimmed', machi_dt:chunk_csum()|'none'|'trimmed',
undefined|chunk(), undefined|chunk()) -> undefined|chunk(), undefined|chunk()) ->
ok | {error, term()}. ok | {error, term()}.
write(#machi_csum_table{table=T} = CsumT, Offset, Size, CSum, write(#machi_csum_table{table=T} = CsumT, Filename,
LeftUpdate, RightUpdate) -> Offset, Size, CSum, LeftUpdate, RightUpdate) when is_binary(Filename) ->
PutOps = PutOps =
[{put, [{put,
sext:encode({Offset, Size}), sext:encode({Filename, Offset, Size}),
sext:encode(CSum)}] sext:encode(CSum)}]
++ case LeftUpdate of ++ case LeftUpdate of
{LO, LS, LCsum} when LO + LS =:= Offset -> {LO, LS, LCsum} when LO + LS =:= Offset ->
[{put, [{put,
sext:encode({LO, LS}), sext:encode({Filename, LO, LS}),
sext:encode(LCsum)}]; sext:encode(LCsum)}];
undefined -> undefined ->
[] []
@ -117,58 +106,58 @@ write(#machi_csum_table{table=T} = CsumT, Offset, Size, CSum,
++ case RightUpdate of ++ case RightUpdate of
{RO, RS, RCsum} when RO =:= Offset + Size -> {RO, RS, RCsum} when RO =:= Offset + Size ->
[{put, [{put,
sext:encode({RO, RS}), sext:encode({Filename, RO, RS}),
sext:encode(RCsum)}]; sext:encode(RCsum)}];
undefined -> undefined ->
[] []
end, end,
Chunks = find(CsumT, Offset, Size), Chunks = find(CsumT, Filename, Offset, Size),
DeleteOps = lists:map(fun({O, L, _}) -> DeleteOps = lists:map(fun({O, L, _}) ->
{delete, sext:encode({O, L})} {delete, sext:encode({Filename, O, L})}
end, Chunks), end, Chunks),
eleveldb:write(T, DeleteOps ++ PutOps, [{sync, true}]). eleveldb:write(T, DeleteOps ++ PutOps, [{sync, true}]).
-spec find_leftneighbor(table(), non_neg_integer()) -> -spec find_leftneighbor(table(), binary(), non_neg_integer()) ->
undefined | chunk(). undefined | chunk().
find_leftneighbor(CsumT, Offset) -> find_leftneighbor(CsumT, Filename, Offset) when is_binary(Filename) ->
case find(CsumT, Offset, 1) of case find(CsumT, Filename, Offset, 1) of
[] -> undefined; [] -> undefined;
[{Offset, _, _}] -> undefined; [{Offset, _, _}] -> undefined;
[{LOffset, _, CsumOrTrimmed}] -> {LOffset, Offset - LOffset, CsumOrTrimmed} [{LOffset, _, CsumOrTrimmed}] -> {LOffset, Offset - LOffset, CsumOrTrimmed}
end. end.
-spec find_rightneighbor(table(), non_neg_integer()) -> -spec find_rightneighbor(table(), binary(), non_neg_integer()) ->
undefined | chunk(). undefined | chunk().
find_rightneighbor(CsumT, Offset) -> find_rightneighbor(CsumT, Filename, Offset) when is_binary(Filename) ->
case find(CsumT, Offset, 1) of case find(CsumT, Filename, Offset, 1) of
[] -> undefined; [] -> undefined;
[{Offset, _, _}] -> undefined; [{Offset, _, _}] -> undefined;
[{ROffset, RSize, CsumOrTrimmed}] -> [{ROffset, RSize, CsumOrTrimmed}] ->
{Offset, ROffset + RSize - Offset, CsumOrTrimmed} {Offset, ROffset + RSize - Offset, CsumOrTrimmed}
end. end.
-spec write(table(), machi_dt:file_offset(), machi_dt:file_size(), -spec write(table(), binary(), machi_dt:file_offset(), machi_dt:file_size(),
machi_dt:chunk_csum()|none|trimmed) -> machi_dt:chunk_csum()|none|trimmed) ->
ok | {error, trimmed|file:posix()}. ok | {error, trimmed|file:posix()}.
write(CsumT, Offset, Size, CSum) -> write(CsumT, Filename, Offset, Size, CSum) ->
write(CsumT, Offset, Size, CSum, undefined, undefined). write(CsumT, Filename, Offset, Size, CSum, undefined, undefined).
trim(CsumT, Offset, Size, LeftUpdate, RightUpdate) -> trim(CsumT, Filename, Offset, Size, LeftUpdate, RightUpdate) ->
write(CsumT, Offset, Size, write(CsumT, Filename, Offset, Size,
trimmed, %% Should this be much smaller like $t or just 't' trimmed, %% Should this be much smaller like $t or just 't'
LeftUpdate, RightUpdate). LeftUpdate, RightUpdate).
%% @doc returns whether all bytes in a specific window is continously %% @doc returns whether all bytes in a specific window is continously
%% trimmed or not %% trimmed or not
-spec all_trimmed(table(), non_neg_integer(), non_neg_integer()) -> boolean(). -spec all_trimmed(table(), binary(), non_neg_integer(), non_neg_integer()) -> boolean().
all_trimmed(#machi_csum_table{table=T}, Left, Right) -> all_trimmed(#machi_csum_table{table=T}, Filename, Left, Right) when is_binary(Filename) ->
FoldFun = fun({_, _}, false) -> FoldFun = fun({_, _}, false) ->
false; false;
({K, V}, Pos) when is_integer(Pos) andalso Pos =< Right -> ({K, V}, Pos) when is_integer(Pos) andalso Pos =< Right ->
case {sext:decode(K), sext:decode(V)} of case {sext:decode(K), sext:decode(V)} of
{{Pos, Size}, trimmed} -> {{Filename, Pos, Size}, trimmed} ->
Pos + Size; Pos + Size;
{{Offset, Size}, _} {{Filename, Offset, Size}, _}
when Offset + Size =< Left -> when Offset + Size =< Left ->
Left; Left;
_Eh -> _Eh ->
@ -183,58 +172,84 @@ all_trimmed(#machi_csum_table{table=T}, Left, Right) ->
true true
end. end.
%% @doc returns whether all bytes 0-Pos0 is continously trimmed or -spec any_trimmed(table(), binary(),
%% not, including header.
-spec all_trimmed(table(), non_neg_integer()) -> boolean().
all_trimmed(CsumT, Pos0) ->
all_trimmed(CsumT, 0, Pos0).
-spec any_trimmed(table(),
pos_integer(), pos_integer(),
machi_dt:chunk_size()) -> boolean(). machi_dt:chunk_size()) -> boolean().
any_trimmed(CsumT, Offset, Size) -> any_trimmed(CsumT, Filename, Offset, Size) ->
Chunks = find(CsumT, Offset, Size), Chunks = find(CsumT, Filename, Offset, Size),
lists:any(fun({_, _, State}) -> State =:= trimmed end, Chunks). lists:any(fun({_, _, State}) -> State =:= trimmed end, Chunks).
-spec calc_unwritten_bytes(table()) -> [byte_sequence()]. -spec calc_unwritten_bytes(table(), binary()) -> [byte_sequence()].
calc_unwritten_bytes(#machi_csum_table{table=_} = CsumT) -> calc_unwritten_bytes(#machi_csum_table{table=_} = CsumT, Filename) ->
case lists:sort(all(CsumT)) of case lists:sort(all(CsumT, Filename)) of
[] -> [] ->
[{?MINIMUM_OFFSET, infinity}]; [{0, infinity}];
Sorted -> [{0, _, _}|_] = Sorted ->
{LastOffset, _, _} = hd(Sorted), build_unwritten_bytes_list(Sorted, 0, []);
build_unwritten_bytes_list(Sorted, LastOffset, []) [{LastOffset, _, _}|_] = Sorted ->
build_unwritten_bytes_list(Sorted, LastOffset, [{0, LastOffset}])
end. end.
all(CsumT) -> all(CsumT, Filename) ->
FoldFun = fun(E, Acc) -> [E|Acc] end, FoldFun = fun(E, Acc) -> [E|Acc] end,
lists:reverse(foldl_chunks(FoldFun, [], CsumT)). lists:reverse(foldl_file_chunks(FoldFun, [], CsumT, Filename)).
-spec close(table()) -> ok. -spec close(table()) -> ok.
close(#machi_csum_table{table=T}) -> close(#machi_csum_table{table=T}) ->
ok = eleveldb:close(T). ok = eleveldb:close(T).
-spec delete(table()) -> ok.
delete(#machi_csum_table{table=T, file=F}) -> -spec maybe_trim_file(table(), binary(), non_neg_integer()) ->
catch eleveldb:close(T), {ok, trimmed|not_trimmed} | {error, term()}.
%% TODO change this to directory walk maybe_trim_file(#machi_csum_table{table=T} = CsumT, Filename, EofP) when is_binary(Filename) ->
case os:cmd("rm -rf " ++ F) of %% TODO: optimize; this code runs fold on eleveldb twice.
"" -> ok; case all_trimmed(CsumT, Filename, 0, EofP) of
E -> E true ->
Chunks = all(CsumT, Filename),
DeleteOps = lists:map(fun({O, L, _}) ->
{delete, sext:encode({Filename, O, L})}
end, Chunks),
FileTombstone = {put, sext:encode({ts, Filename}), sext:encode(ts)},
case eleveldb:write(T, [FileTombstone|DeleteOps], [{sync, true}]) of
ok -> {ok, trimmed};
Other -> Other
end;
false ->
{ok, not_trimmed}
end. end.
-spec foldl_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()), %% @doc Folds over all chunks of a file
Acc0 :: term(), table()) -> Acc :: term(). -spec foldl_file_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()),
foldl_chunks(Fun, Acc0, #machi_csum_table{table=T}) -> Acc0 :: term(), table(), binary()) -> Acc :: term().
foldl_file_chunks(Fun, Acc0, #machi_csum_table{table=T}, Filename) when is_binary(Filename) ->
FoldFun = fun({K, V}, Acc) -> FoldFun = fun({K, V}, Acc) ->
{Offset, Len} = sext:decode(K), {Filename, Offset, Len} = sext:decode(K),
Fun({Offset, Len, sext:decode(V)}, Acc); Fun({Offset, Len, sext:decode(V)}, Acc);
(_K, Acc) -> (_K, Acc) ->
_ = lager:error("~p: wrong option?", [_K]), _ = lager:error("~p: wrong option?", [_K]),
Acc Acc
end, end,
StartKey = {Filename, 0, 0},
EndKey = { <<Filename/binary, 255, 255, 255, 255, 255>>, 0, 0},
eleveldb_fold(T, sext:encode(StartKey), sext:encode(EndKey),
FoldFun, Acc0).
%% @doc Folds over all chunks of all files
-spec foldl_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()),
Acc0 :: term(), table()) -> Acc :: term().
foldl_chunks(Fun, Acc0, #machi_csum_table{table=T}) ->
FoldFun = fun({K, V}, Acc) ->
{Filename, Offset, Len} = sext:decode(K),
Fun({Filename, Offset, Len, sext:decode(V)}, Acc);
(_K, Acc) ->
_ = lager:error("~p: wrong option?", [_K]),
Acc
end,
eleveldb:fold(T, FoldFun, Acc0, [{verify_checksums, true}]). eleveldb:fold(T, FoldFun, Acc0, [{verify_checksums, true}]).
%% == internal functions ==
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(), -spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
Size :: pos_integer(), Size :: pos_integer(),
Checksum :: binary() }], Checksum :: binary() }],
@ -298,3 +313,50 @@ eleveldb_do_fold({error, iterator_closed}, _, _, _, Acc) ->
eleveldb_do_fold({error, invalid_iterator}, _, _, _, Acc) -> eleveldb_do_fold({error, invalid_iterator}, _, _, _, Acc) ->
%% Probably reached to end %% Probably reached to end
Acc. Acc.
%% Key1 < MaybeStartKey =< Key
%% FirstKey =< MaybeStartKey
search_for_start_key(T, Filename, Offset, Size) ->
MaybeStartKey = sext:encode({Filename, Offset, Size}),
FirstKey = sext:encode({Filename, 0, 0}),
{ok, I} = eleveldb:iterator(T, [], keys_only),
try
case eleveldb:iterator_move(I, MaybeStartKey) of
{error, invalid_iterator} ->
%% No key in right - go for probably first key in the file
case eleveldb:iterator_move(I, FirstKey) of
{error, _} -> undefined;
{ok, Key0} -> goto_end(I, Key0, Offset)
end;
{ok, Key} when Key < FirstKey ->
FirstKey;
{ok, Key} ->
case eleveldb:iterator_move(I, prev) of
{error, invalid_iterator} ->
Key;
{ok, Key1} when Key1 < FirstKey ->
Key;
{ok, Key1} ->
Key1
end
end
after
_ = eleveldb:iterator_close(I)
end.
goto_end(I, Key, Offset) ->
case sext:decode(Key) of
{_Filename, O, L} when Offset =< O + L ->
Key;
{_Filename, O, L} when O + L < Offset ->
case eleveldb:iterator_move(I, next) of
{ok, NextKey} ->
goto_end(I, NextKey, Offset);
{error, _} ->
Key
end
end.

View file

@ -85,8 +85,6 @@
filename :: string() | undefined, filename :: string() | undefined,
data_path :: string() | undefined, data_path :: string() | undefined,
wedged = false :: boolean(), wedged = false :: boolean(),
csum_file :: string()|undefined,
csum_path :: string()|undefined,
data_filehandle :: file:io_device(), data_filehandle :: file:io_device(),
csum_table :: machi_csum_table:table(), csum_table :: machi_csum_table:table(),
eof_position = 0 :: non_neg_integer(), eof_position = 0 :: non_neg_integer(),
@ -102,12 +100,14 @@
%% Public API %% Public API
% @doc Start a new instance of the file proxy service. Takes the filename % @doc Start a new instance of the file proxy service. Takes the
% and data directory as arguments. This function is typically called by the % filename and data directory as arguments. This function is typically
% `machi_file_proxy_sup:start_proxy/2' function. % called by the `machi_file_proxy_sup:start_proxy/2'
-spec start_link(FluName :: atom(), Filename :: string(), DataDir :: string()) -> any(). % function. Checksum table is also passed at startup.
start_link(FluName, Filename, DataDir) -> -spec start_link(Filename :: string(),
gen_server:start_link(?MODULE, {FluName, Filename, DataDir}, []). DataDir :: string(), CsumTable :: machi_csum_table:table()) -> any().
start_link(Filename, DataDir, CsumTable) ->
gen_server:start_link(?MODULE, {Filename, DataDir, CsumTable}, []).
% @doc Request to stop an instance of the file proxy service. % @doc Request to stop an instance of the file proxy service.
-spec stop(Pid :: pid()) -> ok. -spec stop(Pid :: pid()) -> ok.
@ -218,24 +218,19 @@ checksum_list(Pid) ->
%% gen_server callbacks %% gen_server callbacks
% @private % @private
init({FluName, Filename, DataDir}) -> init({Filename, DataDir, CsumTable}) ->
CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename), {_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = filelib:ensure_dir(CsumFile),
ok = filelib:ensure_dir(DPath), ok = filelib:ensure_dir(DPath),
{ok, CsumTable} = machi_csum_table:open(CsumFile, []), UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable, iolist_to_binary(Filename)),
UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable),
{Eof, infinity} = lists:last(UnwrittenBytes), {Eof, infinity} = lists:last(UnwrittenBytes),
{ok, FHd} = file:open(DPath, [read, write, binary, raw]), {ok, FHd} = file:open(DPath, [read, write, binary, raw]),
%% Reserve for EC and stuff, to prevent eof when read %% Reserve for EC and stuff, to prevent eof when read
ok = file:pwrite(FHd, 0, binary:copy(<<"so what?">>, ?MINIMUM_OFFSET div 8)), ok = file:pwrite(FHd, 0, binary:copy(<<"so what?">>, ?MINIMUM_OFFSET div 8)),
Tref = schedule_tick(), Tref = schedule_tick(),
St = #state{ St = #state{
fluname = FluName,
filename = Filename, filename = Filename,
data_dir = DataDir, data_dir = DataDir,
data_path = DPath, data_path = DPath,
csum_file = CsumFile,
data_filehandle = FHd, data_filehandle = FHd,
csum_table = CsumTable, csum_table = CsumTable,
tref = Tref, tref = Tref,
@ -348,7 +343,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
{Error, Err + 1} {Error, Err + 1}
end end
end, end,
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)), {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable, iolist_to_binary(F))),
lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n", lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
[iolist_size(Data), Offset, F, NewEof]), [iolist_size(Data), Offset, F, NewEof]),
{reply, Resp, State#state{writes = {T+1, NewErr}, {reply, Resp, State#state{writes = {T+1, NewErr},
@ -365,11 +360,14 @@ handle_call({trim, _Offset, _ClientMeta, _Data}, _From,
handle_call({trim, Offset, Size, _TriggerGC}, _From, handle_call({trim, Offset, Size, _TriggerGC}, _From,
State = #state{data_filehandle=FHd, State = #state{data_filehandle=FHd,
filename=Filename,
ops = Ops, ops = Ops,
trims = {T, Err}, trims = {T, Err},
csum_table = CsumTable}) -> csum_table = CsumTable}) ->
case machi_csum_table:all_trimmed(CsumTable, Offset, Offset+Size) of F = iolist_to_binary(Filename),
case machi_csum_table:all_trimmed(CsumTable, F,
Offset, Offset+Size) of
true -> true ->
NewState = State#state{ops=Ops+1, trims={T, Err+1}}, NewState = State#state{ops=Ops+1, trims={T, Err+1}},
%% All bytes of that range was already trimmed returns ok %% All bytes of that range was already trimmed returns ok
@ -379,14 +377,19 @@ handle_call({trim, Offset, Size, _TriggerGC}, _From,
false -> false ->
LUpdate = maybe_regenerate_checksum( LUpdate = maybe_regenerate_checksum(
FHd, FHd,
machi_csum_table:find_leftneighbor(CsumTable, Offset)), machi_csum_table:find_leftneighbor(CsumTable,
F,
Offset)),
RUpdate = maybe_regenerate_checksum( RUpdate = maybe_regenerate_checksum(
FHd, FHd,
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)), machi_csum_table:find_rightneighbor(CsumTable,
F,
Offset+Size)),
case machi_csum_table:trim(CsumTable, Offset, Size, LUpdate, RUpdate) of case machi_csum_table:trim(CsumTable, F, Offset,
Size, LUpdate, RUpdate) of
ok -> ok ->
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)), {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable, F)),
NewState = State#state{ops=Ops+1, NewState = State#state{ops=Ops+1,
trims={T+1, Err}, trims={T+1, Err},
eof_position=NewEof}, eof_position=NewEof},
@ -435,8 +438,9 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
{reply, Resp, State#state{appends = {T+1, NewErr}, {reply, Resp, State#state{appends = {T+1, NewErr},
eof_position = NewEof}}; eof_position = NewEof}};
handle_call({checksum_list}, _FRom, State = #state{csum_table=T}) -> handle_call({checksum_list}, _FRom, State = #state{filename=Filename,
All = machi_csum_table:all(T), csum_table=T}) ->
All = machi_csum_table:all(T,iolist_to_binary(Filename)),
{reply, {ok, All}, State}; {reply, {ok, All}, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -528,7 +532,6 @@ handle_info(Req, State) ->
% @private % @private
terminate(Reason, #state{filename = F, terminate(Reason, #state{filename = F,
data_filehandle = FHd, data_filehandle = FHd,
csum_table = T,
reads = {RT, RE}, reads = {RT, RE},
writes = {WT, WE}, writes = {WT, WE},
appends = {AT, AE} appends = {AT, AE}
@ -544,14 +547,7 @@ terminate(Reason, #state{filename = F,
_ -> _ ->
ok = file:sync(FHd), ok = file:sync(FHd),
ok = file:close(FHd) ok = file:close(FHd)
end, end.
case T of
undefined ->
noop; %% file deleted
_ ->
ok = machi_csum_table:close(T)
end,
ok.
% @private % @private
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -622,7 +618,8 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
do_read(FHd, Filename, CsumTable, Offset, Size, _, _) -> do_read(FHd, Filename, CsumTable, Offset, Size, _, _) ->
%% Note that find/3 only returns overlapping chunks, both borders %% Note that find/3 only returns overlapping chunks, both borders
%% are not aligned to original Offset and Size. %% are not aligned to original Offset and Size.
ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size), ChunkCsums = machi_csum_table:find(CsumTable, iolist_to_binary(Filename),
Offset, Size),
read_all_ranges(FHd, Filename, ChunkCsums, [], []). read_all_ranges(FHd, Filename, ChunkCsums, [], []).
-spec read_all_ranges(file:io_device(), string(), -spec read_all_ranges(file:io_device(), string(),
@ -700,7 +697,7 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, Trimm
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) -> handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
Size = iolist_size(Data), Size = iolist_size(Data),
case machi_csum_table:find(CsumTable, Offset, Size) of case machi_csum_table:find(CsumTable, iolist_to_binary(Filename), Offset, Size) of
[] -> %% Nothing should be there [] -> %% Nothing should be there
try try
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
@ -723,6 +720,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
ok; ok;
{ok, _Other} -> {ok, _Other} ->
%% TODO: leave some debug/warning message here? %% TODO: leave some debug/warning message here?
io:format(user, "baposdifa;lsdfkj<<<<<<<~n", []),
{error, written} {error, written}
end; end;
[{Offset, Size, OtherCsum}] -> [{Offset, Size, OtherCsum}] ->
@ -731,11 +729,13 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
" a check for unwritten bytes gave us checksum ~p" " a check for unwritten bytes gave us checksum ~p"
" but the data we were trying to write has checksum ~p", " but the data we were trying to write has checksum ~p",
[Offset, Filename, OtherCsum, TaggedCsum]), [Offset, Filename, OtherCsum, TaggedCsum]),
io:format(user, "baposdifa;lsdfkj*************8~n", []),
{error, written}; {error, written};
_Chunks -> _Chunks ->
%% TODO: Do we try to read all continuous chunks to see %% TODO: Do we try to read all continuous chunks to see
%% wether its total checksum matches client-provided checksum? %% wether its total checksum matches client-provided checksum?
case machi_csum_table:any_trimmed(CsumTable, Offset, Size) of case machi_csum_table:any_trimmed(CsumTable, iolist_to_binary(Filename),
Offset, Size) of
true -> true ->
%% More than a byte is trimmed, besides, do we %% More than a byte is trimmed, besides, do we
%% have to return exact written bytes? No. Clients %% have to return exact written bytes? No. Clients
@ -744,6 +744,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
{error, trimmed}; {error, trimmed};
false -> false ->
%% No byte is trimmed, but at least one byte is written %% No byte is trimmed, but at least one byte is written
io:format(user, "baposdifa;lsdfkj*************8 ~p~n", [_Chunks]),
{error, written} {error, written}
end end
end. end.
@ -761,6 +762,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) -> do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
case file:pwrite(FHd, Offset, Data) of case file:pwrite(FHd, Offset, Data) of
ok -> ok ->
F = iolist_to_binary(Filename),
lager:debug("Successful write in file ~p at offset ~p, length ~p", lager:debug("Successful write in file ~p at offset ~p, length ~p",
[Filename, Offset, Size]), [Filename, Offset, Size]),
@ -769,11 +771,15 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
%% as server_sha %% as server_sha
LUpdate = maybe_regenerate_checksum( LUpdate = maybe_regenerate_checksum(
FHd, FHd,
machi_csum_table:find_leftneighbor(CsumTable, Offset)), machi_csum_table:find_leftneighbor(CsumTable,
F,
Offset)),
RUpdate = maybe_regenerate_checksum( RUpdate = maybe_regenerate_checksum(
FHd, FHd,
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)), machi_csum_table:find_rightneighbor(CsumTable,
ok = machi_csum_table:write(CsumTable, Offset, Size, F,
Offset+Size)),
ok = machi_csum_table:write(CsumTable, F, Offset, Size,
TaggedCsum, LUpdate, RUpdate), TaggedCsum, LUpdate, RUpdate),
lager:debug("Successful write to checksum file for ~p", lager:debug("Successful write to checksum file for ~p",
[Filename]), [Filename]),
@ -838,32 +844,26 @@ maybe_gc(Reply, S = #state{eof_position = Eof,
lager:debug("The file is still small; not trying GC (Eof, MaxFileSize) = (~p, ~p)~n", lager:debug("The file is still small; not trying GC (Eof, MaxFileSize) = (~p, ~p)~n",
[Eof, MaxFileSize]), [Eof, MaxFileSize]),
{reply, Reply, S}; {reply, Reply, S};
maybe_gc(Reply, S = #state{fluname=FluName, maybe_gc(Reply, S = #state{data_filehandle = FHd,
data_filehandle = FHd,
data_dir = DataDir, data_dir = DataDir,
filename = Filename, filename = Filename,
eof_position = Eof, eof_position = Eof,
csum_table=CsumTable}) -> csum_table=CsumTable}) ->
case machi_csum_table:all_trimmed(CsumTable, ?MINIMUM_OFFSET, Eof) of lager:debug("GC? Let's try it: ~p.~n", [Filename]),
true -> case machi_csum_table:maybe_trim_file(CsumTable, iolist_to_binary(Filename), Eof) of
lager:debug("GC? Let's do it: ~p.~n", [Filename]), {ok, trimmed} ->
%% Before unlinking a file, it should inform %% Checksum table entries are all trimmed now, unlinking
%% machi_flu_filename_mgr that this file is %% file from operating system
%% deleted and mark it as "trimmed" to avoid
%% filename reuse and resurrection. Maybe garbage
%% will remain if a process crashed but it also
%% should be recovered at filename_mgr startup.
%% Also, this should be informed *before* file proxy
%% deletes files.
ok = machi_flu_metadata_mgr:trim_file(FluName, {file, Filename}),
ok = file:close(FHd), ok = file:close(FHd),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename), {_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = file:delete(DPath), ok = file:delete(DPath),
machi_csum_table:delete(CsumTable), lager:info("File ~s has been unlinked as all chunks"
{stop, normal, Reply, " were trimmed.", [Filename]),
S#state{data_filehandle=undefined, {stop, normal, Reply, S#state{data_filehandle=undefined}};
csum_table=undefined}}; {ok, not_trimmed} ->
false -> {reply, Reply, S};
{error, _} = Error ->
lager:error("machi_csum_table:maybe_trim_file/4 has been "
"unexpectedly failed: ~p", [Error]),
{reply, Reply, S} {reply, Reply, S}
end. end.

View file

@ -26,7 +26,7 @@
-export([ -export([
child_spec/1, child_spec/1,
start_link/1, start_link/1,
start_proxy/3 start_proxy/4
]). ]).
%% supervisor callback %% supervisor callback
@ -43,9 +43,9 @@ child_spec(FluName) ->
start_link(FluName) -> start_link(FluName) ->
supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []). supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []).
start_proxy(FluName, DataDir, Filename) -> start_proxy(FluName, DataDir, Filename, CsumTable) ->
supervisor:start_child(make_proxy_name(FluName), supervisor:start_child(make_proxy_name(FluName),
[FluName, Filename, DataDir]). [Filename, DataDir, CsumTable]).
init([]) -> init([]) ->
SupFlags = {simple_one_for_one, 1000, 10}, SupFlags = {simple_one_for_one, 1000, 10},

View file

@ -149,7 +149,7 @@ main2(FluName, TcpPort, DataDir, Props) ->
{true, undefined} {true, undefined}
end, end,
Witness_p = proplists:get_value(witness_mode, Props, false), Witness_p = proplists:get_value(witness_mode, Props, false),
S0 = #state{flu_name=FluName, S0 = #state{flu_name=FluName,
proj_store=ProjectionPid, proj_store=ProjectionPid,
wedged=Wedged_p, wedged=Wedged_p,

View file

@ -39,12 +39,11 @@
-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use -define(HASH(X), erlang:phash2(X)). %% hash algorithm to use
-define(TIMEOUT, 10 * 1000). %% 10 second timeout -define(TIMEOUT, 10 * 1000). %% 10 second timeout
-define(KNOWN_FILES_LIST_PREFIX, "known_files_").
-record(state, {fluname :: atom(), -record(state, {fluname :: atom(),
datadir :: string(), datadir :: string(),
tid :: ets:tid(), tid :: ets:tid(),
cnt :: non_neg_integer(), cnt :: non_neg_integer(),
csum_table :: machi_csum_table:table(),
trimmed_files :: machi_plist:plist() trimmed_files :: machi_plist:plist()
}). }).
@ -109,16 +108,17 @@ init([FluName, Name, DataDir, Num]) ->
%% important: we'll need another persistent storage to %% important: we'll need another persistent storage to
%% remember deleted (trimmed) file, to prevent resurrection after %% remember deleted (trimmed) file, to prevent resurrection after
%% flu restart and append. %% flu restart and append.
FileListFileName =
filename:join([DataDir, ?KNOWN_FILES_LIST_PREFIX ++ atom_to_list(FluName)]), Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
{ok, PList} = machi_plist:open(FileListFileName, []),
CsumTableDir = filename:join(DataDir, "checksums"),
{ok, CsumTable} = machi_csum_table:open(CsumTableDir, []),
%% TODO make sure all files non-existent, if any remaining files %% TODO make sure all files non-existent, if any remaining files
%% here, just delete it. They're in the list *because* they're all %% here, just delete it. They're in the list *because* they're all
%% trimmed. %% trimmed.
Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
{ok, #state{fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num, {ok, #state{fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num,
trimmed_files=PList}}. csum_table=CsumTable}}.
handle_cast(Req, State) -> handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]), lager:warning("Got unknown cast ~p", [Req]),
@ -133,14 +133,14 @@ handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
handle_call({start_proxy_pid, Filename}, _From, handle_call({start_proxy_pid, Filename}, _From,
State = #state{ fluname = N, tid = Tid, datadir = D, State = #state{ fluname = N, tid = Tid, datadir = D,
trimmed_files=TrimmedFiles}) -> csum_table=CsumTable}) ->
case machi_plist:find(TrimmedFiles, Filename) of case machi_csum_table:file_trimmed(CsumTable) of
false -> false ->
NewR = case lookup_md(Tid, Filename) of NewR = case lookup_md(Tid, Filename) of
not_found -> not_found ->
start_file_proxy(N, D, Filename); start_file_proxy(N, D, Filename, CsumTable);
#md{ proxy_pid = undefined } = R0 -> #md{ proxy_pid = undefined } = R0 ->
start_file_proxy(N, D, R0); start_file_proxy(N, D, R0, CsumTable);
#md{ proxy_pid = _Pid } = R1 -> #md{ proxy_pid = _Pid } = R1 ->
R1 R1
end, end,
@ -219,9 +219,9 @@ handle_info(Info, State) ->
lager:warning("Got unknown info ~p", [Info]), lager:warning("Got unknown info ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(Reason, _State = #state{trimmed_files=TrimmedFiles}) -> terminate(Reason, _State = #state{csum_table=CsumTable}) ->
lager:info("Shutting down because ~p", [Reason]), lager:info("Shutting down because ~p", [Reason]),
machi_plist:close(TrimmedFiles), ok = machi_csum_table:close(CsumTable),
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
@ -253,13 +253,13 @@ lookup_md(Tid, Data) ->
[R] -> R [R] -> R
end. end.
start_file_proxy(FluName, D, R = #md{filename = F} ) -> start_file_proxy(FluName, D, R = #md{filename = F}, CsumTable) ->
{ok, Pid} = machi_file_proxy_sup:start_proxy(FluName, D, F), {ok, Pid} = machi_file_proxy_sup:start_proxy(FluName, D, F, CsumTable),
Mref = monitor(process, Pid), Mref = monitor(process, Pid),
R#md{ proxy_pid = Pid, mref = Mref }; R#md{ proxy_pid = Pid, mref = Mref };
start_file_proxy(FluName, D, Filename) -> start_file_proxy(FluName, D, Filename, CsumTable) ->
start_file_proxy(FluName, D, #md{ filename = Filename }). start_file_proxy(FluName, D, #md{ filename = Filename }, CsumTable).
update_ets(Tid, R) -> update_ets(Tid, R) ->
ets:insert(Tid, R). ets:insert(Tid, R).

View file

@ -1,69 +0,0 @@
-module(machi_plist).
%%% @doc persistent list of binaries
-export([open/2, close/1, find/2, add/2]).
-ifdef(TEST).
-export([all/1]).
-endif.
-record(machi_plist,
{filename :: file:filename_all(),
fd :: file:io_device(),
list = [] :: list(string)}).
-type plist() :: #machi_plist{}.
-export_type([plist/0]).
-spec open(file:filename_all(), proplists:proplist()) ->
{ok, plist()} | {error, file:posix()}.
open(Filename, _Opt) ->
%% TODO: This decode could fail if the file didn't finish writing
%% whole contents, which should be fixed by some persistent
%% solution.
List = case file:read_file(Filename) of
{ok, <<>>} -> [];
{ok, Bin} -> binary_to_term(Bin);
{error, enoent} -> []
end,
case file:open(Filename, [read, write, raw, binary, sync]) of
{ok, Fd} ->
{ok, #machi_plist{filename=Filename,
fd=Fd,
list=List}};
Error ->
Error
end.
-spec close(plist()) -> ok.
close(#machi_plist{fd=Fd}) ->
_ = file:close(Fd).
-spec find(plist(), string()) -> boolean().
find(#machi_plist{list=List}, Name) ->
lists:member(Name, List).
-spec add(plist(), string()) -> {ok, plist()} | {error, file:posix()}.
add(Plist = #machi_plist{list=List0, fd=Fd}, Name) ->
case find(Plist, Name) of
true ->
{ok, Plist};
false ->
List = lists:append(List0, [Name]),
%% TODO: partial write could break the file with other
%% persistent info (even lose data of trimmed states);
%% needs a solution.
case file:pwrite(Fd, 0, term_to_binary(List)) of
ok ->
{ok, Plist#machi_plist{list=List}};
Error ->
Error
end
end.
-ifdef(TEST).
-spec all(plist()) -> [file:filename()].
all(#machi_plist{list=List}) ->
List.
-endif.

View file

@ -2,68 +2,69 @@
-compile(export_all). -compile(export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(HDR, {0, 1024, none}).
cleanup(Dir) -> cleanup(Dir) ->
os:cmd("rm -rf " ++ Dir). os:cmd("rm -rf " ++ Dir).
smoke_test() -> smoke_test() ->
Filename = "./temp-checksum-dumb-file", DBFile = "./temp-checksum-dumb-file",
_ = cleanup(Filename), Filename = <<"/some/puppy/and/cats^^^42">>,
{ok, MC} = machi_csum_table:open(Filename, []), _ = cleanup(DBFile),
?assertEqual([{1024, infinity}], {ok, MC} = machi_csum_table:open(DBFile, []),
machi_csum_table:calc_unwritten_bytes(MC)), ?assertEqual([{0, infinity}],
machi_csum_table:calc_unwritten_bytes(MC, Filename)),
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>}, Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
[] = machi_csum_table:find(MC, Offset, Size), [] = machi_csum_table:find(MC, Filename, Offset, Size),
ok = machi_csum_table:write(MC, Offset, Size, Checksum), ok = machi_csum_table:write(MC, Filename, Offset, Size, Checksum),
[{1024, 40}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC), [{0, 1064}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC, Filename),
?assertEqual([Entry], machi_csum_table:find(MC, Offset, Size)), ?assertEqual([Entry], machi_csum_table:find(MC, Filename, Offset, Size)),
ok = machi_csum_table:trim(MC, Offset, Size, undefined, undefined), ok = machi_csum_table:trim(MC, Filename, Offset, Size, undefined, undefined),
?assertEqual([{Offset, Size, trimmed}], ?assertEqual([{Offset, Size, trimmed}],
machi_csum_table:find(MC, Offset, Size)), machi_csum_table:find(MC, Filename, Offset, Size)),
ok = machi_csum_table:close(MC), ok = machi_csum_table:close(MC).
ok = machi_csum_table:delete(MC).
close_test() -> close_test() ->
Filename = "./temp-checksum-dumb-file-2", DBFile = "./temp-checksum-dumb-file-2",
_ = cleanup(Filename), Filename = <<"/some/puppy/and/cats^^^43">>,
{ok, MC} = machi_csum_table:open(Filename, []), _ = cleanup(DBFile),
{ok, MC} = machi_csum_table:open(DBFile, []),
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>}, Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
[] = machi_csum_table:find(MC, Offset, Size), [] = machi_csum_table:find(MC, Filename, Offset, Size),
ok = machi_csum_table:write(MC, Offset, Size, Checksum), ok = machi_csum_table:write(MC, Filename, Offset, Size, Checksum),
[Entry] = machi_csum_table:find(MC, Offset, Size), [Entry] = machi_csum_table:find(MC, Filename, Offset, Size),
ok = machi_csum_table:close(MC), ok = machi_csum_table:close(MC),
{ok, MC2} = machi_csum_table:open(Filename, []), {ok, MC2} = machi_csum_table:open(DBFile, []),
[Entry] = machi_csum_table:find(MC2, Offset, Size), [Entry] = machi_csum_table:find(MC2, Filename, Offset, Size),
ok = machi_csum_table:trim(MC2, Offset, Size, undefined, undefined), ok = machi_csum_table:trim(MC2, Filename, Offset, Size, undefined, undefined),
[{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Offset, Size), [{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Filename, Offset, Size),
ok = machi_csum_table:delete(MC2). ok = machi_csum_table:close(MC2).
smoke2_test() -> smoke2_test() ->
Filename = "./temp-checksum-dumb-file-3", DBFile = "./temp-checksum-dumb-file-3",
_ = cleanup(Filename), Filename = <<"/some/puppy/and/cats^^^43">>,
{ok, MC} = machi_csum_table:open(Filename, []), _ = cleanup(DBFile),
{ok, MC} = machi_csum_table:open(DBFile, []),
Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>}, Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>},
ok = machi_csum_table:write(MC, Offset, Size, Checksum), ok = machi_csum_table:write(MC, Filename, Offset, Size, Checksum),
?assertEqual([], machi_csum_table:find(MC, 0, 0)), ?assertEqual([], machi_csum_table:find(MC, Filename, 0, 0)),
?assertEqual([?HDR], machi_csum_table:find(MC, 0, 1)), ?assertEqual([], machi_csum_table:find(MC, Filename, 0, 1)),
[Entry] = machi_csum_table:find(MC, Offset, Size), [Entry] = machi_csum_table:find(MC, Filename, Offset, Size),
[?HDR] = machi_csum_table:find(MC, 1, 1024), [] = machi_csum_table:find(MC, Filename, 1, 1024),
?assertEqual([?HDR, Entry], ?assertEqual([Entry],
machi_csum_table:find(MC, 1023, 1024)), machi_csum_table:find(MC, Filename, 1023, 1024)),
[Entry] = machi_csum_table:find(MC, 1024, 1024), [Entry] = machi_csum_table:find(MC, Filename, 1024, 1024),
[Entry] = machi_csum_table:find(MC, 1025, 1024), [Entry] = machi_csum_table:find(MC, Filename, 1025, 1024),
ok = machi_csum_table:trim(MC, Offset, Size, undefined, undefined), ok = machi_csum_table:trim(MC, Filename, Offset, Size, undefined, undefined),
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size), [{Offset, Size, trimmed}] = machi_csum_table:find(MC, Filename, Offset, Size),
ok = machi_csum_table:close(MC), ok = machi_csum_table:close(MC).
ok = machi_csum_table:delete(MC).
smoke3_test() -> smoke3_test() ->
Filename = "./temp-checksum-dumb-file-4", DBFile = "./temp-checksum-dumb-file-4",
_ = cleanup(Filename), Filename = <<"/some/puppy/and/cats^^^44">>,
{ok, MC} = machi_csum_table:open(Filename, []), _ = cleanup(DBFile),
{ok, MC} = machi_csum_table:open(DBFile, []),
Scenario = Scenario =
[%% Command, {Offset, Size, Csum}, LeftNeighbor, RightNeibor [%% Command, {Offset, Size, Csum}, LeftNeighbor, RightNeibor
{?LINE, write, {2000, 10, <<"heh">>}, undefined, undefined}, {?LINE, write, {2000, 10, <<"heh">>}, undefined, undefined},
@ -84,9 +85,9 @@ smoke3_test() ->
%% ?debugVal({_Line, Chunk}), %% ?debugVal({_Line, Chunk}),
{Offset, Size, Csum} = Chunk, {Offset, Size, Csum} = Chunk,
?assertEqual(LeftN0, ?assertEqual(LeftN0,
machi_csum_table:find_leftneighbor(MC, Offset)), machi_csum_table:find_leftneighbor(MC, Filename, Offset)),
?assertEqual(RightN0, ?assertEqual(RightN0,
machi_csum_table:find_rightneighbor(MC, Offset+Size)), machi_csum_table:find_rightneighbor(MC, Filename, Offset+Size)),
LeftN = case LeftN0 of LeftN = case LeftN0 of
{OffsL, SizeL, trimmed} -> {OffsL, SizeL, trimmed}; {OffsL, SizeL, trimmed} -> {OffsL, SizeL, trimmed};
{OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>}; {OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>};
@ -98,19 +99,18 @@ smoke3_test() ->
end, end,
case Cmd of case Cmd of
write -> write ->
ok = machi_csum_table:write(MC, Offset, Size, Csum, ok = machi_csum_table:write(MC, Filename, Offset, Size, Csum,
LeftN, RightN); LeftN, RightN);
trim -> trim ->
ok = machi_csum_table:trim(MC, Offset, Size, ok = machi_csum_table:trim(MC, Filename, Offset, Size,
LeftN, RightN) LeftN, RightN)
end end
end || {_Line, Cmd, Chunk, LeftN0, RightN0} <- Scenario ], end || {_Line, Cmd, Chunk, LeftN0, RightN0} <- Scenario ],
?assert(not machi_csum_table:all_trimmed(MC, 10000)), ?assert(not machi_csum_table:all_trimmed(MC, Filename, 0, 10000)),
machi_csum_table:trim(MC, 0, 10000, undefined, undefined), machi_csum_table:trim(MC, Filename, 0, 10000, undefined, undefined),
?assert(machi_csum_table:all_trimmed(MC, 10000)), ?assert(machi_csum_table:all_trimmed(MC, Filename, 0, 10000)),
ok = machi_csum_table:close(MC), ok = machi_csum_table:close(MC).
ok = machi_csum_table:delete(MC).
%% TODO: add quickcheck test here %% TODO: add quickcheck test here

View file

@ -116,11 +116,11 @@ get_written_interval(L) ->
initial_state() -> initial_state() ->
{_, _, MS} = os:timestamp(), {_, _, MS} = os:timestamp(),
Filename = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS), Filename = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS),
#state{filename=Filename, written=[{0,1024}]}. #state{filename=Filename, written=[]}.
initial_state(I, T) -> initial_state(I, T) ->
S=initial_state(), S=initial_state(),
S#state{written=[{0,1024}], S#state{written=[],
planned_writes=I, planned_writes=I,
planned_trims=T}. planned_trims=T}.
@ -230,7 +230,8 @@ start_command(S) ->
{call, ?MODULE, start, [S]}. {call, ?MODULE, start, [S]}.
start(#state{filename=File}) -> start(#state{filename=File}) ->
{ok, Pid} = machi_file_proxy:start_link(some_flu, File, ?TESTDIR), CsumT = get_csum_table(),
{ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR, CsumT),
unlink(Pid), unlink(Pid),
Pid. Pid.
@ -432,6 +433,40 @@ stop_post(_, _, _) -> true.
stop_next(S, _, _) -> stop_next(S, _, _) ->
S#state{pid=undefined, prev_extra=0}. S#state{pid=undefined, prev_extra=0}.
csum_table_holder() ->
Parent = self(),
spawn_link(fun() ->
CsumFile = test_server:temp_name("eqc_data-csum"),
filelib:ensure_dir(CsumFile),
{ok, CsumT} = machi_csum_table:open(CsumFile, []),
erlang:register(csum_table_holder, self()),
Parent ! ok,
csum_table_holder_loop(CsumT),
machi_csum_table:close(CsumT),
erlang:unregister(csum_table_holder)
end),
receive
Other -> Other
after 1000 ->
timeout
end.
csum_table_holder_loop(CsumT) ->
receive
{get, From} ->
From ! CsumT;
stop ->
ok
end.
get_csum_table() ->
csum_table_holder ! {get, self()},
receive CsumT -> CsumT
end.
stop_csum_table_holder() ->
catch csum_table_holder ! stop.
%% Property %% Property
prop_ok() -> prop_ok() ->
@ -440,7 +475,9 @@ prop_ok() ->
{shuffle_interval(), shuffle_interval()}, {shuffle_interval(), shuffle_interval()},
?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I, T)), ?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I, T)),
begin begin
ok = csum_table_holder(),
{H, S, Res} = run_parallel_commands(?MODULE, Cmds), {H, S, Res} = run_parallel_commands(?MODULE, Cmds),
stop_csum_table_holder(),
cleanup(), cleanup(),
pretty_commands(?MODULE, Cmds, {H, S, Res}, pretty_commands(?MODULE, Cmds, {H, S, Res},
aggregate(command_names(Cmds), Res == ok)) aggregate(command_names(Cmds), Res == ok))

View file

@ -77,25 +77,28 @@ random_binary(Start, End) ->
end. end.
setup() -> setup() ->
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR), {ok, CsumT} = machi_csum_table:open(filename:join([?TESTDIR, "csumfile"]), []),
Pid. {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR, CsumT),
{Pid, CsumT}.
teardown(Pid) -> teardown({Pid, CsumT}) ->
catch machi_file_proxy:stop(Pid). catch machi_file_proxy:stop(Pid),
catch machi_csum_table:close(CsumT).
machi_file_proxy_test_() -> machi_file_proxy_test_() ->
clean_up_data_dir(?TESTDIR), clean_up_data_dir(?TESTDIR),
{setup, {setup,
fun setup/0, fun setup/0,
fun teardown/1, fun teardown/1,
fun(Pid) -> fun({Pid, _}) ->
[ [
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1)), ?_assertMatch({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertMatch({ok, _}, machi_file_proxy:read(Pid, 1, 1024)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
{timeout, 10, {timeout, 10,
@ -114,7 +117,7 @@ multiple_chunks_read_test_() ->
{setup, {setup,
fun setup/0, fun setup/0,
fun teardown/1, fun teardown/1,
fun(Pid) -> fun({Pid, _}) ->
[ [
?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)), ?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)),
?_assertMatch({ok, {[], [{"test", 0, 1}]}}, ?_assertMatch({ok, {[], [{"test", 0, 1}]}},