[WIP] Unify LevelDB usage to single instance #57
14 changed files with 447 additions and 411 deletions
@@ -21,7 +21,7 @@
 %% @doc Now 4GiBytes, could be up to 64bit due to PB message limit of
 %% chunk size
 -define(DEFAULT_MAX_FILE_SIZE, ((1 bsl 32) - 1)).
--define(MINIMUM_OFFSET, 1024).
+-define(MINIMUM_OFFSET, 0).
 
 %% 0th draft of checksum typing with 1st byte.
 -define(CSUM_TAG_NONE, 0). % No csum provided by client
@@ -1,20 +1,26 @@
 -module(machi_csum_table).
 
+%% @doc Object Database mapper that translates
+%% (file, checksum, offset, size)|(trimmed-file) <-> LevelDB key and value
+%% Keys and values are both encoded with sext.
 
 -export([open/2,
-         find/3,
-         write/6, write/4, trim/5,
-         find_leftneighbor/2, find_rightneighbor/2,
-         all_trimmed/3, any_trimmed/3,
-         all_trimmed/2,
-         calc_unwritten_bytes/1,
+         find/4,
+         write/7, write/5, trim/6,
+         find_leftneighbor/3, find_rightneighbor/3,
+         is_file_trimmed/2,
+         all_trimmed/4, any_trimmed/4,
+         calc_unwritten_bytes/2,
          split_checksum_list_blob_decode/1,
-         all/1,
-         close/1, delete/1,
-         foldl_chunks/3]).
+         all/2, all_files/1,
+         close/1, maybe_trim_file/3,
+         foldl_file_chunks/4, foldl_chunks/3]).
 
 -include("machi.hrl").
 
 -ifdef(TEST).
+-export([all/2]).
 
 -include_lib("eunit/include/eunit.hrl").
 -endif.
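
Note: the new @doc above describes the key layout only in prose. The sketch below is illustrative and not part of this changeset (the function name and the literal file name are made up); it spells out what a chunk entry and a per-file marker look like in the unified table, relying on sext preserving Erlang term order so that all entries of one file sort contiguously.

%% Illustrative only -- not part of this changeset.
example_keys() ->
    File = <<"prefix^^^42">>,
    ChunkKey = sext:encode({File, 4096, 512}),   %% {Filename, Offset, Size}
    ChunkVal = sext:encode(<<"csum-bytes">>),    %% checksum | none | trimmed
    FileKey  = sext:encode({file, File}),        %% per-file marker key
    FileVal  = sext:encode(existent),            %% existent | ts (tombstone)
    [{ChunkKey, ChunkVal}, {FileKey, FileVal}].
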
@@ -40,12 +46,9 @@ open(CSumFilename, _Opts) ->
                      %% operating system's file cache, which is for
                      %% Machi's main read efficiency
                      {total_leveldb_mem_percent, 10}],
+    ok = filelib:ensure_dir(CSumFilename),
 
     {ok, T} = eleveldb:open(CSumFilename, LevelDBOptions),
-    %% Dummy entry for reserved headers
-    ok = eleveldb:put(T,
-                      sext:encode({0, ?MINIMUM_OFFSET}),
-                      sext:encode(?CSUM_TAG_NONE_ATOM),
-                      [{sync, true}]),
     C0 = #machi_csum_table{
            file=CSumFilename,
            table=T},
@@ -55,61 +58,53 @@ open(CSumFilename, _Opts) ->
 split_checksum_list_blob_decode(Bin) ->
     erlang:binary_to_term(Bin).
 
 
 -define(has_overlap(LeftOffset, LeftSize, RightOffset, RightSize),
         ((LeftOffset - (RightOffset+RightSize)) * (LeftOffset+LeftSize - RightOffset) < 0)).
 
--spec find(table(), machi_dt:file_offset(), machi_dt:chunk_size())
+-spec find(table(), binary(), machi_dt:file_offset(), machi_dt:chunk_size())
           -> [chunk()].
-find(#machi_csum_table{table=T}, Offset, Size) ->
-    {ok, I} = eleveldb:iterator(T, [], keys_only),
-    EndKey = sext:encode({Offset+Size, 0}),
-    StartKey = sext:encode({Offset, Size}),
-
-    {ok, FirstKey} = case eleveldb:iterator_move(I, StartKey) of
-                         {error, invalid_iterator} ->
-                             eleveldb:iterator_move(I, first);
-                         {ok, _} = R0 ->
-                             case eleveldb:iterator_move(I, prev) of
-                                 {error, invalid_iterator} ->
-                                     R0;
-                                 {ok, _} = R1 ->
-                                     R1
-                             end
-                     end,
-    _ = eleveldb:iterator_close(I),
-    FoldFun = fun({K, V}, Acc) ->
-                      {TargetOffset, TargetSize} = sext:decode(K),
-                      case ?has_overlap(TargetOffset, TargetSize, Offset, Size) of
-                          true ->
-                              [{TargetOffset, TargetSize, sext:decode(V)}|Acc];
-                          false ->
-                              Acc
-                      end;
-                 (_K, Acc) ->
-                      lager:error("~p wrong option", [_K]),
-                      Acc
-              end,
-    lists:reverse(eleveldb_fold(T, FirstKey, EndKey, FoldFun, [])).
+find(#machi_csum_table{table=T}, Filename, Offset, Size) when is_binary(Filename) ->
+    EndKey = sext:encode({Filename, Offset+Size, 0}),
+    case search_for_start_key(T, Filename, Offset, Size) of
+        undefined -> [];
+        FirstKey ->
+            FoldFun = fun({K, V}, Acc) ->
+                              {Filename, TargetOffset, TargetSize} = sext:decode(K),
+                              case ?has_overlap(TargetOffset, TargetSize, Offset, Size) of
+                                  true ->
+                                      [{TargetOffset, TargetSize, sext:decode(V)}|Acc];
+                                  false ->
+                                      Acc
+                              end;
+                         (_K, Acc) ->
+                              lager:error("~p wrong option", [_K]),
+                              Acc
+                      end,
+            lists:reverse(eleveldb_fold(T, FirstKey, EndKey, FoldFun, []))
+    end.
 
 %% @doc Updates all chunk info, by deleting existing entries if exists
 %% and putting new chunk info
--spec write(table(),
+-spec write(table(), binary(),
             machi_dt:file_offset(), machi_dt:chunk_size(),
             machi_dt:chunk_csum()|'none'|'trimmed',
             undefined|chunk(), undefined|chunk()) ->
                    ok | {error, term()}.
-write(#machi_csum_table{table=T} = CsumT, Offset, Size, CSum,
-      LeftUpdate, RightUpdate) ->
+write(#machi_csum_table{table=T} = CsumT, Filename,
+      Offset, Size, CSum, LeftUpdate, RightUpdate) when is_binary(Filename) ->
+    FileEntry = {put, sext:encode({file, Filename}), sext:encode(existent)},
     PutOps =
         [{put,
-          sext:encode({Offset, Size}),
-          sext:encode(CSum)}]
+          sext:encode({Filename, Offset, Size}),
+          sext:encode(CSum)},
+         FileEntry]
        ++ case LeftUpdate of
               {LO, LS, LCsum} when LO + LS =:= Offset ->
                   [{put,
-                    sext:encode({LO, LS}),
+                    sext:encode({Filename, LO, LS}),
                     sext:encode(LCsum)}];
              undefined ->
                   []
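
Note: the ?has_overlap macro above packs the usual interval test into a single sign check. A minimal, self-contained restatement (illustrative only; has_overlap/4 and overlap_examples/0 are not functions in this changeset):

%% Two half-open ranges [LO, LO+LS) and [RO, RO+RS) overlap iff
%% LO < RO+RS andalso LO+LS > RO; the product of the two signed
%% differences is negative exactly in that case.
has_overlap(LO, LS, RO, RS) ->
    ((LO - (RO + RS)) * (LO + LS - RO)) < 0.

overlap_examples() ->
    true  = has_overlap(0, 10, 5, 10),   %% [0,10) and [5,15) overlap
    false = has_overlap(0, 10, 10, 5),   %% adjacent ranges do not
    ok.
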
@@ -117,58 +112,68 @@ write(#machi_csum_table{table=T} = CsumT, Offset, Size, CSum,
        ++ case RightUpdate of
               {RO, RS, RCsum} when RO =:= Offset + Size ->
                   [{put,
-                    sext:encode({RO, RS}),
+                    sext:encode({Filename, RO, RS}),
                     sext:encode(RCsum)}];
              undefined ->
                   []
          end,
-    Chunks = find(CsumT, Offset, Size),
+    Chunks = find(CsumT, Filename, Offset, Size),
    DeleteOps = lists:map(fun({O, L, _}) ->
-                                  {delete, sext:encode({O, L})}
+                                  {delete, sext:encode({Filename, O, L})}
                          end, Chunks),
    eleveldb:write(T, DeleteOps ++ PutOps, [{sync, true}]).
 
--spec find_leftneighbor(table(), non_neg_integer()) ->
+-spec find_leftneighbor(table(), binary(), non_neg_integer()) ->
                                undefined | chunk().
-find_leftneighbor(CsumT, Offset) ->
-    case find(CsumT, Offset, 1) of
+find_leftneighbor(CsumT, Filename, Offset) when is_binary(Filename) ->
+    case find(CsumT, Filename, Offset, 1) of
        [] -> undefined;
        [{Offset, _, _}] -> undefined;
        [{LOffset, _, CsumOrTrimmed}] -> {LOffset, Offset - LOffset, CsumOrTrimmed}
    end.
 
--spec find_rightneighbor(table(), non_neg_integer()) ->
+-spec find_rightneighbor(table(), binary(), non_neg_integer()) ->
                                undefined | chunk().
-find_rightneighbor(CsumT, Offset) ->
-    case find(CsumT, Offset, 1) of
+find_rightneighbor(CsumT, Filename, Offset) when is_binary(Filename) ->
+    case find(CsumT, Filename, Offset, 1) of
        [] -> undefined;
        [{Offset, _, _}] -> undefined;
        [{ROffset, RSize, CsumOrTrimmed}] ->
            {Offset, ROffset + RSize - Offset, CsumOrTrimmed}
    end.
 
--spec write(table(), machi_dt:file_offset(), machi_dt:file_size(),
+-spec write(table(), binary(), machi_dt:file_offset(), machi_dt:file_size(),
            machi_dt:chunk_csum()|none|trimmed) ->
                   ok | {error, trimmed|file:posix()}.
-write(CsumT, Offset, Size, CSum) ->
-    write(CsumT, Offset, Size, CSum, undefined, undefined).
+write(CsumT, Filename, Offset, Size, CSum) ->
+    write(CsumT, Filename, Offset, Size, CSum, undefined, undefined).
 
-trim(CsumT, Offset, Size, LeftUpdate, RightUpdate) ->
-    write(CsumT, Offset, Size,
+trim(CsumT, Filename, Offset, Size, LeftUpdate, RightUpdate) ->
+    write(CsumT, Filename, Offset, Size,
          trimmed, %% Should this be much smaller like $t or just 't'
          LeftUpdate, RightUpdate).
 
+-spec is_file_trimmed(table(), binary()) -> boolean().
+is_file_trimmed(#machi_csum_table{table=T}, Filename) when is_binary(Filename) ->
+    case eleveldb:get(T, sext:encode({file, Filename}), []) of
+        {ok, V} ->
+            (sext:decode(V) =:= ts);
+        _E ->
+            false
+    end.
+
 %% @doc returns whether all bytes in a specific window is continously
 %% trimmed or not
--spec all_trimmed(table(), non_neg_integer(), non_neg_integer()) -> boolean().
-all_trimmed(#machi_csum_table{table=T}, Left, Right) ->
-    FoldFun = fun({_, _}, false) ->
+-spec all_trimmed(table(), binary(), non_neg_integer(), non_neg_integer()) -> boolean().
+all_trimmed(#machi_csum_table{table=T}, Filename, Left, Right) when is_binary(Filename) ->
+    FoldFun = fun({K, V}, false) ->
                      false;
                 ({K, V}, Pos) when is_integer(Pos) andalso Pos =< Right ->
                      case {sext:decode(K), sext:decode(V)} of
-                         {{Pos, Size}, trimmed} ->
+                         {{file, _}, _} -> Pos;
+                         {{Filename, Pos, Size}, trimmed} ->
                              Pos + Size;
-                         {{Offset, Size}, _}
+                         {{Filename, Offset, Size}, _}
                            when Offset + Size =< Left ->
                              Left;
                          _Eh ->
@@ -176,65 +181,108 @@ all_trimmed(#machi_csum_table{table=T}, Left, Right) ->
                     end
              end,
    case eleveldb:fold(T, FoldFun, Left, [{verify_checksums, true}]) of
-        false -> false;
-        Right -> true;
-        LastTrimmed when LastTrimmed < Right -> false;
+        false ->
+            false;
+        Right ->
+            true;
+        LastTrimmed when LastTrimmed < Right ->
+            false;
        _ -> %% LastTrimmed > Pos0, which is a irregular case but ok
            true
    end.
 
-%% @doc returns whether all bytes 0-Pos0 is continously trimmed or
-%% not, including header.
--spec all_trimmed(table(), non_neg_integer()) -> boolean().
-all_trimmed(CsumT, Pos0) ->
-    all_trimmed(CsumT, 0, Pos0).
-
--spec any_trimmed(table(),
+-spec any_trimmed(table(), binary(),
                  pos_integer(),
                  machi_dt:chunk_size()) -> boolean().
-any_trimmed(CsumT, Offset, Size) ->
-    Chunks = find(CsumT, Offset, Size),
+any_trimmed(CsumT, Filename, Offset, Size) ->
+    Chunks = find(CsumT, Filename, Offset, Size),
    lists:any(fun({_, _, State}) -> State =:= trimmed end, Chunks).
 
--spec calc_unwritten_bytes(table()) -> [byte_sequence()].
-calc_unwritten_bytes(#machi_csum_table{table=_} = CsumT) ->
-    case lists:sort(all(CsumT)) of
+-spec calc_unwritten_bytes(table(), binary()) -> [byte_sequence()].
+calc_unwritten_bytes(#machi_csum_table{table=_} = CsumT, Filename) ->
+    case lists:sort(all(CsumT, Filename)) of
        [] ->
-            [{?MINIMUM_OFFSET, infinity}];
-        Sorted ->
-            {LastOffset, _, _} = hd(Sorted),
-            build_unwritten_bytes_list(Sorted, LastOffset, [])
+            [{0, infinity}];
+        [{0, _, _}|_] = Sorted ->
+            build_unwritten_bytes_list(Sorted, 0, []);
+        [{LastOffset, _, _}|_] = Sorted ->
+            build_unwritten_bytes_list(Sorted, LastOffset, [{0, LastOffset}])
    end.
 
-all(CsumT) ->
+all(CsumT, Filename) ->
    FoldFun = fun(E, Acc) -> [E|Acc] end,
-    lists:reverse(foldl_chunks(FoldFun, [], CsumT)).
+    lists:reverse(foldl_file_chunks(FoldFun, [], CsumT, Filename)).
+
+all_files(#machi_csum_table{table=T}) ->
+    FoldFun = fun({K, V}, Acc) ->
+                      case sext:decode(K) of
+                          {file, Filename} ->
+                              [{binary_to_list(Filename), sext:decode(V)}|Acc];
+                          _ ->
+                              Acc
+                      end;
+                 (_, Acc) -> Acc
+              end,
+    eleveldb_fold(T, sext:encode({file, ""}), sext:encode({file, [255,255,255]}),
+                  FoldFun, []).
 
 -spec close(table()) -> ok.
 close(#machi_csum_table{table=T}) ->
    ok = eleveldb:close(T).
 
--spec delete(table()) -> ok.
-delete(#machi_csum_table{table=T, file=F}) ->
-    catch eleveldb:close(T),
-    %% TODO change this to directory walk
-    case os:cmd("rm -rf " ++ F) of
-        "" -> ok;
-        E -> E
+-spec maybe_trim_file(table(), binary(), non_neg_integer()) ->
+                             {ok, trimmed|not_trimmed} | {error, term()}.
+maybe_trim_file(#machi_csum_table{table=T} = CsumT, Filename, EofP) when is_binary(Filename) ->
+    %% TODO: optimize; this code runs fold on eleveldb twice.
+    case all_trimmed(CsumT, Filename, 0, EofP) of
+        true ->
+            Chunks = all(CsumT, Filename),
+            DeleteOps = lists:map(fun({O, L, _}) ->
+                                          {delete, sext:encode({Filename, O, L})}
+                                  end, Chunks),
+            FileTombstone = {put, sext:encode({file, Filename}), sext:encode(ts)},
+            case eleveldb:write(T, [FileTombstone|DeleteOps], [{sync, true}]) of
+                ok -> {ok, trimmed};
+                Other -> Other
+            end;
+        false ->
+            {ok, not_trimmed}
    end.
 
--spec foldl_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()),
-                   Acc0 :: term(), table()) -> Acc :: term().
-foldl_chunks(Fun, Acc0, #machi_csum_table{table=T}) ->
+%% @doc Folds over all chunks of a file
+-spec foldl_file_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()),
+                        Acc0 :: term(), table(), binary()) -> Acc :: term().
+foldl_file_chunks(Fun, Acc0, #machi_csum_table{table=T}, Filename) when is_binary(Filename) ->
    FoldFun = fun({K, V}, Acc) ->
-                      {Offset, Len} = sext:decode(K),
+                      {Filename, Offset, Len} = sext:decode(K),
                      Fun({Offset, Len, sext:decode(V)}, Acc);
                 (_K, Acc) ->
                      _ = lager:error("~p: wrong option?", [_K]),
                      Acc
              end,
+    StartKey = {Filename, 0, 0},
+    EndKey = { <<Filename/binary, 255, 255, 255, 255, 255>>, 0, 0},
+    eleveldb_fold(T, sext:encode(StartKey), sext:encode(EndKey),
+                  FoldFun, Acc0).
+
+%% @doc Folds over all chunks of all files
+-spec foldl_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()),
+                   Acc0 :: term(), table()) -> Acc :: term().
+foldl_chunks(Fun, Acc0, #machi_csum_table{table=T}) ->
+    FoldFun = fun({K, V}, Acc) ->
+                      {Filename, Offset, Len} = sext:decode(K),
+                      Fun({Filename, Offset, Len, sext:decode(V)}, Acc);
+                 (_K, Acc) ->
+                      _ = lager:error("~p: wrong option?", [_K]),
+                      Acc
+              end,
    eleveldb:fold(T, FoldFun, Acc0, [{verify_checksums, true}]).
 
+%% == internal functions ==
+
 -spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
                                                  Size :: pos_integer(),
                                                  Checksum :: binary() }],
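
Note: taken together, the functions above give each file its own key range inside the one shared LevelDB instance. The following sketch is illustrative only and not part of the diff (the file name and offsets are made up, and it assumes a freshly opened table with no other entries); it walks one file through write, find, trim and the final tombstone.

%% Illustrative only -- not part of this changeset.
lifecycle_sketch(CsumT) ->
    File = <<"prefix^^^1">>,
    ok = machi_csum_table:write(CsumT, File, 0, 1024, <<"csum">>),
    [{0, 1024, <<"csum">>}] = machi_csum_table:find(CsumT, File, 0, 1024),
    ok = machi_csum_table:trim(CsumT, File, 0, 1024, undefined, undefined),
    %% Once every byte up to EOF is trimmed, maybe_trim_file/3 deletes the
    %% chunk entries and leaves a {file, File} -> ts tombstone behind.
    {ok, trimmed} = machi_csum_table:maybe_trim_file(CsumT, File, 1024),
    true = machi_csum_table:is_file_trimmed(CsumT, File),
    ok.
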
@@ -298,3 +346,50 @@ eleveldb_do_fold({error, iterator_closed}, _, _, _, Acc) ->
 eleveldb_do_fold({error, invalid_iterator}, _, _, _, Acc) ->
     %% Probably reached to end
     Acc.
+
+%% Key1 < MaybeStartKey =< Key
+%% FirstKey =< MaybeStartKey
+search_for_start_key(T, Filename, Offset, Size) ->
+    MaybeStartKey = sext:encode({Filename, Offset, Size}),
+    FirstKey = sext:encode({Filename, 0, 0}),
+    {ok, I} = eleveldb:iterator(T, [], keys_only),
+
+    try
+        case eleveldb:iterator_move(I, MaybeStartKey) of
+            {error, invalid_iterator} ->
+                %% No key in right - go for probably first key in the file
+                case eleveldb:iterator_move(I, FirstKey) of
+                    {error, _} -> undefined;
+                    {ok, Key0} -> goto_end(I, Key0, Offset)
+                end;
+            {ok, Key} when Key < FirstKey ->
+                FirstKey;
+            {ok, Key} ->
+                case eleveldb:iterator_move(I, prev) of
+                    {error, invalid_iterator} ->
+                        Key;
+                    {ok, Key1} when Key1 < FirstKey ->
+                        Key;
+                    {ok, Key1} ->
+                        Key1
+                end
+        end
+    after
+        _ = eleveldb:iterator_close(I)
+    end.
+
+goto_end(I, Key, Offset) ->
+    case sext:decode(Key) of
+        {_Filename, O, L} when Offset =< O + L ->
+            Key;
+        {_Filename, O, L} when O + L < Offset ->
+            case eleveldb:iterator_move(I, next) of
+                {ok, NextKey} ->
+                    goto_end(I, NextKey, Offset);
+                {error, _} ->
+                    Key
+            end
+    end.
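
Note: search_for_start_key/4 exists so that find/4 can start its fold at the last key at or before Offset without ever leaving the file's key range. A rough sketch of the bounded scan it enables (illustrative only; scan_file_range/6 is not part of the diff and reuses the module-internal eleveldb_fold helper shown above):

%% Illustrative only -- relies on sext's order-preserving encoding:
%% every key of one file lies between {Filename, 0, 0} and
%% {Filename, Offset+Size, 0}, so other files' entries are never touched.
scan_file_range(T, Filename, Offset, Size, FoldFun, Acc0) ->
    StartKey = sext:encode({Filename, 0, 0}),
    EndKey   = sext:encode({Filename, Offset + Size, 0}),
    eleveldb_fold(T, StartKey, EndKey, FoldFun, Acc0).
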
@@ -85,8 +85,6 @@
     filename :: string() | undefined,
     data_path :: string() | undefined,
     wedged = false :: boolean(),
-    csum_file :: string()|undefined,
-    csum_path :: string()|undefined,
     data_filehandle :: file:io_device(),
     csum_table :: machi_csum_table:table(),
     eof_position = 0 :: non_neg_integer(),
@@ -102,12 +100,14 @@
 
 %% Public API
 
-% @doc Start a new instance of the file proxy service. Takes the filename
-% and data directory as arguments. This function is typically called by the
-% `machi_file_proxy_sup:start_proxy/2' function.
--spec start_link(FluName :: atom(), Filename :: string(), DataDir :: string()) -> any().
-start_link(FluName, Filename, DataDir) ->
-    gen_server:start_link(?MODULE, {FluName, Filename, DataDir}, []).
+% @doc Start a new instance of the file proxy service. Takes the
+% filename and data directory as arguments. This function is typically
+% called by the `machi_file_proxy_sup:start_proxy/2'
+% function. Checksum table is also passed at startup.
+-spec start_link(Filename :: string(),
+                 DataDir :: string(), CsumTable :: machi_csum_table:table()) -> any().
+start_link(Filename, DataDir, CsumTable) ->
+    gen_server:start_link(?MODULE, {Filename, DataDir, CsumTable}, []).
 
 % @doc Request to stop an instance of the file proxy service.
 -spec stop(Pid :: pid()) -> ok.
@@ -218,26 +218,24 @@ checksum_list(Pid) ->
 %% gen_server callbacks
 
 % @private
-init({FluName, Filename, DataDir}) ->
-    CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
+init({Filename, DataDir, CsumTable}) ->
     {_, DPath} = machi_util:make_data_filename(DataDir, Filename),
-    ok = filelib:ensure_dir(CsumFile),
     ok = filelib:ensure_dir(DPath),
-    {ok, CsumTable} = machi_csum_table:open(CsumFile, []),
-    UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable),
+    CsumTable1 = case machi_csum_table:is_file_trimmed(CsumTable, list_to_binary(Filename)) of
+                     false -> CsumTable;
+                     true -> trimmed
+                 end,
+
+    UnwrittenBytes = machi_csum_table:calc_unwritten_bytes(CsumTable, iolist_to_binary(Filename)),
     {Eof, infinity} = lists:last(UnwrittenBytes),
     {ok, FHd} = file:open(DPath, [read, write, binary, raw]),
-    %% Reserve for EC and stuff, to prevent eof when read
-    ok = file:pwrite(FHd, 0, binary:copy(<<"so what?">>, ?MINIMUM_OFFSET div 8)),
     Tref = schedule_tick(),
     St = #state{
-        fluname = FluName,
         filename = Filename,
         data_dir = DataDir,
         data_path = DPath,
-        csum_file = CsumFile,
         data_filehandle = FHd,
-        csum_table = CsumTable,
+        csum_table = CsumTable1,
         tref = Tref,
         eof_position = Eof,
         max_file_size = machi_config:max_file_size()},
@@ -281,6 +279,13 @@ handle_call({read, _Offset, _Length, _}, _From,
               }) ->
     {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
 
+handle_call({read, _Offset, _Length, _Opts}, _From,
+            State = #state{
+                       csum_table = trimmed,
+                       reads = {T, Err}
+                      }) ->
+    {reply, {error, trimmed}, State#state{reads = {T+1, Err+1}}};
+
 handle_call({read, Offset, Length, _Opts}, _From,
             State = #state{eof_position = Eof,
                            reads = {T, Err}
@@ -325,6 +330,11 @@ handle_call({write, _Offset, _ClientMeta, _Data}, _From,
               }) ->
     {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
 
+handle_call({write, _, _, _}, _From,
+            State = #state{writes = {T, Err},
+                           csum_table = trimmed}) ->
+    {reply, {error, trimmed}, State#state{writes = {T + 1, Err + 1}}};
+
 handle_call({write, Offset, ClientMeta, Data}, _From,
             State = #state{filename = F,
                            writes = {T, Err},
@@ -348,7 +358,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
                         {Error, Err + 1}
                 end
         end,
-    {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
+    {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable, iolist_to_binary(F))),
     lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
                 [iolist_size(Data), Offset, F, NewEof]),
     {reply, Resp, State#state{writes = {T+1, NewErr},
@@ -365,35 +375,33 @@ handle_call({trim, _Offset, _ClientMeta, _Data}, _From,
 
 handle_call({trim, Offset, Size, _TriggerGC}, _From,
             State = #state{data_filehandle=FHd,
+                           filename=Filename,
                            ops = Ops,
                            trims = {T, Err},
                            csum_table = CsumTable}) ->
 
-    case machi_csum_table:all_trimmed(CsumTable, Offset, Offset+Size) of
-        true ->
-            NewState = State#state{ops=Ops+1, trims={T, Err+1}},
-            %% All bytes of that range was already trimmed returns ok
-            %% here, not {error, trimmed}, which means the whole file
-            %% was trimmed
+    F = iolist_to_binary(Filename),
+    LUpdate = maybe_regenerate_checksum(
+                FHd,
+                machi_csum_table:find_leftneighbor(CsumTable,
+                                                   F,
+                                                   Offset)),
+    RUpdate = maybe_regenerate_checksum(
+                FHd,
+                machi_csum_table:find_rightneighbor(CsumTable,
+                                                    F,
+                                                    Offset+Size)),
+
+    case machi_csum_table:trim(CsumTable, F, Offset,
+                               Size, LUpdate, RUpdate) of
+        ok ->
+            {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable, F)),
+            NewState = State#state{ops=Ops+1,
+                                   trims={T+1, Err},
+                                   eof_position=NewEof},
             maybe_gc(ok, NewState);
-        false ->
-            LUpdate = maybe_regenerate_checksum(
-                        FHd,
-                        machi_csum_table:find_leftneighbor(CsumTable, Offset)),
-            RUpdate = maybe_regenerate_checksum(
-                        FHd,
-                        machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)),
-
-            case machi_csum_table:trim(CsumTable, Offset, Size, LUpdate, RUpdate) of
-                ok ->
-                    {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
-                    NewState = State#state{ops=Ops+1,
-                                           trims={T+1, Err},
-                                           eof_position=NewEof},
-                    maybe_gc(ok, NewState);
-                Error ->
-                    {reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}}
-            end
+        Error ->
+            {reply, Error, State#state{ops=Ops+1, trims={T, Err+1}}}
     end;
 
 %% APPENDS
|
||||||
{reply, Resp, State#state{appends = {T+1, NewErr},
|
{reply, Resp, State#state{appends = {T+1, NewErr},
|
||||||
eof_position = NewEof}};
|
eof_position = NewEof}};
|
||||||
|
|
||||||
handle_call({checksum_list}, _FRom, State = #state{csum_table=T}) ->
|
handle_call({checksum_list}, _FRom, State = #state{filename=Filename,
|
||||||
All = machi_csum_table:all(T),
|
csum_table=T}) ->
|
||||||
|
All = machi_csum_table:all(T,iolist_to_binary(Filename)),
|
||||||
{reply, {ok, All}, State};
|
{reply, {ok, All}, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -528,7 +537,6 @@ handle_info(Req, State) ->
|
||||||
% @private
|
% @private
|
||||||
terminate(Reason, #state{filename = F,
|
terminate(Reason, #state{filename = F,
|
||||||
data_filehandle = FHd,
|
data_filehandle = FHd,
|
||||||
csum_table = T,
|
|
||||||
reads = {RT, RE},
|
reads = {RT, RE},
|
||||||
writes = {WT, WE},
|
writes = {WT, WE},
|
||||||
appends = {AT, AE}
|
appends = {AT, AE}
|
||||||
|
@ -544,14 +552,7 @@ terminate(Reason, #state{filename = F,
|
||||||
_ ->
|
_ ->
|
||||||
ok = file:sync(FHd),
|
ok = file:sync(FHd),
|
||||||
ok = file:close(FHd)
|
ok = file:close(FHd)
|
||||||
end,
|
end.
|
||||||
case T of
|
|
||||||
undefined ->
|
|
||||||
noop; %% file deleted
|
|
||||||
_ ->
|
|
||||||
ok = machi_csum_table:close(T)
|
|
||||||
end,
|
|
||||||
ok.
|
|
||||||
|
|
||||||
% @private
|
% @private
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -622,7 +623,8 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
||||||
do_read(FHd, Filename, CsumTable, Offset, Size, _, _) ->
|
do_read(FHd, Filename, CsumTable, Offset, Size, _, _) ->
|
||||||
%% Note that find/3 only returns overlapping chunks, both borders
|
%% Note that find/3 only returns overlapping chunks, both borders
|
||||||
%% are not aligned to original Offset and Size.
|
%% are not aligned to original Offset and Size.
|
||||||
ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size),
|
ChunkCsums = machi_csum_table:find(CsumTable, iolist_to_binary(Filename),
|
||||||
|
Offset, Size),
|
||||||
read_all_ranges(FHd, Filename, ChunkCsums, [], []).
|
read_all_ranges(FHd, Filename, ChunkCsums, [], []).
|
||||||
|
|
||||||
-spec read_all_ranges(file:io_device(), string(),
|
-spec read_all_ranges(file:io_device(), string(),
|
||||||
|
@ -700,7 +702,7 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, Trimm
|
||||||
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
||||||
Size = iolist_size(Data),
|
Size = iolist_size(Data),
|
||||||
|
|
||||||
case machi_csum_table:find(CsumTable, Offset, Size) of
|
case machi_csum_table:find(CsumTable, iolist_to_binary(Filename), Offset, Size) of
|
||||||
[] -> %% Nothing should be there
|
[] -> %% Nothing should be there
|
||||||
try
|
try
|
||||||
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
|
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
|
||||||
|
@ -723,6 +725,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
||||||
ok;
|
ok;
|
||||||
{ok, _Other} ->
|
{ok, _Other} ->
|
||||||
%% TODO: leave some debug/warning message here?
|
%% TODO: leave some debug/warning message here?
|
||||||
|
io:format(user, "baposdifa;lsdfkj<<<<<<<~n", []),
|
||||||
{error, written}
|
{error, written}
|
||||||
end;
|
end;
|
||||||
[{Offset, Size, OtherCsum}] ->
|
[{Offset, Size, OtherCsum}] ->
|
||||||
|
@ -731,11 +734,13 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
||||||
" a check for unwritten bytes gave us checksum ~p"
|
" a check for unwritten bytes gave us checksum ~p"
|
||||||
" but the data we were trying to write has checksum ~p",
|
" but the data we were trying to write has checksum ~p",
|
||||||
[Offset, Filename, OtherCsum, TaggedCsum]),
|
[Offset, Filename, OtherCsum, TaggedCsum]),
|
||||||
|
io:format(user, "baposdifa;lsdfkj*************8~n", []),
|
||||||
{error, written};
|
{error, written};
|
||||||
_Chunks ->
|
_Chunks ->
|
||||||
%% TODO: Do we try to read all continuous chunks to see
|
%% TODO: Do we try to read all continuous chunks to see
|
||||||
%% wether its total checksum matches client-provided checksum?
|
%% wether its total checksum matches client-provided checksum?
|
||||||
case machi_csum_table:any_trimmed(CsumTable, Offset, Size) of
|
case machi_csum_table:any_trimmed(CsumTable, iolist_to_binary(Filename),
|
||||||
|
Offset, Size) of
|
||||||
true ->
|
true ->
|
||||||
%% More than a byte is trimmed, besides, do we
|
%% More than a byte is trimmed, besides, do we
|
||||||
%% have to return exact written bytes? No. Clients
|
%% have to return exact written bytes? No. Clients
|
||||||
|
@ -744,6 +749,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
||||||
{error, trimmed};
|
{error, trimmed};
|
||||||
false ->
|
false ->
|
||||||
%% No byte is trimmed, but at least one byte is written
|
%% No byte is trimmed, but at least one byte is written
|
||||||
|
io:format(user, "baposdifa;lsdfkj*************8 ~p~n", [_Chunks]),
|
||||||
{error, written}
|
{error, written}
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
@ -761,6 +767,7 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
||||||
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
|
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
|
||||||
case file:pwrite(FHd, Offset, Data) of
|
case file:pwrite(FHd, Offset, Data) of
|
||||||
ok ->
|
ok ->
|
||||||
|
F = iolist_to_binary(Filename),
|
||||||
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
lager:debug("Successful write in file ~p at offset ~p, length ~p",
|
||||||
[Filename, Offset, Size]),
|
[Filename, Offset, Size]),
|
||||||
|
|
||||||
|
@ -769,11 +776,15 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
|
||||||
%% as server_sha
|
%% as server_sha
|
||||||
LUpdate = maybe_regenerate_checksum(
|
LUpdate = maybe_regenerate_checksum(
|
||||||
FHd,
|
FHd,
|
||||||
machi_csum_table:find_leftneighbor(CsumTable, Offset)),
|
machi_csum_table:find_leftneighbor(CsumTable,
|
||||||
|
F,
|
||||||
|
Offset)),
|
||||||
RUpdate = maybe_regenerate_checksum(
|
RUpdate = maybe_regenerate_checksum(
|
||||||
FHd,
|
FHd,
|
||||||
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)),
|
machi_csum_table:find_rightneighbor(CsumTable,
|
||||||
ok = machi_csum_table:write(CsumTable, Offset, Size,
|
F,
|
||||||
|
Offset+Size)),
|
||||||
|
ok = machi_csum_table:write(CsumTable, F, Offset, Size,
|
||||||
TaggedCsum, LUpdate, RUpdate),
|
TaggedCsum, LUpdate, RUpdate),
|
||||||
lager:debug("Successful write to checksum file for ~p",
|
lager:debug("Successful write to checksum file for ~p",
|
||||||
[Filename]),
|
[Filename]),
|
||||||
|
@@ -838,32 +849,27 @@ maybe_gc(Reply, S = #state{eof_position = Eof,
     lager:debug("The file is still small; not trying GC (Eof, MaxFileSize) = (~p, ~p)~n",
                 [Eof, MaxFileSize]),
     {reply, Reply, S};
-maybe_gc(Reply, S = #state{fluname=FluName,
-                           data_filehandle = FHd,
+maybe_gc(Reply, S = #state{data_filehandle = FHd,
                            data_dir = DataDir,
                            filename = Filename,
                            eof_position = Eof,
                            csum_table=CsumTable}) ->
-    case machi_csum_table:all_trimmed(CsumTable, ?MINIMUM_OFFSET, Eof) of
-        true ->
-            lager:debug("GC? Let's do it: ~p.~n", [Filename]),
-            %% Before unlinking a file, it should inform
-            %% machi_flu_filename_mgr that this file is
-            %% deleted and mark it as "trimmed" to avoid
-            %% filename reuse and resurrection. Maybe garbage
-            %% will remain if a process crashed but it also
-            %% should be recovered at filename_mgr startup.
-
-            %% Also, this should be informed *before* file proxy
-            %% deletes files.
-            ok = machi_flu_metadata_mgr:trim_file(FluName, {file, Filename}),
+    lager:debug("GC? Let's try it: ~p.~n", [Filename]),
+
+    case machi_csum_table:maybe_trim_file(CsumTable, iolist_to_binary(Filename), Eof) of
+        {ok, trimmed} ->
+            %% Checksum table entries are all trimmed now, unlinking
+            %% file from operating system
             ok = file:close(FHd),
             {_, DPath} = machi_util:make_data_filename(DataDir, Filename),
             ok = file:delete(DPath),
-            machi_csum_table:delete(CsumTable),
-            {stop, normal, Reply,
-             S#state{data_filehandle=undefined,
-                     csum_table=undefined}};
-        false ->
+            lager:info("File ~s has been unlinked as all chunks"
+                       " were trimmed.", [Filename]),
+            {stop, normal, Reply, S#state{data_filehandle=undefined}};
+        {ok, not_trimmed} ->
+            {reply, Reply, S};
+        {error, _} = Error ->
+            lager:error("machi_csum_table:maybe_trim_file/4 has been "
+                        "unexpectedly failed: ~p", [Error]),
             {reply, Reply, S}
     end.
@@ -44,8 +44,9 @@ start_link(FluName) ->
     supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []).
 
 start_proxy(FluName, DataDir, Filename) ->
+    {ok, CsumTable} = machi_flu_filename_mgr:get_csum_table(FluName),
     supervisor:start_child(make_proxy_name(FluName),
-                           [FluName, Filename, DataDir]).
+                           [Filename, DataDir, CsumTable]).
 
 init([]) ->
     SupFlags = {simple_one_for_one, 1000, 10},
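
Note: with this change a proxy no longer opens its own per-file LevelDB; the supervisor fetches the single FLU-wide table from the filename manager and hands it to the proxy. A condensed sketch of that flow (illustrative only; start_for/3 is not a function in the diff):

%% Illustrative only -- one LevelDB instance per FLU, owned by
%% machi_flu_filename_mgr and shared by every machi_file_proxy.
start_for(FluName, DataDir, Filename) ->
    {ok, CsumTable} = machi_flu_filename_mgr:get_csum_table(FluName),
    machi_file_proxy:start_link(Filename, DataDir, CsumTable).
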
@@ -392,17 +392,14 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa
 do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
     case sanitize_file_string(File) of
         ok ->
-            case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
-                {ok, Pid} ->
-                    case machi_file_proxy:read(Pid, Offset, Size, Opts) of
-                        %% XXX FIXME
-                        %% For now we are omiting the checksum data because it blows up
-                        %% protobufs.
-                        {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
-                        Other -> Other
-                    end;
-                {error, trimmed} = Error ->
-                    Error
+            {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
+            case machi_file_proxy:read(Pid, Offset, Size, Opts) of
+                %% XXX FIXME
+                %% For now we are omiting the checksum data because it blows up
+                %% protobufs.
+                {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
+                Other ->
+                    Other
             end;
         _ ->
             {error, bad_arg}
@@ -48,13 +48,12 @@
 -compile(export_all).
 -endif.
 
--export([
-         child_spec/2,
+-export([child_spec/2,
          start_link/2,
          find_or_make_filename_from_prefix/4,
          increment_prefix_sequence/3,
-         list_files_by_prefix/2
-        ]).
+         list_files_by_prefix/2,
+         get_csum_table/1]).
 
 %% gen_server callbacks
 -export([
@@ -72,7 +71,8 @@
 -record(state, {fluname :: atom(),
                 tid :: ets:tid(),
                 datadir :: string(),
-                epoch :: pv1_epoch()
+                epoch :: pv1_epoch(),
+                csum_table :: machi_csum_table:table()
                }).
 
 %% public API
@@ -126,13 +126,25 @@ list_files_by_prefix(_FluName, Other) ->
     lager:error("~p is not a valid prefix.", [Other]),
     error(badarg).
 
+get_csum_table(FluName) when is_atom(FluName) ->
+    gen_server:call(make_filename_mgr_name(FluName), get_csum_table, ?TIMEOUT).
+
 %% gen_server API
 init([FluName, DataDir]) ->
     Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]),
+
+    %% metadata includes checksums, offsets and filenames
+    CsumTableDir = filename:join(DataDir, "metadata"),
+    {ok, CsumTable} = machi_csum_table:open(CsumTableDir, []),
+    %% TODO make sure all files non-existent, if any remaining files
+    %% here, just delete it. They're in the list *because* they're all
+    %% trimmed.
+
     {ok, #state{fluname = FluName,
                 epoch = ?DUMMY_PV1_EPOCH,
                 datadir = DataDir,
-                tid = Tid}}.
+                tid = Tid,
+                csum_table = CsumTable}}.
 
 handle_cast(Req, State) ->
     lager:warning("Got unknown cast ~p", [Req]),
@@ -167,6 +179,9 @@ handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) ->
                  end),
     {noreply, S};
 
+handle_call(get_csum_table, _From, S = #state{ csum_table = CsumTable }) ->
+    {reply, {ok, CsumTable}, S};
+
 handle_call(Req, From, State) ->
     lager:warning("Got unknown call ~p from ~p", [Req, From]),
     {reply, hoge, State}.
@@ -175,8 +190,9 @@ handle_info(Info, State) ->
     lager:warning("Got unknown info ~p", [Info]),
     {noreply, State}.
 
-terminate(Reason, _State) ->
+terminate(Reason, _State = #state{ csum_table = CsumTable} ) ->
     lager:info("Shutting down because ~p", [Reason]),
+    ok = machi_csum_table:close(CsumTable),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -39,13 +39,10 @@
 -define(HASH(X), erlang:phash2(X)). %% hash algorithm to use
 -define(TIMEOUT, 10 * 1000). %% 10 second timeout
 
--define(KNOWN_FILES_LIST_PREFIX, "known_files_").
-
 -record(state, {fluname :: atom(),
                 datadir :: string(),
                 tid :: ets:tid(),
-                cnt :: non_neg_integer(),
-                trimmed_files :: machi_plist:plist()
+                cnt :: non_neg_integer()
                }).
 
 %% This record goes in the ets table where filename is the key
@@ -62,8 +59,7 @@
          lookup_proxy_pid/2,
          start_proxy_pid/2,
          stop_proxy_pid/2,
-         build_metadata_mgr_name/2,
-         trim_file/2
+         build_metadata_mgr_name/2
         ]).
 
 %% gen_server callbacks
@@ -101,24 +97,15 @@ start_proxy_pid(FluName, {file, Filename}) ->
 stop_proxy_pid(FluName, {file, Filename}) ->
     gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
 
-trim_file(FluName, {file, Filename}) ->
-    gen_server:call(get_manager_atom(FluName, Filename), {trim_file, Filename}, ?TIMEOUT).
-
 %% gen_server callbacks
 init([FluName, Name, DataDir, Num]) ->
     %% important: we'll need another persistent storage to
     %% remember deleted (trimmed) file, to prevent resurrection after
     %% flu restart and append.
-    FileListFileName =
-        filename:join([DataDir, ?KNOWN_FILES_LIST_PREFIX ++ atom_to_list(FluName)]),
-    {ok, PList} = machi_plist:open(FileListFileName, []),
-    %% TODO make sure all files non-existent, if any remaining files
-    %% here, just delete it. They're in the list *because* they're all
-    %% trimmed.
 
     Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
-    {ok, #state{fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num,
-                trimmed_files=PList}}.
+    {ok, #state{fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num}}.
 
 handle_cast(Req, State) ->
     lager:warning("Got unknown cast ~p", [Req]),
@@ -132,23 +119,17 @@ handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
     {reply, Reply, State};
 
 handle_call({start_proxy_pid, Filename}, _From,
-            State = #state{ fluname = N, tid = Tid, datadir = D,
-                            trimmed_files=TrimmedFiles}) ->
-    case machi_plist:find(TrimmedFiles, Filename) of
-        false ->
-            NewR = case lookup_md(Tid, Filename) of
-                       not_found ->
-                           start_file_proxy(N, D, Filename);
-                       #md{ proxy_pid = undefined } = R0 ->
-                           start_file_proxy(N, D, R0);
-                       #md{ proxy_pid = _Pid } = R1 ->
-                           R1
-                   end,
-            update_ets(Tid, NewR),
-            {reply, {ok, NewR#md.proxy_pid}, State};
-        true ->
-            {reply, {error, trimmed}, State}
-    end;
+            State = #state{ fluname = N, tid = Tid, datadir = D}) ->
+    NewR = case lookup_md(Tid, Filename) of
+               not_found ->
+                   start_file_proxy(N, D, Filename);
+               #md{ proxy_pid = undefined } = R0 ->
+                   start_file_proxy(N, D, R0);
+               #md{ proxy_pid = _Pid } = R1 ->
+                   R1
+           end,
+    update_ets(Tid, NewR),
+    {reply, {ok, NewR#md.proxy_pid}, State};
 
 handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
     case lookup_md(Tid, Filename) of
@@ -163,15 +144,6 @@ handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
     end,
     {reply, ok, State};
 
-handle_call({trim_file, Filename}, _,
-            S = #state{trimmed_files = TrimmedFiles }) ->
-    case machi_plist:add(TrimmedFiles, Filename) of
-        {ok, TrimmedFiles2} ->
-            {reply, ok, S#state{trimmed_files=TrimmedFiles2}};
-        Error ->
-            {reply, Error, S}
-    end;
-
 handle_call(Req, From, State) ->
     lager:warning("Got unknown call ~p from ~p", [Req, From]),
     {reply, hoge, State}.
@@ -219,9 +191,8 @@ handle_info(Info, State) ->
     lager:warning("Got unknown info ~p", [Info]),
     {noreply, State}.
 
-terminate(Reason, _State = #state{trimmed_files=TrimmedFiles}) ->
+terminate(Reason, _State) ->
     lager:info("Shutting down because ~p", [Reason]),
-    machi_plist:close(TrimmedFiles),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -253,7 +224,7 @@ lookup_md(Tid, Data) ->
         [R] -> R
     end.
 
-start_file_proxy(FluName, D, R = #md{filename = F} ) ->
+start_file_proxy(FluName, D, R = #md{filename = F}) ->
     {ok, Pid} = machi_file_proxy_sup:start_proxy(FluName, D, F),
     Mref = monitor(process, Pid),
     R#md{ proxy_pid = Pid, mref = Mref };
@@ -1,69 +0,0 @@
--module(machi_plist).
-
-%%% @doc persistent list of binaries
-
--export([open/2, close/1, find/2, add/2]).
-
--ifdef(TEST).
--export([all/1]).
--endif.
-
--record(machi_plist,
-        {filename :: file:filename_all(),
-         fd :: file:io_device(),
-         list = [] :: list(string)}).
-
--type plist() :: #machi_plist{}.
--export_type([plist/0]).
-
--spec open(file:filename_all(), proplists:proplist()) ->
-                  {ok, plist()} | {error, file:posix()}.
-open(Filename, _Opt) ->
-    %% TODO: This decode could fail if the file didn't finish writing
-    %% whole contents, which should be fixed by some persistent
-    %% solution.
-    List = case file:read_file(Filename) of
-               {ok, <<>>} -> [];
-               {ok, Bin} -> binary_to_term(Bin);
-               {error, enoent} -> []
-           end,
-    case file:open(Filename, [read, write, raw, binary, sync]) of
-        {ok, Fd} ->
-            {ok, #machi_plist{filename=Filename,
-                              fd=Fd,
-                              list=List}};
-        Error ->
-            Error
-    end.
-
--spec close(plist()) -> ok.
-close(#machi_plist{fd=Fd}) ->
-    _ = file:close(Fd).
-
--spec find(plist(), string()) -> boolean().
-find(#machi_plist{list=List}, Name) ->
-    lists:member(Name, List).
-
--spec add(plist(), string()) -> {ok, plist()} | {error, file:posix()}.
-add(Plist = #machi_plist{list=List0, fd=Fd}, Name) ->
-    case find(Plist, Name) of
-        true ->
-            {ok, Plist};
-        false ->
-            List = lists:append(List0, [Name]),
-            %% TODO: partial write could break the file with other
-            %% persistent info (even lose data of trimmed states);
-            %% needs a solution.
-            case file:pwrite(Fd, 0, term_to_binary(List)) of
-                ok ->
-                    {ok, Plist#machi_plist{list=List}};
-                Error ->
-                    Error
-            end
-    end.
-
--ifdef(TEST).
--spec all(plist()) -> [file:filename()].
-all(#machi_plist{list=List}) ->
-    List.
--endif.
@ -2,68 +2,69 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-define(HDR, {0, 1024, none}).
|
|
||||||
|
|
||||||
cleanup(Dir) ->
|
cleanup(Dir) ->
|
||||||
os:cmd("rm -rf " ++ Dir).
|
os:cmd("rm -rf " ++ Dir).
|
||||||
|
|
||||||
smoke_test() ->
|
smoke_test() ->
|
||||||
Filename = "./temp-checksum-dumb-file",
|
DBFile = "./temp-checksum-dumb-file",
|
||||||
_ = cleanup(Filename),
|
Filename = <<"/some/puppy/and/cats^^^42">>,
|
||||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
_ = cleanup(DBFile),
|
||||||
?assertEqual([{1024, infinity}],
|
{ok, MC} = machi_csum_table:open(DBFile, []),
|
||||||
machi_csum_table:calc_unwritten_bytes(MC)),
|
?assertEqual([{0, infinity}],
|
||||||
|
machi_csum_table:calc_unwritten_bytes(MC, Filename)),
|
||||||
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
|
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
|
||||||
[] = machi_csum_table:find(MC, Offset, Size),
|
[] = machi_csum_table:find(MC, Filename, Offset, Size),
|
||||||
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
ok = machi_csum_table:write(MC, Filename, Offset, Size, Checksum),
|
||||||
[{1024, 40}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC),
|
[{0, 1064}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC, Filename),
|
||||||
?assertEqual([Entry], machi_csum_table:find(MC, Offset, Size)),
|
?assertEqual([Entry], machi_csum_table:find(MC, Filename, Offset, Size)),
|
||||||
ok = machi_csum_table:trim(MC, Offset, Size, undefined, undefined),
|
ok = machi_csum_table:trim(MC, Filename, Offset, Size, undefined, undefined),
|
||||||
?assertEqual([{Offset, Size, trimmed}],
|
?assertEqual([{Offset, Size, trimmed}],
|
||||||
machi_csum_table:find(MC, Offset, Size)),
|
machi_csum_table:find(MC, Filename, Offset, Size)),
|
||||||
ok = machi_csum_table:close(MC),
|
ok = machi_csum_table:close(MC).
|
||||||
ok = machi_csum_table:delete(MC).
|
|
||||||
|
|
||||||
close_test() ->
|
close_test() ->
|
||||||
Filename = "./temp-checksum-dumb-file-2",
|
DBFile = "./temp-checksum-dumb-file-2",
|
||||||
_ = cleanup(Filename),
|
Filename = <<"/some/puppy/and/cats^^^43">>,
|
||||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
_ = cleanup(DBFile),
|
||||||
|
{ok, MC} = machi_csum_table:open(DBFile, []),
|
||||||
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
|
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
|
||||||
[] = machi_csum_table:find(MC, Offset, Size),
|
[] = machi_csum_table:find(MC, Filename, Offset, Size),
|
||||||
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
ok = machi_csum_table:write(MC, Filename, Offset, Size, Checksum),
|
||||||
[Entry] = machi_csum_table:find(MC, Offset, Size),
|
[Entry] = machi_csum_table:find(MC, Filename, Offset, Size),
|
||||||
ok = machi_csum_table:close(MC),
|
ok = machi_csum_table:close(MC),
|
||||||
|
|
||||||
{ok, MC2} = machi_csum_table:open(Filename, []),
|
{ok, MC2} = machi_csum_table:open(DBFile, []),
|
||||||
[Entry] = machi_csum_table:find(MC2, Offset, Size),
|
[Entry] = machi_csum_table:find(MC2, Filename, Offset, Size),
|
||||||
ok = machi_csum_table:trim(MC2, Offset, Size, undefined, undefined),
|
ok = machi_csum_table:trim(MC2, Filename, Offset, Size, undefined, undefined),
|
||||||
[{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Offset, Size),
|
[{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Filename, Offset, Size),
|
||||||
ok = machi_csum_table:delete(MC2).
|
ok = machi_csum_table:close(MC2).
|
||||||
|
|
||||||
smoke2_test() ->
|
smoke2_test() ->
|
||||||
Filename = "./temp-checksum-dumb-file-3",
|
DBFile = "./temp-checksum-dumb-file-3",
|
||||||
_ = cleanup(Filename),
|
Filename = <<"/some/puppy/and/cats^^^43">>,
|
||||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
_ = cleanup(DBFile),
|
||||||
|
{ok, MC} = machi_csum_table:open(DBFile, []),
|
||||||
Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>},
|
Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>},
|
||||||
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
ok = machi_csum_table:write(MC, Filename, Offset, Size, Checksum),
|
||||||
?assertEqual([], machi_csum_table:find(MC, 0, 0)),
|
?assertEqual([], machi_csum_table:find(MC, Filename, 0, 0)),
|
||||||
?assertEqual([?HDR], machi_csum_table:find(MC, 0, 1)),
|
?assertEqual([], machi_csum_table:find(MC, Filename, 0, 1)),
|
||||||
[Entry] = machi_csum_table:find(MC, Offset, Size),
|
[Entry] = machi_csum_table:find(MC, Filename, Offset, Size),
|
||||||
[?HDR] = machi_csum_table:find(MC, 1, 1024),
|
[] = machi_csum_table:find(MC, Filename, 1, 1024),
|
||||||
?assertEqual([?HDR, Entry],
|
?assertEqual([Entry],
|
||||||
machi_csum_table:find(MC, 1023, 1024)),
|
machi_csum_table:find(MC, Filename, 1023, 1024)),
|
||||||
[Entry] = machi_csum_table:find(MC, 1024, 1024),
|
[Entry] = machi_csum_table:find(MC, Filename, 1024, 1024),
|
||||||
[Entry] = machi_csum_table:find(MC, 1025, 1024),
|
[Entry] = machi_csum_table:find(MC, Filename, 1025, 1024),
|
||||||
|
|
||||||
ok = machi_csum_table:trim(MC, Offset, Size, undefined, undefined),
|
ok = machi_csum_table:trim(MC, Filename, Offset, Size, undefined, undefined),
|
||||||
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size),
|
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Filename, Offset, Size),
|
||||||
ok = machi_csum_table:close(MC),
|
ok = machi_csum_table:close(MC).
|
||||||
ok = machi_csum_table:delete(MC).
|
|
||||||
|
|
||||||
smoke3_test() ->
|
smoke3_test() ->
|
||||||
Filename = "./temp-checksum-dumb-file-4",
|
DBFile = "./temp-checksum-dumb-file-4",
|
||||||
_ = cleanup(Filename),
|
Filename = <<"/some/puppy/and/cats^^^44">>,
|
||||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
_ = cleanup(DBFile),
|
||||||
|
{ok, MC} = machi_csum_table:open(DBFile, []),
|
||||||
Scenario =
|
Scenario =
|
||||||
[%% Command, {Offset, Size, Csum}, LeftNeighbor, RightNeighbor
|
[%% Command, {Offset, Size, Csum}, LeftNeighbor, RightNeighbor
|
||||||
{?LINE, write, {2000, 10, <<"heh">>}, undefined, undefined},
|
{?LINE, write, {2000, 10, <<"heh">>}, undefined, undefined},
|
||||||
|
@ -84,9 +85,9 @@ smoke3_test() ->
|
||||||
%% ?debugVal({_Line, Chunk}),
|
%% ?debugVal({_Line, Chunk}),
|
||||||
{Offset, Size, Csum} = Chunk,
|
{Offset, Size, Csum} = Chunk,
|
||||||
?assertEqual(LeftN0,
|
?assertEqual(LeftN0,
|
||||||
machi_csum_table:find_leftneighbor(MC, Offset)),
|
machi_csum_table:find_leftneighbor(MC, Filename, Offset)),
|
||||||
?assertEqual(RightN0,
|
?assertEqual(RightN0,
|
||||||
machi_csum_table:find_rightneighbor(MC, Offset+Size)),
|
machi_csum_table:find_rightneighbor(MC, Filename, Offset+Size)),
|
||||||
LeftN = case LeftN0 of
|
LeftN = case LeftN0 of
|
||||||
{OffsL, SizeL, trimmed} -> {OffsL, SizeL, trimmed};
|
{OffsL, SizeL, trimmed} -> {OffsL, SizeL, trimmed};
|
||||||
{OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>};
|
{OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>};
|
||||||
|
@ -98,19 +99,18 @@ smoke3_test() ->
|
||||||
end,
|
end,
|
||||||
case Cmd of
|
case Cmd of
|
||||||
write ->
|
write ->
|
||||||
ok = machi_csum_table:write(MC, Offset, Size, Csum,
|
ok = machi_csum_table:write(MC, Filename, Offset, Size, Csum,
|
||||||
LeftN, RightN);
|
LeftN, RightN);
|
||||||
trim ->
|
trim ->
|
||||||
ok = machi_csum_table:trim(MC, Offset, Size,
|
ok = machi_csum_table:trim(MC, Filename, Offset, Size,
|
||||||
LeftN, RightN)
|
LeftN, RightN)
|
||||||
end
|
end
|
||||||
end || {_Line, Cmd, Chunk, LeftN0, RightN0} <- Scenario ],
|
end || {_Line, Cmd, Chunk, LeftN0, RightN0} <- Scenario ],
|
||||||
?assert(not machi_csum_table:all_trimmed(MC, 10000)),
|
?assert(not machi_csum_table:all_trimmed(MC, Filename, 0, 10000)),
|
||||||
machi_csum_table:trim(MC, 0, 10000, undefined, undefined),
|
machi_csum_table:trim(MC, Filename, 0, 10000, undefined, undefined),
|
||||||
?assert(machi_csum_table:all_trimmed(MC, 10000)),
|
?assert(machi_csum_table:all_trimmed(MC, Filename, 0, 10000)),
|
||||||
|
|
||||||
ok = machi_csum_table:close(MC),
|
ok = machi_csum_table:close(MC).
|
||||||
ok = machi_csum_table:delete(MC).
|
|
||||||
|
|
||||||
%% TODO: add quickcheck test here
|
%% TODO: add quickcheck test here
|
||||||
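The smoke tests above exercise the reworked per-file API, where every call now carries the logical Filename so that many Machi files can share a single LevelDB instance. A minimal sketch of that usage, assuming only the arities the tests call (open/2, write/5, find/4, trim/6, close/1); the helper name is hypothetical:

%% Sketch only: condenses the calls exercised by the smoke tests above.
csum_table_usage_sketch() ->
    DBFile   = "./temp-csum-db",                 %% one LevelDB directory for all files
    Filename = <<"/some/puppy/and/cats^^^43">>,  %% logical file name used as part of the key
    {ok, T} = machi_csum_table:open(DBFile, []),
    ok = machi_csum_table:write(T, Filename, 1064, 34, <<"deadbeef">>),
    [{1064, 34, <<"deadbeef">>}] = machi_csum_table:find(T, Filename, 1064, 34),
    ok = machi_csum_table:trim(T, Filename, 1064, 34, undefined, undefined),
    [{1064, 34, trimmed}] = machi_csum_table:find(T, Filename, 1064, 34),
    ok = machi_csum_table:close(T).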
|
|
||||||
|
|
|
@ -116,11 +116,11 @@ get_written_interval(L) ->
|
||||||
initial_state() ->
|
initial_state() ->
|
||||||
{_, _, MS} = os:timestamp(),
|
{_, _, MS} = os:timestamp(),
|
||||||
Filename = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS),
|
Filename = test_server:temp_name("eqc_data") ++ "." ++ integer_to_list(MS),
|
||||||
#state{filename=Filename, written=[{0,1024}]}.
|
#state{filename=Filename, written=[]}.
|
||||||
|
|
||||||
initial_state(I, T) ->
|
initial_state(I, T) ->
|
||||||
S=initial_state(),
|
S=initial_state(),
|
||||||
S#state{written=[{0,1024}],
|
S#state{written=[],
|
||||||
planned_writes=I,
|
planned_writes=I,
|
||||||
planned_trims=T}.
|
planned_trims=T}.
|
||||||
|
|
||||||
|
@ -230,7 +230,8 @@ start_command(S) ->
|
||||||
{call, ?MODULE, start, [S]}.
|
{call, ?MODULE, start, [S]}.
|
||||||
|
|
||||||
start(#state{filename=File}) ->
|
start(#state{filename=File}) ->
|
||||||
{ok, Pid} = machi_file_proxy:start_link(some_flu, File, ?TESTDIR),
|
CsumT = get_csum_table(),
|
||||||
|
{ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR, CsumT),
|
||||||
unlink(Pid),
|
unlink(Pid),
|
||||||
Pid.
|
Pid.
|
||||||
|
|
||||||
|
@ -432,6 +433,40 @@ stop_post(_, _, _) -> true.
|
||||||
stop_next(S, _, _) ->
|
stop_next(S, _, _) ->
|
||||||
S#state{pid=undefined, prev_extra=0}.
|
S#state{pid=undefined, prev_extra=0}.
|
||||||
|
|
||||||
|
csum_table_holder() ->
|
||||||
|
Parent = self(),
|
||||||
|
spawn_link(fun() ->
|
||||||
|
CsumFile = test_server:temp_name("eqc_data-csum"),
|
||||||
|
filelib:ensure_dir(CsumFile),
|
||||||
|
{ok, CsumT} = machi_csum_table:open(CsumFile, []),
|
||||||
|
erlang:register(csum_table_holder, self()),
|
||||||
|
Parent ! ok,
|
||||||
|
csum_table_holder_loop(CsumT),
|
||||||
|
machi_csum_table:close(CsumT),
|
||||||
|
erlang:unregister(csum_table_holder)
|
||||||
|
end),
|
||||||
|
receive
|
||||||
|
Other -> Other
|
||||||
|
after 1000 ->
|
||||||
|
timeout
|
||||||
|
end.
|
||||||
|
|
||||||
|
csum_table_holder_loop(CsumT) ->
|
||||||
|
receive
|
||||||
|
{get, From} ->
|
||||||
|
From ! CsumT;
|
||||||
|
stop ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_csum_table() ->
|
||||||
|
csum_table_holder ! {get, self()},
|
||||||
|
receive CsumT -> CsumT
|
||||||
|
end.
|
||||||
|
|
||||||
|
stop_csum_table_holder() ->
|
||||||
|
catch csum_table_holder ! stop.
|
||||||
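Because the property runs commands in parallel against one shared LevelDB-backed csum table, the holder process above opens the table once and hands the same handle to every caller. A sketch of how a worker is expected to use these helpers, mirroring start/1 and prop_ok below; the function and variable names are illustrative only:

%% Sketch only: one run opens the shared table, workers fetch the handle.
holder_usage_sketch(File) ->
    ok = csum_table_holder(),          %% opens the table and registers the holder
    CsumT = get_csum_table(),          %% every caller receives the same handle
    {ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR, CsumT),
    machi_file_proxy:stop(Pid),
    stop_csum_table_holder().          %% holder closes the table on 'stop'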
|
|
||||||
%% Property
|
%% Property
|
||||||
|
|
||||||
prop_ok() ->
|
prop_ok() ->
|
||||||
|
@ -440,7 +475,9 @@ prop_ok() ->
|
||||||
{shuffle_interval(), shuffle_interval()},
|
{shuffle_interval(), shuffle_interval()},
|
||||||
?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I, T)),
|
?FORALL(Cmds, parallel_commands(?MODULE, initial_state(I, T)),
|
||||||
begin
|
begin
|
||||||
|
ok = csum_table_holder(),
|
||||||
{H, S, Res} = run_parallel_commands(?MODULE, Cmds),
|
{H, S, Res} = run_parallel_commands(?MODULE, Cmds),
|
||||||
|
stop_csum_table_holder(),
|
||||||
cleanup(),
|
cleanup(),
|
||||||
pretty_commands(?MODULE, Cmds, {H, S, Res},
|
pretty_commands(?MODULE, Cmds, {H, S, Res},
|
||||||
aggregate(command_names(Cmds), Res == ok))
|
aggregate(command_names(Cmds), Res == ok))
|
||||||
|
|
|
@ -77,25 +77,28 @@ random_binary(Start, End) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
setup() ->
|
setup() ->
|
||||||
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR),
|
{ok, CsumT} = machi_csum_table:open(filename:join([?TESTDIR, "csumfile"]), []),
|
||||||
Pid.
|
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR, CsumT),
|
||||||
|
{Pid, CsumT}.
|
||||||
|
|
||||||
teardown(Pid) ->
|
teardown({Pid, CsumT}) ->
|
||||||
catch machi_file_proxy:stop(Pid).
|
catch machi_file_proxy:stop(Pid),
|
||||||
|
catch machi_csum_table:close(CsumT).
|
||||||
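The updated setup/teardown reflects the proxy's new start_link/3 shape: the caller opens the csum table and passes it in, instead of each proxy owning its own store. A minimal sketch, assuming two logical files sharing one table; the file names and helper name are made up for illustration:

%% Sketch only: two file proxies backed by the same csum table.
shared_csum_table_sketch() ->
    {ok, CsumT} = machi_csum_table:open(filename:join([?TESTDIR, "csumfile-shared"]), []),
    {ok, P1} = machi_file_proxy:start_link("file-one", ?TESTDIR, CsumT),
    {ok, P2} = machi_file_proxy:start_link("file-two", ?TESTDIR, CsumT),
    catch machi_file_proxy:stop(P1),
    catch machi_file_proxy:stop(P2),
    catch machi_csum_table:close(CsumT),
    ok.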
|
|
||||||
machi_file_proxy_test_() ->
|
machi_file_proxy_test_() ->
|
||||||
clean_up_data_dir(?TESTDIR),
|
clean_up_data_dir(?TESTDIR),
|
||||||
{setup,
|
{setup,
|
||||||
fun setup/0,
|
fun setup/0,
|
||||||
fun teardown/1,
|
fun teardown/1,
|
||||||
fun(Pid) ->
|
fun({Pid, _}) ->
|
||||||
[
|
[
|
||||||
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
|
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
|
||||||
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
|
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
|
||||||
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
|
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
|
||||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1)),
|
?_assertMatch({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
|
||||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
|
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
|
||||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)),
|
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||||
|
?_assertMatch({ok, _}, machi_file_proxy:read(Pid, 1, 1024)),
|
||||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
|
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
|
||||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
|
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
|
||||||
{timeout, 10,
|
{timeout, 10,
|
||||||
|
@ -114,7 +117,7 @@ multiple_chunks_read_test_() ->
|
||||||
{setup,
|
{setup,
|
||||||
fun setup/0,
|
fun setup/0,
|
||||||
fun teardown/1,
|
fun teardown/1,
|
||||||
fun(Pid) ->
|
fun({Pid, _}) ->
|
||||||
[
|
[
|
||||||
?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)),
|
?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)),
|
||||||
?_assertMatch({ok, {[], [{"test", 0, 1}]}},
|
?_assertMatch({ok, {[], [{"test", 0, 1}]}},
|
||||||
|
|
|
@ -122,8 +122,8 @@ flu_smoke_test() ->
|
||||||
{ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
|
{ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
|
||||||
Len1 = size(Chunk1),
|
Len1 = size(Chunk1),
|
||||||
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
|
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
|
||||||
?DUMMY_PV1_EPOCH,
|
?DUMMY_PV1_EPOCH,
|
||||||
File1, Off1*983829323, Len1, []),
|
File1, Off1*983829323, Len1, []),
|
||||||
%% XXX FIXME
|
%% XXX FIXME
|
||||||
%%
|
%%
|
||||||
%% This is failing because the read extends past the end of the file.
|
%% This is failing because the read extends past the end of the file.
|
||||||
|
|
|
@ -91,10 +91,7 @@ smoke_test2() ->
|
||||||
#p_srvr{name=Name, props=Props} = P,
|
#p_srvr{name=Name, props=Props} = P,
|
||||||
Dir = proplists:get_value(data_dir, Props),
|
Dir = proplists:get_value(data_dir, Props),
|
||||||
?assertEqual({ok, [File1Bin]},
|
?assertEqual({ok, [File1Bin]},
|
||||||
file:list_dir(filename:join([Dir, "data"]))),
|
file:list_dir(filename:join([Dir, "data"])))
|
||||||
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
|
|
||||||
{ok, Plist} = machi_plist:open(FileListFileName, []),
|
|
||||||
?assertEqual([], machi_plist:all(Plist))
|
|
||||||
end || P <- Ps],
|
end || P <- Ps],
|
||||||
|
|
||||||
[begin
|
[begin
|
||||||
|
@ -118,12 +115,11 @@ smoke_test2() ->
|
||||||
File = binary_to_list(Filex),
|
File = binary_to_list(Filex),
|
||||||
[begin
|
[begin
|
||||||
#p_srvr{name=Name, props=Props} = P,
|
#p_srvr{name=Name, props=Props} = P,
|
||||||
Dir = proplists:get_value(data_dir, Props),
|
DataDir = filename:join([proplists:get_value(data_dir, Props), "data"]),
|
||||||
?assertEqual({ok, []},
|
?assertEqual({ok, []}, file:list_dir(DataDir)),
|
||||||
file:list_dir(filename:join([Dir, "data"]))),
|
{ok, CsumT} = machi_flu_filename_mgr:get_csum_table(Name),
|
||||||
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
|
Plist = machi_csum_table:all_files(CsumT),
|
||||||
{ok, Plist} = machi_plist:open(FileListFileName, []),
|
?assertEqual([{File, ts}], Plist)
|
||||||
?assertEqual([File], machi_plist:all(Plist))
|
|
||||||
end || P <- Ps],
|
end || P <- Ps],
|
||||||
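With machi_plist removed, the per-FLU list of known files now comes from the shared csum table itself, via machi_flu_filename_mgr:get_csum_table/1 and machi_csum_table:all_files/1 as used above. A condensed sketch of that check; FluName and ExpectedFile are placeholders:

%% Sketch only: how the test above now asks a FLU which files it knows about.
known_files_sketch(FluName, ExpectedFile) ->
    {ok, CsumT} = machi_flu_filename_mgr:get_csum_table(FluName),
    %% all_files/1 pairs each file name with its state; the test above
    %% expects {File, ts} here, presumably after the file has been trimmed.
    [{ExpectedFile, ts}] = machi_csum_table:all_files(CsumT).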
|
|
||||||
[begin
|
[begin
|
||||||
|
|
|
@ -1,17 +0,0 @@
|
||||||
-module(machi_plist_test).
|
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
|
|
||||||
open_close_test() ->
|
|
||||||
FileName = "bark-bark-one",
|
|
||||||
file:delete(FileName),
|
|
||||||
{ok, PList0} = machi_plist:open(FileName, []),
|
|
||||||
{ok, PList1} = machi_plist:add(PList0, "boomar"),
|
|
||||||
?assertEqual(["boomar"], machi_plist:all(PList1)),
|
|
||||||
ok = machi_plist:close(PList1),
|
|
||||||
|
|
||||||
{ok, PList2} = machi_plist:open(FileName, []),
|
|
||||||
?assertEqual(["boomar"], machi_plist:all(PList2)),
|
|
||||||
ok = machi_plist:close(PList2),
|
|
||||||
file:delete(FileName),
|
|
||||||
ok.
|
|