Fix type problem for return of get_latest_epoch
This commit is contained in:
parent
99bfa2a3b8
commit
a79f385fa7
3 changed files with 33 additions and 23 deletions
|
@ -154,7 +154,7 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
|
||||||
%% @doc Get the latest epoch number from the FLU's projection store.
|
%% @doc Get the latest epoch number from the FLU's projection store.
|
||||||
|
|
||||||
-spec get_latest_epoch(port(), projection_type()) ->
|
-spec get_latest_epoch(port(), projection_type()) ->
|
||||||
{ok, -1|non_neg_integer()} | {error, term()}.
|
{ok, epoch_id()} | {error, term()}.
|
||||||
get_latest_epoch(Sock, ProjType)
|
get_latest_epoch(Sock, ProjType)
|
||||||
when ProjType == 'public' orelse ProjType == 'private' ->
|
when ProjType == 'public' orelse ProjType == 'private' ->
|
||||||
get_latest_epoch2(Sock, ProjType).
|
get_latest_epoch2(Sock, ProjType).
|
||||||
|
@ -163,7 +163,7 @@ get_latest_epoch(Sock, ProjType)
|
||||||
|
|
||||||
-spec get_latest_epoch(inet_host(), inet_port(),
|
-spec get_latest_epoch(inet_host(), inet_port(),
|
||||||
projection_type()) ->
|
projection_type()) ->
|
||||||
{ok, -1|non_neg_integer()} | {error, term()}.
|
{ok, epoch_id()} | {error, term()}.
|
||||||
get_latest_epoch(Host, TcpPort, ProjType)
|
get_latest_epoch(Host, TcpPort, ProjType)
|
||||||
when ProjType == 'public' orelse ProjType == 'private' ->
|
when ProjType == 'public' orelse ProjType == 'private' ->
|
||||||
Sock = machi_util:connect(Host, TcpPort),
|
Sock = machi_util:connect(Host, TcpPort),
|
||||||
|
|
|
@ -42,8 +42,8 @@
|
||||||
private_dir = "" :: string(),
|
private_dir = "" :: string(),
|
||||||
wedged = true :: boolean(),
|
wedged = true :: boolean(),
|
||||||
wedge_notify_pid :: pid() | atom(),
|
wedge_notify_pid :: pid() | atom(),
|
||||||
max_public_epoch = -1 :: -1 | non_neg_integer(),
|
max_public_epoch = {-1,<<>>} :: -1 | non_neg_integer(),
|
||||||
max_private_epoch = -1 :: -1 | non_neg_integer()
|
max_private_epoch = {-1,<<>>} :: -1 | non_neg_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
|
start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
|
||||||
|
@ -124,16 +124,16 @@ init([DataDir, NotifyWedgeStateChanges]) ->
|
||||||
|
|
||||||
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
|
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
|
||||||
LC2 = lclock_update(LC1),
|
LC2 = lclock_update(LC1),
|
||||||
Epoch = if ProjType == public -> S#state.max_public_epoch;
|
EpochT = if ProjType == public -> S#state.max_public_epoch;
|
||||||
ProjType == private -> S#state.max_private_epoch
|
ProjType == private -> S#state.max_private_epoch
|
||||||
end,
|
end,
|
||||||
{reply, {{ok, Epoch}, LC2}, S};
|
{reply, {{ok, EpochT}, LC2}, S};
|
||||||
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
|
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
|
||||||
LC2 = lclock_update(LC1),
|
LC2 = lclock_update(LC1),
|
||||||
Epoch = if ProjType == public -> S#state.max_public_epoch;
|
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch;
|
||||||
ProjType == private -> S#state.max_private_epoch
|
ProjType == private -> S#state.max_private_epoch
|
||||||
end,
|
end,
|
||||||
{Reply, NewS} = do_proj_read(ProjType, Epoch, S),
|
{Reply, NewS} = do_proj_read(ProjType, EpochNum, S),
|
||||||
{reply, {Reply, LC2}, NewS};
|
{reply, {Reply, LC2}, NewS};
|
||||||
handle_call({{read, ProjType, Epoch}, LC1}, _From, S) ->
|
handle_call({{read, ProjType, Epoch}, LC1}, _From, S) ->
|
||||||
LC2 = lclock_update(LC1),
|
LC2 = lclock_update(LC1),
|
||||||
|
@ -176,17 +176,21 @@ code_change(_OldVsn, S, _Extra) ->
|
||||||
|
|
||||||
do_proj_read(_ProjType, Epoch, S) when Epoch < 0 ->
|
do_proj_read(_ProjType, Epoch, S) when Epoch < 0 ->
|
||||||
{{error, not_written}, S};
|
{{error, not_written}, S};
|
||||||
do_proj_read(ProjType, Epoch, S) ->
|
do_proj_read(ProjType, Epoch, S_or_Dir) ->
|
||||||
Dir = pick_path(ProjType, S),
|
Dir = if is_record(S_or_Dir, state) ->
|
||||||
|
pick_path(ProjType, S_or_Dir);
|
||||||
|
is_list(S_or_Dir) ->
|
||||||
|
S_or_Dir
|
||||||
|
end,
|
||||||
Path = filename:join(Dir, epoch2name(Epoch)),
|
Path = filename:join(Dir, epoch2name(Epoch)),
|
||||||
case file:read_file(Path) of
|
case file:read_file(Path) of
|
||||||
{ok, Bin} ->
|
{ok, Bin} ->
|
||||||
%% TODO and if Bin is corrupt? (even if binary_to_term() succeeds)
|
%% TODO and if Bin is corrupt? (even if binary_to_term() succeeds)
|
||||||
{{ok, binary_to_term(Bin)}, S};
|
{{ok, binary_to_term(Bin)}, S_or_Dir};
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
{{error, not_written}, S};
|
{{error, not_written}, S_or_Dir};
|
||||||
{error, Else} ->
|
{error, Else} ->
|
||||||
{{error, Else}, S}
|
{{error, Else}, S_or_Dir}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
|
do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
|
||||||
|
@ -201,12 +205,15 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
|
||||||
ok = file:write(FH, term_to_binary(Proj)),
|
ok = file:write(FH, term_to_binary(Proj)),
|
||||||
ok = file:sync(FH),
|
ok = file:sync(FH),
|
||||||
ok = file:close(FH),
|
ok = file:close(FH),
|
||||||
NewS = if ProjType == public, Epoch > S#state.max_public_epoch ->
|
EpochT = {Epoch, Proj},
|
||||||
|
NewS = if ProjType == public,
|
||||||
|
Epoch > element(1, S#state.max_public_epoch) ->
|
||||||
io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
|
io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
|
||||||
S#state{max_public_epoch=Epoch, wedged=true};
|
S#state{max_public_epoch=EpochT, wedged=true};
|
||||||
ProjType == private, Epoch > S#state.max_private_epoch ->
|
ProjType == private,
|
||||||
|
Epoch > element(1, S#state.max_private_epoch) ->
|
||||||
io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
|
io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
|
||||||
S#state{max_private_epoch=Epoch, wedged=false};
|
S#state{max_private_epoch=EpochT, wedged=false};
|
||||||
true ->
|
true ->
|
||||||
S
|
S
|
||||||
end,
|
end,
|
||||||
|
@ -233,9 +240,11 @@ find_all(Dir) ->
|
||||||
find_max_epoch(Dir) ->
|
find_max_epoch(Dir) ->
|
||||||
Fs = lists:sort(filelib:wildcard("*", Dir)),
|
Fs = lists:sort(filelib:wildcard("*", Dir)),
|
||||||
if Fs == [] ->
|
if Fs == [] ->
|
||||||
-1;
|
{-1, <<>>};
|
||||||
true ->
|
true ->
|
||||||
name2epoch(lists:last(Fs))
|
EpochNum = name2epoch(lists:last(Fs)),
|
||||||
|
{{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir),
|
||||||
|
{EpochNum, Proj}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
|
@ -125,7 +125,7 @@ flu_projection_smoke_test() ->
|
||||||
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
|
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
|
||||||
try
|
try
|
||||||
[begin
|
[begin
|
||||||
{ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
|
{ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
|
||||||
{error, not_written} =
|
{error, not_written} =
|
||||||
?FLU_C:read_latest_projection(Host, TcpPort, T),
|
?FLU_C:read_latest_projection(Host, TcpPort, T),
|
||||||
{ok, []} = ?FLU_C:list_all(Host, TcpPort, T),
|
{ok, []} = ?FLU_C:list_all(Host, TcpPort, T),
|
||||||
|
@ -135,6 +135,7 @@ flu_projection_smoke_test() ->
|
||||||
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
|
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
|
||||||
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
|
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
|
||||||
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
|
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
|
||||||
|
{ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
|
||||||
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
|
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
|
||||||
{ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T),
|
{ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T),
|
||||||
{ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),
|
{ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),
|
||||||
|
|
Loading…
Reference in a new issue