WIP. It's a hot mess.

This commit is contained in:
Mark Allen 2015-09-23 13:56:59 -05:00
parent dd27d10eec
commit 52b851a520
4 changed files with 668 additions and 637 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,17 +1,23 @@
% 1. start file proxy supervisor
% 2. start projection store
% 3. start listener
-module(machi_flu_protocol).
-module(machi_flu_listener).
-behaviour(ranch_protocol).
-export([start_link/4]).
-export([init/4]).
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-record(state, {
pb_mode,
high_clnt
high_clnt,
proj_store,
etstab,
epoch_id,
flu_name
}).
-define(SERVER_CMD_READ_TIMEOUT, 600 * 1000).
@ -28,7 +34,7 @@ init(Ref, Socket, Transport, _Opts = []) ->
loop(Socket, Transport, #state{}).
loop(Socket, Transport, S) ->
case Transport:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
case Transport:recv(Socket, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Bin} ->
{RespBin, S2} =
case machi_pb:decode_mpb_ll_request(Bin) of
@ -49,7 +55,7 @@ loop(Socket, Transport, S) ->
loop(Socket, Transport, S2);
{error, SockError} ->
lager:error("Socket error ~w", [SockError]),
(catch Transport:close(Socket)),
(catch Transport:close(Socket))
end.
make_high_clnt(#state{high_clnt=undefined}=S) ->
@ -119,27 +125,31 @@ do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
{Msg, S};
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
{-6, S};
do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
CSum, ChunkExtra}, S) ->
{do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S), S};
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
CSum}, S) ->
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) ->
{do_server_checksum_listing(File, S), S};
do_pb_ll_request3({low_list_files, _EpochID}, S) ->
{do_server_list_files(S), S};
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
{do_server_wedge_status(S), S};
do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) ->
{do_server_delete_migration(File, S), S};
do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) ->
{do_server_trunc_hack(File, S), S};
do_pb_ll_request3({low_proj, PCMD}, S) ->
{do_server_proj_request(PCMD, S), S}.
do_pb_ll_request3(Cmd, S) ->
{execute_cmd(Cmd), S}.
%do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
% CSum, ChunkExtra}, S) ->
% {do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
% ChunkExtra, S), S};
%do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
% CSum}, S) ->
% {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
%do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
% {do_server_read_chunk(File, Offset, Size, Opts, S), S};
%do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) ->
% {do_server_checksum_listing(File, S), S};
%do_pb_ll_request3({low_list_files, _EpochID}, S) ->
% {do_server_list_files(S), S};
%do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
% {do_server_wedge_status(S), S};
%do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) ->
% {do_server_delete_migration(File, S), S};
%do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) ->
% {do_server_trunc_hack(File, S), S};
%do_pb_ll_request3({low_proj, PCMD}, S) ->
% {do_server_proj_request(PCMD, S), S}.
execute_cmd(_Cmd) ->
ok.
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
@ -176,4 +186,5 @@ do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:list_files(Clnt),
{Res, S}.
wedge_myself(_, _) -> ok.

View file

@ -26,9 +26,9 @@
%% Public API
-export([
start_link/1
start_link/1,
start/1,
stop/0
stop/1
]).
%% gen_server callbacks
@ -67,7 +67,7 @@ handle_call(Req, _From, S) ->
lager:warning("Unexpected call ~p", [Req]),
{reply, unexpected, S}.
handle_info({wedge_myself, EpochId}, S = #state{wedged = true}) ->
handle_info({wedge_myself, _EpochId}, S = #state{wedged = true}) ->
lager:debug("Request to wedge myself, but I'm already wedged. Ignoring."),
{noreply, S};
handle_info({wedge_myself, EpochId}, S = #state{flu_name = N,
@ -78,15 +78,16 @@ handle_info({wedge_myself, EpochId}, S = #state{flu_name = N,
kick_chain_manager(N),
{noreply, S#state{wedged=true}};
handle_info({wedge_state_change, Bool, {NewEpoch, _}}, S = #state{epoch_id = undefined}) ->
handle_info({wedge_state_change, Bool, {NewEpoch, _}},
S = #state{epoch_id = undefined, etstab=Tid}) ->
true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}),
{noreply, S#state{wedged = Bool, epoch_id = NewEpoch}};
handle_info({wedge_state_change, Bool, {NewEpoch, _}},
S = #state{epoch_id = E, etstab = Tid}) when NewEpoch >= E ->
true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}),
{noreply, S#state{wedged = Bool, epoch_id = NewEpoch}};
handle_info(M = {wedge_state_change, Bool, {NewEpoch, _}},
S = #state{epoch_id = E, etstab = Tid}) when NewEpoch < E ->
handle_info(M = {wedge_state_change, _Bool, {NewEpoch, _}},
S = #state{epoch_id = E}) when NewEpoch < E ->
lager:debug("Wedge state change message ~p, but my epoch id is higher (~p). Ignoring.",
[M, E]),
{noreply, S};
@ -134,12 +135,15 @@ dispatch_append(From, Prefix, Chunk, Csum, Extra) ->
try
{ok, Filename, Offset} = machi_flu_file_proxy:append(Pid,
[{client_csum_tag, Tag}, {client_csum, CS}],
Extra, Chunk)
Extra, Chunk),
From ! {assignment, Offset, Filename},
exit(normal)
catch
Type:Reason ->
_Type:Reason ->
lager:error("Could not append chunk to prefix ~p because ~p",
[Prefix, Reason])
[Prefix, Reason]),
exit(Reason)
end.
make_name(N, Suffix) ->
atom_to_list(N) ++ Suffix.

View file

@ -47,7 +47,7 @@
}).
%% This record goes in the ets table where prefix is the key
-record(md, {prefix :: string(),
-record(md, {prefix :: string(), %% either a prefix or a filename
file_proxy_pid :: undefined|pid(),
mref :: undefined|reference(), %% monitor ref for file proxy
current_file :: undefined|string(),
@ -186,32 +186,49 @@ compute_worker(Hash) ->
build_metadata_mgr_name(N) when is_integer(N) ->
list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)).
get_manager_atom(Prefix) ->
build_metadata_mgr_name(compute_worker(compute_hash(Prefix))).
get_manager_atom(Data) ->
build_metadata_mgr_name(compute_worker(compute_hash(Data))).
lookup_md(Tid, Prefix) ->
case ets:lookup(Tid, Prefix) of
lookup_md(Tid, Data) ->
case ets:lookup(Tid, Data) of
[] -> not_found;
[R] -> R
end.
find_or_create_filename(D, Prefix) ->
N = machi_util:read_max_filenum(D, Prefix),
find_or_create_filename(D, Prefix, #md{ prefix = Prefix, next_file_num = N }).
file_exists(D, F) ->
{_, Path} = machi_util:make_data_filename(D, F),
case file:read_file_info(Path) of
{ok, _Info} -> true;
{error, enoent} -> false;
{error, Reason} ->
lager:error("Probing file information for ~p resulted in ~p", [F, Reason]),
{error, Reason}
end.
find_or_create_filename(D, Data) ->
case file_exists(D, Data) of
true ->
#md{current_file = Data};
false ->
N = machi_util:read_max_filenum(D, Data),
find_or_create_filename(D, Data, #md{ prefix = Data, next_file_num = N })
end.
find_or_create_filename(D, Prefix, R = #md{ current_file = undefined, next_file_num = 0 }) ->
F = make_filename(Prefix, 0),
{F, _N} = make_filename(Prefix, 0),
ok = machi_util:increment_max_filenum(D, Prefix),
find_or_create_filename(D, Prefix, R#md{ current_file = F, next_file_num = 1});
find_or_create_filename(D, Prefix, R = #md{ current_file = undefined, next_file_num = N }) ->
File = find_file(D, Prefix, N),
File1 = case File of
{File1, _} = case File of
not_found -> make_filename(Prefix, N);
_ -> File
_ -> {File, 0}
end,
{_, Path} = machi_util:make_data_filename(D, File1),
F = maybe_make_new_file(File1, Prefix, N, file:read_file_info(Path)),
R#md{ current_file = F }.
{F, NewN} = maybe_make_new_file(D, File1, Prefix, N, file:read_file_info(Path)),
R#md{ current_file = F, next_file_num = NewN };
find_or_create_filename(_D, _Prefix, R = #md{ current_file = _F }) ->
R.
start_file_proxy(D, Prefix) ->
start_file_proxy(D, Prefix, find_or_create_filename(D, Prefix)).
@ -225,26 +242,29 @@ start_file_proxy(_D, _Prefix, R = #md{ file_proxy_pid = _Pid }) ->
find_file(D, Prefix, N) ->
{_, Path} = machi_util:make_data_filename(D, Prefix, "*", N),
lager:debug("Search path: ~p", [Path]),
case filelib:wildcard(Path) of
[] -> not_found;
[F] -> F;
[F|_Fs] -> F %% XXX FIXME: What to do when there's more than one match?
%% Arbitrarily pick the head for now, I guess.
L = [_|_] -> lists:last(L) %% XXX FIXME: What to do when there's more than one match?
%% Arbitrarily pick the last file for now, I guess.
end.
maybe_make_new_file(F, Prefix, N, {ok, #file_info{ size = S }}) when S >= ?MAX_FILE_SIZE ->
lager:info("~p is larger than ~p. Starting new file.", [F, ?MAX_FILE_SIZE]),
make_filename(Prefix, N);
maybe_make_new_file(F, Prefix, N, Err = {error, _Reason}) ->
maybe_make_new_file(D, F, Prefix, N, {ok, #file_info{ size = S }}) when S >= ?MAX_FILE_SIZE ->
lager:info("~p is larger than ~p (~p). Starting new file.", [F, ?MAX_FILE_SIZE, S]),
ok = machi_util:increment_max_filenum(D, Prefix),
make_filename(Prefix, N+1);
maybe_make_new_file(D, F, Prefix, N, Err = {error, _Reason}) ->
lager:error("When reading file information about ~p, got ~p! Going to use new file",
[F, Err]),
make_filename(Prefix, N);
maybe_make_new_file(F, _Prefix, _N, _Info) ->
F.
ok = machi_util:increment_max_filenum(D, Prefix),
make_filename(Prefix, N+1);
maybe_make_new_file(_D, F, _Prefix, N, _Info) ->
{F, N}.
make_filename(Prefix, N) ->
{F, _} = machi_util:make_data_filename("", Prefix, something(), N),
F.
{F, N+1}.
%% XXX FIXME: Might just be time to generate UUIDs
something() ->