Refactor to use record for FLU state, add dbg mode

This commit is contained in:
Scott Lystig Fritchie 2015-04-02 17:16:15 +09:00
parent 4c3bd81689
commit 5580098d49
2 changed files with 47 additions and 21 deletions

View file

@ -26,9 +26,19 @@
-export([start_link/1, stop/1]).
start_link([{FluName, TcpPort, DataDir}])
-record(state, {
reg_name :: atom(),
tcp_port :: non_neg_integer(),
data_dir :: string(),
wedge = true :: 'disabled' | boolean(),
my_epoch_id :: 'undefined',
dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist
}).
start_link([{FluName, TcpPort, DataDir}|Rest])
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
{ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir) end)}.
{ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}.
stop(Pid) ->
case erlang:is_process_alive(Pid) of
@ -41,41 +51,53 @@ stop(Pid) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(RegName, TcpPort, DataDir) ->
_Pid1 = start_listen_server(RegName, TcpPort, DataDir),
_Pid2 = start_append_server(RegName, DataDir),
main2(RegName, TcpPort, DataDir, Rest) ->
S1 = #state{reg_name=RegName,
tcp_port=TcpPort,
data_dir=DataDir,
props=Rest},
S2 = case proplists:get_value(dbg, Rest) of
undefined ->
S1;
DbgProps ->
S1#state{wedge=disabled,
dbg_props=DbgProps,
props=lists:keydelete(dbg, 1, Rest)}
end,
_Pid1 = start_listen_server(S2),
_Pid2 = start_append_server(S2),
receive forever -> ok end.
start_listen_server(RegName, TcpPort, DataDir) ->
spawn_link(fun() -> run_listen_server(RegName, TcpPort, DataDir) end).
start_listen_server(S) ->
spawn_link(fun() -> run_listen_server(S) end).
start_append_server(Name, DataDir) ->
spawn_link(fun() -> run_append_server(Name, DataDir) end).
start_append_server(S) ->
spawn_link(fun() -> run_append_server(S) end).
run_listen_server(RegName, TcpPort, DataDir) ->
run_listen_server(#state{tcp_port=TcpPort}=S) ->
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(RegName, LSock, DataDir).
listen_server_loop(LSock, S).
run_append_server(Name, DataDir) ->
run_append_server(#state{reg_name=Name}=S) ->
register(Name, self()),
append_server_loop(DataDir).
append_server_loop(S).
listen_server_loop(RegName, LSock, DataDir) ->
listen_server_loop(LSock, S) ->
{ok, Sock} = gen_tcp:accept(LSock),
spawn(fun() -> net_server_loop(RegName, Sock, DataDir) end),
listen_server_loop(RegName, LSock, DataDir).
spawn(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S).
append_server_loop(DataDir) ->
append_server_loop(#state{data_dir=DataDir}=S) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir) end),
append_server_loop(DataDir)
append_server_loop(S)
end.
net_server_loop(RegName, Sock, DataDir) ->
net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 60*1000) of
{ok, Line} ->
@ -132,7 +154,7 @@ net_server_loop(RegName, Sock, DataDir) ->
catch gen_tcp:close(Sock),
exit(normal)
end,
net_server_loop(RegName, Sock, DataDir);
net_server_loop(Sock, S);
_ ->
catch gen_tcp:close(Sock),
exit(normal)

View file

@ -29,9 +29,13 @@
-define(FLU_C, machi_flu1_client).
setup_test_flu(RegName, TcpPort, DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir, []).
setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
clean_up_data_dir(DataDir),
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir}]),
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
{dbg, DbgProps}]),
FLU1.
flu_smoke_test() ->