diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 6dd6c65..8850a0c 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -154,7 +154,7 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> %% @doc Get the latest epoch number from the FLU's projection store. -spec get_latest_epoch(port(), projection_type()) -> - {ok, -1|non_neg_integer()} | {error, term()}. + {ok, epoch_id()} | {error, term()}. get_latest_epoch(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> get_latest_epoch2(Sock, ProjType). @@ -163,7 +163,7 @@ get_latest_epoch(Sock, ProjType) -spec get_latest_epoch(inet_host(), inet_port(), projection_type()) -> - {ok, -1|non_neg_integer()} | {error, term()}. + {ok, epoch_id()} | {error, term()}. get_latest_epoch(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = machi_util:connect(Host, TcpPort), diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index c88a21b..d53ecc4 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -42,8 +42,8 @@ private_dir = "" :: string(), wedged = true :: boolean(), wedge_notify_pid :: pid() | atom(), - max_public_epoch = -1 :: -1 | non_neg_integer(), - max_private_epoch = -1 :: -1 | non_neg_integer() + max_public_epoch = {-1,<<>>} :: -1 | non_neg_integer(), + max_private_epoch = {-1,<<>>} :: -1 | non_neg_integer() }). start_link(RegName, DataDir, NotifyWedgeStateChanges) -> @@ -124,16 +124,16 @@ init([DataDir, NotifyWedgeStateChanges]) -> handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - Epoch = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch - end, - {reply, {{ok, Epoch}, LC2}, S}; + EpochT = if ProjType == public -> S#state.max_public_epoch; + ProjType == private -> S#state.max_private_epoch + end, + {reply, {{ok, EpochT}, LC2}, S}; handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - Epoch = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch + {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch; + ProjType == private -> S#state.max_private_epoch end, - {Reply, NewS} = do_proj_read(ProjType, Epoch, S), + {Reply, NewS} = do_proj_read(ProjType, EpochNum, S), {reply, {Reply, LC2}, NewS}; handle_call({{read, ProjType, Epoch}, LC1}, _From, S) -> LC2 = lclock_update(LC1), @@ -176,17 +176,21 @@ code_change(_OldVsn, S, _Extra) -> do_proj_read(_ProjType, Epoch, S) when Epoch < 0 -> {{error, not_written}, S}; -do_proj_read(ProjType, Epoch, S) -> - Dir = pick_path(ProjType, S), +do_proj_read(ProjType, Epoch, S_or_Dir) -> + Dir = if is_record(S_or_Dir, state) -> + pick_path(ProjType, S_or_Dir); + is_list(S_or_Dir) -> + S_or_Dir + end, Path = filename:join(Dir, epoch2name(Epoch)), case file:read_file(Path) of {ok, Bin} -> %% TODO and if Bin is corrupt? (even if binary_to_term() succeeds) - {{ok, binary_to_term(Bin)}, S}; + {{ok, binary_to_term(Bin)}, S_or_Dir}; {error, enoent} -> - {{error, not_written}, S}; + {{error, not_written}, S_or_Dir}; {error, Else} -> - {{error, Else}, S} + {{error, Else}, S_or_Dir} end. do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> @@ -201,12 +205,15 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> ok = file:write(FH, term_to_binary(Proj)), ok = file:sync(FH), ok = file:close(FH), - NewS = if ProjType == public, Epoch > S#state.max_public_epoch -> + EpochT = {Epoch, Proj}, + NewS = if ProjType == public, + Epoch > element(1, S#state.max_public_epoch) -> io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]), - S#state{max_public_epoch=Epoch, wedged=true}; - ProjType == private, Epoch > S#state.max_private_epoch -> + S#state{max_public_epoch=EpochT, wedged=true}; + ProjType == private, + Epoch > element(1, S#state.max_private_epoch) -> io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]), - S#state{max_private_epoch=Epoch, wedged=false}; + S#state{max_private_epoch=EpochT, wedged=false}; true -> S end, @@ -233,9 +240,11 @@ find_all(Dir) -> find_max_epoch(Dir) -> Fs = lists:sort(filelib:wildcard("*", Dir)), if Fs == [] -> - -1; + {-1, <<>>}; true -> - name2epoch(lists:last(Fs)) + EpochNum = name2epoch(lists:last(Fs)), + {{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir), + {EpochNum, Proj} end. %%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 136d6d0..c37188c 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -125,7 +125,7 @@ flu_projection_smoke_test() -> FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir), try [begin - {ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), + {ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), {error, not_written} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, []} = ?FLU_C:list_all(Host, TcpPort, T), @@ -135,6 +135,7 @@ flu_projection_smoke_test() -> ok = ?FLU_C:write_projection(Host, TcpPort, T, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1), {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1), + {ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), {ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T), {ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),