diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 8bd710d..2dfbce4 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -550,24 +550,26 @@ do_projection_command(Sock, LenHex, S) -> {ok, ProjCmdBin} = gen_tcp:recv(Sock, Len), ok = inet:setopts(Sock, [{packet, line}]), ProjCmd = binary_to_term(ProjCmdBin), - case handle_projection_command(ProjCmd, S) of - ok -> - ok = gen_tcp:send(Sock, <<"OK\n">>); - {error, written} -> - ok = gen_tcp:send(Sock, <<"ERROR WRITTEN\n">>); - {error, not_written} -> - ok = gen_tcp:send(Sock, <<"ERROR NOT-WRITTEN\n">>); - Else -> - TODO = list_to_binary(io_lib:format("TODO-YOLO-~w", [Else])), - ok = gen_tcp:send(Sock, [<<"ERROR ">>, TODO, <<"\n">>]) - end + put(hack, ProjCmd), + Res = handle_projection_command(ProjCmd, S), + ResBin = term_to_binary(Res), + ResLenHex = machi_util:int_to_hexbin(byte_size(ResBin), 32), + ok = gen_tcp:send(Sock, [<<"OK ">>, ResLenHex, <<"\n">>, ResBin]) catch What:Why -> + io:format(user, "OOPS ~p\n", [get(hack)]), + io:format(user, "OOPS ~p ~p ~p\n", [What, Why, erlang:get_stacktrace()]), WHA = list_to_binary(io_lib:format("TODO-YOLO.~w:~w-~w", [What, Why, erlang:get_stacktrace()])), _ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>])) end. +handle_projection_command({get_latest_epoch, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:get_latest_epoch(ProjStore, ProjType); +handle_projection_command({read_projection, ProjType, Epoch}, + #state{proj_store=ProjStore}) -> + machi_projection_store:read(ProjStore, ProjType, Epoch); handle_projection_command({write_projection, ProjType, Proj}, #state{proj_store=ProjStore}) -> machi_projection_store:write(ProjStore, ProjType, Proj); diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 1965d8a..93124dd 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -31,6 +31,8 @@ list_files/2, list_files/3, %% Projection API + get_latest_epoch/2, get_latest_epoch/3, + read_projection/3, read_projection/4, write_projection/3, write_projection/4, %% Common API @@ -146,6 +148,50 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> catch gen_tcp:close(Sock) end. +%% @doc Get the latest epoch number from the FLU's projection store. + +-spec get_latest_epoch(port(), projection_type()) -> + {ok, -1|non_neg_integer()} | {error, term()}. +get_latest_epoch(Sock, ProjType) + when ProjType == 'public' orelse ProjType == 'private' -> + get_latest_epoch2(Sock, ProjType). + +%% @doc Get the latest epoch number from the FLU's projection store. + +-spec get_latest_epoch(inet_host(), inet_port(), + projection_type()) -> + {ok, -1|non_neg_integer()} | {error, term()}. +get_latest_epoch(Host, TcpPort, ProjType) + when ProjType == 'public' orelse ProjType == 'private' -> + Sock = machi_util:connect(Host, TcpPort), + try + get_latest_epoch2(Sock, ProjType) + after + catch gen_tcp:close(Sock) + end. + +%% @doc Read a projection `Proj' of type `ProjType'. + +-spec read_projection(port(), projection_type(), epoch_num()) -> + 'ok' | {error, written} | {error, term()}. +read_projection(Sock, ProjType, Epoch) + when ProjType == 'public' orelse ProjType == 'private' -> + read_projection2(Sock, ProjType, Epoch). + +%% @doc Read a projection `Proj' of type `ProjType'. + +-spec read_projection(inet_host(), inet_port(), + projection_type(), epoch_num()) -> + 'ok' | {error, written} | {error, term()}. +read_projection(Host, TcpPort, ProjType, Epoch) + when ProjType == 'public' orelse ProjType == 'private' -> + Sock = machi_util:connect(Host, TcpPort), + try + read_projection2(Sock, ProjType, Epoch) + after + catch gen_tcp:close(Sock) + end. + %% @doc Write a projection `Proj' of type `ProjType'. -spec write_projection(port(), projection_type(), projection()) -> @@ -480,9 +526,17 @@ trunc_hack2(Sock, EpochID, File) -> {error, {badmatch, BadMatch}} end. +get_latest_epoch2(Sock, ProjType) -> + ProjCmd = {get_latest_epoch, ProjType}, + do_projection_common(Sock, ProjCmd). + +read_projection2(Sock, ProjType, Epoch) -> + ProjCmd = {read_projection, ProjType, Epoch}, + do_projection_common(Sock, ProjCmd). + write_projection2(Sock, ProjType, Proj) -> - ProjCmd = {write_projection, ProjType, Proj}, - do_projection_common(Sock, ProjCmd). + ProjCmd = {write_projection, ProjType, Proj}, + do_projection_common(Sock, ProjCmd). do_projection_common(Sock, ProjCmd) -> try @@ -492,13 +546,15 @@ do_projection_common(Sock, ProjCmd) -> LenHex = machi_util:int_to_hexbin(Len, 32), Cmd = [<<"PROJ ">>, LenHex, <<"\n">>], ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]), + ok = inet:setopts(Sock, [{packet, line}]), {ok, Line} = gen_tcp:recv(Sock, 0), - PathLen = byte_size(Line) - 3 - 16 - 1 - 1, case Line of - <<"OK\n">> -> - ok; - <<"ERROR WRITTEN\n">> -> - {error, written}; + <<"OK ", ResLenHex:8/binary, "\n">> -> + ResLen = machi_util:hexstr_to_int(ResLenHex), + ok = inet:setopts(Sock, [{packet, raw}]), + {ok, ResBin} = gen_tcp:recv(Sock, ResLen), + ok = inet:setopts(Sock, [{packet, line}]), + binary_to_term(ResBin); Else -> {error, Else} end diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 526113b..a2bf4d7 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -25,6 +25,8 @@ %% API -export([ start_link/3, + get_latest_epoch/2, get_latest_epoch/3, + read/3, read/4, write/3, write/4 ]). @@ -37,14 +39,28 @@ private_dir = "" :: string(), wedged = true :: boolean(), wedge_notify_pid :: pid() | atom(), - max_public_epoch = -1 :: non_neg_integer(), - max_private_epoch = -1 :: non_neg_integer() + max_public_epoch = -1 :: -1 | non_neg_integer(), + max_private_epoch = -1 :: -1 | non_neg_integer() }). start_link(RegName, DataDir, NotifyWedgeStateChanges) -> gen_server:start_link({local, RegName}, ?MODULE, [DataDir, NotifyWedgeStateChanges], []). +get_latest_epoch(PidSpec, ProjType) -> + get_latest_epoch(PidSpec, ProjType, infinity). + +get_latest_epoch(PidSpec, ProjType, Timeout) + when ProjType == 'public' orelse ProjType == 'private' -> + g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout). + +read(PidSpec, ProjType, Epoch) -> + read(PidSpec, ProjType, Epoch, infinity). + +read(PidSpec, ProjType, Epoch, Timeout) + when ProjType == 'public' orelse ProjType == 'private' -> + g_call(PidSpec, {read, ProjType, Epoch}, Timeout). + write(PidSpec, ProjType, Proj) -> write(PidSpec, ProjType, Proj, infinity). @@ -79,6 +95,16 @@ g_call(PidSpec, Arg, Timeout) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% +handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), + Epoch = if ProjType == public -> S#state.max_public_epoch; + ProjType == private -> S#state.max_private_epoch + end, + {reply, {{ok, Epoch}, LC2}, S}; +handle_call({{read, ProjType, Epoch}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), + {Reply, NewS} = do_proj_read(ProjType, Epoch, S), + {reply, {Reply, LC2}, NewS}; handle_call({{write, ProjType, Proj}, LC1}, _From, S) -> LC2 = lclock_update(LC1), {Reply, NewS} = do_proj_write(ProjType, Proj, S), @@ -101,6 +127,19 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% +do_proj_read(ProjType, Epoch, S) -> + Dir = pick_path(ProjType, S), + Path = filename:join(Dir, epoch2name(Epoch)), + case file:read_file(Path) of + {ok, Bin} -> + %% TODO and if Bin is corrupt? (even if binary_to_term() succeeds) + {{ok, binary_to_term(Bin)}, S}; + {error, enoent} -> + {{error, not_written}, S}; + {error, Else} -> + {{error, Else}, S} + end. + do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> %% TODO: We probably ought to check the projection checksum for sanity, eh? Dir = pick_path(ProjType, S), diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 3a36800..3cd1daf 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -124,11 +124,19 @@ flu_projection_smoke_test() -> FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir), try + {ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, public), + {ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, private), + P1 = machi_projection:new(1, a, [a], [], [a], [], []), ok = ?FLU_C:write_projection(Host, TcpPort, public, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, public, P1), + {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, public, 1), + {error, not_written} = ?FLU_C:read_projection(Host, TcpPort, public, 2), + ok = ?FLU_C:write_projection(Host, TcpPort, private, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, private, P1), + {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, private, 1), + {error, not_written} = ?FLU_C:read_projection(Host, TcpPort, private, 2), ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort)) after