From 52b851a5204f1212b362cc87e237be2113bdda8b Mon Sep 17 00:00:00 2001 From: Mark Allen Date: Wed, 23 Sep 2015 13:56:59 -0500 Subject: [PATCH] WIP. It's a hot mess. --- src/machi_flu1.erl | 1156 ++++++++++++++++---------------- src/machi_flu_listener.erl | 61 +- src/machi_flu_manager.erl | 22 +- src/machi_flu_metadata_mgr.erl | 66 +- 4 files changed, 668 insertions(+), 637 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index a097704..06c76a7 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -64,583 +64,579 @@ %% replication/chain repair. -module(machi_flu1). - --include_lib("kernel/include/file.hrl"). - --include("machi.hrl"). --include("machi_pb.hrl"). --include("machi_projection.hrl"). --define(V(X,Y), ok). -%% -include("machi_verbose.hrl"). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --endif. % TEST - - --export([start_link/1, stop/1, - update_wedge_state/3, wedge_myself/2]). --export([make_listener_regname/1, make_projection_server_regname/1]). --export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3, - decode_csum_file_entry/1, - split_checksum_list_blob/1, split_checksum_list_blob_decode/1]). - --record(state, { - flu_name :: atom(), - proj_store :: pid(), - append_pid :: pid(), - tcp_port :: non_neg_integer(), - data_dir :: string(), - wedged = true :: boolean(), - etstab :: ets:tid(), - epoch_id :: 'undefined' | machi_dt:epoch_id(), - pb_mode = undefined :: 'undefined' | 'high' | 'low', - high_clnt :: 'undefined' | pid(), - dbg_props = [] :: list(), % proplist - props = [] :: list(), % proplist - proxies = orddict:new() :: orddict() - }). - --record(http_goop, { - len, % content-length - x_csum % x-checksum - }). - -start_link([{FluName, TcpPort, DataDir}|Rest]) - when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) -> - gen_server:start_link({local, - {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}. - -stop(Pid) -> - case erlang:is_process_alive(Pid) of - true -> - Pid ! killme, - ok; - false -> - error - end. - -update_wedge_state(PidSpec, Boolean, EpochId) - when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> - PidSpec ! {wedge_state_change, Boolean, EpochId}. - -wedge_myself(PidSpec, EpochId) - when is_tuple(EpochId) -> - PidSpec ! {wedge_myself, EpochId}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -ets_table_name(FluName) when is_atom(FluName) -> - list_to_atom(atom_to_list(FluName) ++ "_epoch"). -%% ets_table_name(FluName) when is_binary(FluName) -> -%% list_to_atom(binary_to_list(FluName) ++ "_epoch"). - -main2(FluName, TcpPort, DataDir, Rest) -> - {Props, DbgProps} = case proplists:get_value(dbg, Rest) of - undefined -> - {Rest, []}; - DPs -> - {lists:keydelete(dbg, 1, Rest), DPs} - end, - {SendAppendPidToProj_p, ProjectionPid} = - case proplists:get_value(projection_store_registered_name, Rest) of - undefined -> - RN = make_projection_server_regname(FluName), - {ok, PP} = - machi_projection_store:start_link(RN, DataDir, undefined), - {true, PP}; - RN -> - {false, whereis(RN)} - end, - InitialWedged_p = proplists:get_value(initial_wedged, DbgProps), - ProjRes = machi_projection_store:read_latest_projection(ProjectionPid, - private), - {Wedged_p, EpochId} = - if InitialWedged_p == undefined, - is_tuple(ProjRes), element(1, ProjRes) == ok -> - {ok, Proj} = ProjRes, - {false, {Proj#projection_v1.epoch_number, - Proj#projection_v1.epoch_csum}}; - InitialWedged_p == false -> - {false, ?DUMMY_PV1_EPOCH}; - true -> - {true, undefined} - end, - S0 = #state{flu_name=FluName, - proj_store=ProjectionPid, - tcp_port=TcpPort, - data_dir=DataDir, - wedged=Wedged_p, - etstab=ets_table_name(FluName), - epoch_id=EpochId, - dbg_props=DbgProps, - props=Props}, - AppendPid = start_append_server(S0, self()), - receive - append_server_ack -> ok - end, - if SendAppendPidToProj_p -> - machi_projection_store:set_wedge_notify_pid(ProjectionPid, - AppendPid); - true -> - ok - end, - S1 = S0#state{append_pid=AppendPid}, - ListenPid = start_listen_server(S1), - - Config_e = machi_util:make_config_filename(DataDir, "unused"), - ok = filelib:ensure_dir(Config_e), - {_, Data_e} = machi_util:make_data_filename(DataDir, "unused"), - ok = filelib:ensure_dir(Data_e), - Projection_e = machi_util:make_projection_filename(DataDir, "unused"), - ok = filelib:ensure_dir(Projection_e), - - put(flu_flu_name, FluName), - put(flu_append_pid, AppendPid), - put(flu_projection_pid, ProjectionPid), - put(flu_listen_pid, ListenPid), - receive killme -> ok end, - (catch exit(AppendPid, kill)), - (catch exit(ProjectionPid, kill)), - (catch exit(ListenPid, kill)), - ok. - - - - - -do_server_proj_request({get_latest_epochid, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epochid(ProjStore, ProjType); -do_server_proj_request({read_latest_projection, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read_latest_projection(ProjStore, ProjType); -do_server_proj_request({read_projection, ProjType, Epoch}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read(ProjStore, ProjType, Epoch); -do_server_proj_request({write_projection, ProjType, Proj}, - #state{proj_store=ProjStore}) -> - machi_projection_store:write(ProjStore, ProjType, Proj); -do_server_proj_request({get_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_all_projections(ProjStore, ProjType); -do_server_proj_request({list_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType); -do_server_proj_request({kick_projection_reaction}, - #state{flu_name=FluName}) -> - %% Tell my chain manager that it might want to react to - %% this new world. - Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), - spawn(fun() -> - catch machi_chain_manager1:trigger_react_to_env(Chmgr) - end), - async_no_response. - -start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) -> - proc_lib:spawn_link(fun() -> - %% The following is only necessary to - %% make nice process relationships in - %% 'appmon' and related tools. - put('$ancestors', [AppendServerPid]), - put('$initial_call', {x,y,3}), - link(AppendServerPid), - run_seq_append_server(Prefix, EpochID, DataDir) - end). - -run_seq_append_server(Prefix, EpochID, DataDir) -> - true = register(machi_util:make_regname(Prefix), self()), - run_seq_append_server2(Prefix, EpochID, DataDir). - -run_seq_append_server2(Prefix, EpochID, DataDir) -> - FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, - case machi_util:increment_max_filenum(DataDir, Prefix) of - ok -> - machi_util:info_msg("start: ~p server at file ~w\n", - [Prefix, FileNum]), - seq_append_server_loop(DataDir, Prefix, EpochID, FileNum); - Else -> - error_logger:error_msg("start: ~p server at file ~w: ~p\n", - [Prefix, FileNum, Else]), - exit(Else) - - end. - --spec seq_name_hack() -> string(). -seq_name_hack() -> - lists:flatten(io_lib:format("~.36B~.36B", - [element(3,now()), - list_to_integer(os:getpid())])). - -seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) -> - SequencerNameHack = seq_name_hack(), - {File, FullPath} = machi_util:make_data_filename( - DataDir, Prefix, SequencerNameHack, FileNum), - {ok, FHd} = file:open(FullPath, - [read, write, raw, binary]), - CSumPath = machi_util:make_checksum_filename( - DataDir, Prefix, SequencerNameHack, FileNum), - {ok, FHc} = file:open(CSumPath, [append, raw, binary]), - seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum, - ?MINIMUM_OFFSET). - -seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID, - FileNum, Offset) - when Offset > ?MAX_FILE_SIZE -> - ok = file:close(FHd), - ok = file:close(FHc), - machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", - [Prefix, FileNum, Offset]), - run_seq_append_server2(Prefix, EpochID, DataDir); -seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID, - FileNum, Offset) -> - receive - {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID} - when R_EpochID == EpochID -> - if Chunk /= <<>> -> - %% Do we want better error handling here than just a bad match crash? - %% Does the error tuple need to propagate to somewhere? - ok = try_write_position(FHd, Offset, Chunk); - true -> - ok - end, - From ! {assignment, Offset, File}, - Size = iolist_size(Chunk), - CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), - ok = file:write(FHc, CSum_info), - seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, - FileNum, Offset + Size + Extra); - {seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG -> - %% Rare'ish event: send MSG to myself so it doesn't get lost - %% while we recurse around to pick up a new FileNum. - self() ! MSG, - machi_util:info_msg("rollover: ~p server at file ~w offset ~w " - "by new epoch_id ~W\n", - [Prefix, FileNum, Offset, R_EpochID, 8]), - run_seq_append_server2(Prefix, R_EpochID, DataDir); - {sync_stuff, FromPid, Ref} -> - file:sync(FHc), - FromPid ! {sync_finished, Ref}, - seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, - FileNum, Offset) - after 30*1000 -> - ok = file:close(FHd), - ok = file:close(FHc), - machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", - [Prefix, self(), FileNum, Offset]), - exit(normal) - end. - -try_write_position(FHd, Offset, Chunk) -> - ok = case file:pread(FHd, Offset, 1) of %% one byte should be enough right? - eof -> - ok; - {ok, _} -> - {error, error_written}; - {error, Reason} -> - {error, Reason} - end, - ok = file:pwrite(FHd, Offset, Chunk), - ok. - -make_listener_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_listener"). - -%% This is the name of the projection store that is spawned by the -%% *flu*, for use primarily in testing scenarios. In normal use, we -%% ought to be using the OTP style of managing processes, via -%% supervisors, namely via machi_flu_psup.erl, which uses a -%% *different* naming convention for the projection store name that it -%% registers. - -make_projection_server_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). - - -%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for -%% internal storage by the FLU. - --spec encode_csum_file_entry( - machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) -> - iolist(). -encode_csum_file_entry(Offset, Size, TaggedCSum) -> - Len = 8 + 4 + byte_size(TaggedCSum), - [<>, - TaggedCSum]. - -%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for -%% internal storage by the FLU. - --spec encode_csum_file_entry_bin( - machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) -> - binary(). -encode_csum_file_entry_bin(Offset, Size, TaggedCSum) -> - Len = 8 + 4 + byte_size(TaggedCSum), - <>. - -%% @doc Decode a single `binary()' blob into an -%% `{Offset,Size,TaggedCSum}' tuple. -%% -%% The internal encoding (which is currently exposed to the outside world -%% via this function and related ones) is: -%% -%%
    -%%
  • 1 byte: record length -%%
  • -%%
  • 8 bytes (unsigned big-endian): byte offset -%%
  • -%%
  • 4 bytes (unsigned big-endian): chunk size -%%
  • -%%
  • all remaining bytes: tagged checksum (1st byte = type tag) -%%
  • -%%
-%% -%% See `machi.hrl' for the tagged checksum types, e.g., -%% `?CSUM_TAG_NONE'. - --spec decode_csum_file_entry(binary()) -> - error | - {machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}. -decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) -> - {Offset, Size, TaggedCSum}; -decode_csum_file_entry(_Else) -> - error. - -%% @doc Split a `binary()' blob of `checksum_list' data into a list of -%% unparsed `binary()' blobs, one per entry. -%% -%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if -%% desired. -%% -%% The return value `TrailingJunk' is unparseable bytes at the end of -%% the checksum list blob. - --spec split_checksum_list_blob(binary()) -> - {list(binary()), TrailingJunk::binary()}. -split_checksum_list_blob(Bin) -> - split_checksum_list_blob(Bin, []). - -split_checksum_list_blob(<>, Acc)-> - case get(hack_length) of - Len -> ok; - _ -> put(hack_different, true) - end, - split_checksum_list_blob(Rest, [<>|Acc]); -split_checksum_list_blob(Rest, Acc) -> - {lists:reverse(Acc), Rest}. - -%% @doc Split a `binary()' blob of `checksum_list' data into a list of -%% `{Offset,Size,TaggedCSum}' tuples. - --spec split_checksum_list_blob_decode(binary()) -> - {list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}), - TrailingJunk::binary()}. -split_checksum_list_blob_decode(Bin) -> - split_checksum_list_blob_decode(Bin, []). - -split_checksum_list_blob_decode(<>, Acc)-> - One = <>, - case decode_csum_file_entry(One) of - error -> - split_checksum_list_blob_decode(Rest, Acc); - DecOne -> - split_checksum_list_blob_decode(Rest, [DecOne|Acc]) - end; -split_checksum_list_blob_decode(Rest, Acc) -> - {lists:reverse(Acc), Rest}. - -check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) -> - %% TODO: If the client was foolish enough to use - %% this type of non-checksum, then the client gets - %% what it deserves wrt data integrity, alas. In - %% the client-side Chain Replication method, each - %% server will calculated this independently, which - %% isn't exactly what ought to happen for best data - %% integrity checking. In server-side CR, the csum - %% should be calculated by the head and passed down - %% the chain together with the value. - CS = machi_util:checksum_chunk(Chunk), - machi_util:make_tagged_csum(server_sha, CS); -check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> - CS = machi_util:checksum_chunk(Chunk), - if CS == Client_CSum -> - machi_util:make_tagged_csum(server_sha, - Client_CSum); - true -> - throw({bad_csum, CS}) - end. - --ifdef(TEST). - -%% Remove "_COMMENTED" string to run the demo/exploratory code. - -timing_demo_test_COMMENTED_() -> - {timeout, 300, fun() -> timing_demo_test2() end}. - -%% Demo/exploratory hackery to check relative speeds of dealing with -%% checksum data in different ways. -%% -%% Summary: -%% -%% * Use compact binary encoding, with 1 byte header for entry length. -%% * Because the hex-style code is *far* slower just for enc & dec ops. -%% * For 1M entries of enc+dec: 0.215 sec vs. 15.5 sec. -%% * File sorter when sorting binaries as-is is only 30-40% slower -%% than an in-memory split (of huge binary emulated by file:read_file() -%% "big slurp") and sort of the same as-is sortable binaries. -%% * File sorter slows by a factor of about 2.5 if {order, fun compare/2} -%% function must be used, i.e. because the checksum entry lengths differ. -%% * File sorter + {order, fun compare/2} is still *far* faster than external -%% sort by OS X's sort(1) of sortable ASCII hex-style: -%% 4.5 sec vs. 21 sec. -%% * File sorter {order, fun compare/2} is faster than in-memory sort -%% of order-friendly 3-tuple-style: 4.5 sec vs. 15 sec. - -timing_demo_test2() -> - Xs = [random:uniform(1 bsl 32) || _ <- lists:duplicate(1*1000*1000, $x)], - CSum = <<"123456789abcdef0A">>, - 17 = byte_size(CSum), - io:format(user, "\n", []), - - %% %% {ok, ZZZ} = file:open("/tmp/foo.hex-style", [write, binary, raw, delayed_write]), - io:format(user, "Hex-style file entry enc+dec: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {HexUSec, _} = - timer:tc(fun() -> - lists:foldl(fun(X, _) -> - B = encode_csum_file_entry_hex(X, 100, CSum), - %% file:write(ZZZ, [B, 10]), - decode_csum_file_entry_hex(list_to_binary(B)) - end, x, Xs) - end), - io:format(user, "~.3f sec\n", [HexUSec / 1000000]), - %% %% file:close(ZZZ), - - io:format(user, "Not-sortable file entry enc+dec: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotSortedUSec, _} = - timer:tc(fun() -> - lists:foldl(fun(X, _) -> - B = encode_csum_file_entry(X, 100, CSum), - decode_csum_file_entry(list_to_binary(B)) - end, x, Xs) - end), - io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]), - - NotHexList = lists:foldl(fun(X, Acc) -> - B = encode_csum_file_entry(X, 100, CSum), - [B|Acc] - end, [], Xs), - NotHexBin = iolist_to_binary(NotHexList), - - io:format(user, "Split NotHexBin: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotHexBinUSec, SplitRes} = - timer:tc(fun() -> - put(hack_length, 29), - put(hack_different, false), - {Sorted, _Leftover} = split_checksum_list_blob(NotHexBin), - io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]), - Sorted - end), - io:format(user, "~.3f sec\n", [NotHexBinUSec / 1000000]), - - io:format(user, "Sort Split results: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {SortSplitUSec, _} = - timer:tc(fun() -> - lists:sort(SplitRes) - %% lists:sort(fun sort_2lines/2, SplitRes) - end), - io:format(user, "~.3f sec\n", [SortSplitUSec / 1000000]), - - UnsortedName = "/tmp/foo.unsorted", - SortedName = "/tmp/foo.sorted", - - ok = file:write_file(UnsortedName, NotHexList), - io:format(user, "File Sort Split results: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {FileSortUSec, _} = - timer:tc(fun() -> - {ok, FHin} = file:open(UnsortedName, [read, binary]), - {ok, FHout} = file:open(SortedName, - [write, binary, delayed_write]), - put(hack_sorter_sha_ctx, crypto:hash_init(sha)), - ok = file_sorter:sort(sort_input_fun(FHin, <<>>), - sort_output_fun(FHout), - [{format,binary}, - {header, 1} - %% , {order, fun sort_2lines/2} - ]) - end), - io:format(user, "~.3f sec\n", [FileSortUSec / 1000000]), - _SHA = crypto:hash_final(get(hack_sorter_sha_ctx)), - %% io:format(user, "SHA via (hack_sorter_sha_ctx) = ~p\n", [_SHA]), - - io:format(user, "NotHex-Not-sortable tuple list creation: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotHexTupleCreationUSec, NotHexTupleList} = - timer:tc(fun() -> - lists:foldl(fun(X, Acc) -> - B = encode_csum_file_entry_hex( - X, 100, CSum), - [B|Acc] - end, [], Xs) - end), - io:format(user, "~.3f sec\n", [NotHexTupleCreationUSec / 1000000]), - - io:format(user, "NotHex-Not-sortable tuple list sort: ", []), - [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], - {NotHexTupleSortUSec, _} = - timer:tc(fun() -> - lists:sort(NotHexTupleList) - end), - io:format(user, "~.3f sec\n", [NotHexTupleSortUSec / 1000000]), - - ok. - -sort_2lines(<<_:1/binary, A/binary>>, <<_:1/binary, B/binary>>) -> - A < B. - -sort_input_fun(FH, PrevStuff) -> - fun(close) -> - ok; - (read) -> - case file:read(FH, 1024*1024) of - {ok, NewStuff} -> - AllStuff = if PrevStuff == <<>> -> - NewStuff; - true -> - <> - end, - {SplitRes, Leftover} = split_checksum_list_blob(AllStuff), - {SplitRes, sort_input_fun(FH, Leftover)}; - eof -> - end_of_input - end - end. - -sort_output_fun(FH) -> - fun(close) -> - file:close(FH); - (Stuff) -> - Ctx = get(hack_sorter_sha_ctx), - put(hack_sorter_sha_ctx, crypto:hash_update(Ctx, Stuff)), - ok = file:write(FH, Stuff), - sort_output_fun(FH) - end. - -encode_csum_file_entry_hex(Offset, Size, TaggedCSum) -> - OffsetHex = machi_util:bin_to_hexstr(<>), - SizeHex = machi_util:bin_to_hexstr(<>), - CSumHex = machi_util:bin_to_hexstr(TaggedCSum), - [OffsetHex, 32, SizeHex, 32, CSumHex]. - -decode_csum_file_entry_hex(<>) -> - Offset = machi_util:hexstr_to_bin(OffsetHex), - Size = machi_util:hexstr_to_bin(SizeHex), - CSum = machi_util:hexstr_to_bin(CSumHex), - {Offset, Size, CSum}. - --endif. % TEST +% +%-include_lib("kernel/include/file.hrl"). +% +%-include("machi.hrl"). +%-include("machi_pb.hrl"). +%-include("machi_projection.hrl"). +%-define(V(X,Y), ok). +%%% -include("machi_verbose.hrl"). +% +%-ifdef(TEST). +%-include_lib("eunit/include/eunit.hrl"). +%-endif. % TEST +% +% +%-export([start_link/1, stop/1, +% update_wedge_state/3, wedge_myself/2]). +%-export([make_listener_regname/1, make_projection_server_regname/1]). +%-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3, +% decode_csum_file_entry/1, +% split_checksum_list_blob/1, split_checksum_list_blob_decode/1]). +% +%-record(state, { +% flu_name :: atom(), +% proj_store :: pid(), +% append_pid :: pid(), +% tcp_port :: non_neg_integer(), +% data_dir :: string(), +% wedged = true :: boolean(), +% etstab :: ets:tid(), +% epoch_id :: 'undefined' | machi_dt:epoch_id(), +% pb_mode = undefined :: 'undefined' | 'high' | 'low', +% high_clnt :: 'undefined' | pid(), +% dbg_props = [] :: list(), % proplist +% props = [] :: list() % proplist +% }). +% +%start_link([{FluName, TcpPort, DataDir}|Rest]) +% when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) -> +% {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}. +% +%stop(Pid) -> +% case erlang:is_process_alive(Pid) of +% true -> +% Pid ! killme, +% ok; +% false -> +% error +% end. +% +%update_wedge_state(PidSpec, Boolean, EpochId) +% when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> +% PidSpec ! {wedge_state_change, Boolean, EpochId}. +% +%wedge_myself(PidSpec, EpochId) +% when is_tuple(EpochId) -> +% PidSpec ! {wedge_myself, EpochId}. +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +%ets_table_name(FluName) when is_atom(FluName) -> +% list_to_atom(atom_to_list(FluName) ++ "_epoch"). +%%% ets_table_name(FluName) when is_binary(FluName) -> +%%% list_to_atom(binary_to_list(FluName) ++ "_epoch"). +% +%main2(FluName, TcpPort, DataDir, Rest) -> +% {Props, DbgProps} = case proplists:get_value(dbg, Rest) of +% undefined -> +% {Rest, []}; +% DPs -> +% {lists:keydelete(dbg, 1, Rest), DPs} +% end, +% {SendAppendPidToProj_p, ProjectionPid} = +% case proplists:get_value(projection_store_registered_name, Rest) of +% undefined -> +% RN = make_projection_server_regname(FluName), +% {ok, PP} = +% machi_projection_store:start_link(RN, DataDir, undefined), +% {true, PP}; +% RN -> +% {false, whereis(RN)} +% end, +% InitialWedged_p = proplists:get_value(initial_wedged, DbgProps), +% ProjRes = machi_projection_store:read_latest_projection(ProjectionPid, +% private), +% {Wedged_p, EpochId} = +% if InitialWedged_p == undefined, +% is_tuple(ProjRes), element(1, ProjRes) == ok -> +% {ok, Proj} = ProjRes, +% {false, {Proj#projection_v1.epoch_number, +% Proj#projection_v1.epoch_csum}}; +% InitialWedged_p == false -> +% {false, ?DUMMY_PV1_EPOCH}; +% true -> +% {true, undefined} +% end, +% S0 = #state{flu_name=FluName, +% proj_store=ProjectionPid, +% tcp_port=TcpPort, +% data_dir=DataDir, +% wedged=Wedged_p, +% etstab=ets_table_name(FluName), +% epoch_id=EpochId, +% dbg_props=DbgProps, +% props=Props}, +% AppendPid = start_append_server(S0, self()), +% receive +% append_server_ack -> ok +% end, +% if SendAppendPidToProj_p -> +% machi_projection_store:set_wedge_notify_pid(ProjectionPid, +% AppendPid); +% true -> +% ok +% end, +% S1 = S0#state{append_pid=AppendPid}, +% ListenPid = start_listen_server(S1), +% +% Config_e = machi_util:make_config_filename(DataDir, "unused"), +% ok = filelib:ensure_dir(Config_e), +% {_, Data_e} = machi_util:make_data_filename(DataDir, "unused"), +% ok = filelib:ensure_dir(Data_e), +% Projection_e = machi_util:make_projection_filename(DataDir, "unused"), +% ok = filelib:ensure_dir(Projection_e), +% +% put(flu_flu_name, FluName), +% put(flu_append_pid, AppendPid), +% put(flu_projection_pid, ProjectionPid), +% put(flu_listen_pid, ListenPid), +% receive killme -> ok end, +% (catch exit(AppendPid, kill)), +% (catch exit(ProjectionPid, kill)), +% (catch exit(ListenPid, kill)), +% ok. +% +% +% +% +% +%do_server_proj_request({get_latest_epochid, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:get_latest_epochid(ProjStore, ProjType); +%do_server_proj_request({read_latest_projection, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:read_latest_projection(ProjStore, ProjType); +%do_server_proj_request({read_projection, ProjType, Epoch}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:read(ProjStore, ProjType, Epoch); +%do_server_proj_request({write_projection, ProjType, Proj}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:write(ProjStore, ProjType, Proj); +%do_server_proj_request({get_all_projections, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:get_all_projections(ProjStore, ProjType); +%do_server_proj_request({list_all_projections, ProjType}, +% #state{proj_store=ProjStore}) -> +% machi_projection_store:list_all_projections(ProjStore, ProjType); +%do_server_proj_request({kick_projection_reaction}, +% #state{flu_name=FluName}) -> +% %% Tell my chain manager that it might want to react to +% %% this new world. +% Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), +% spawn(fun() -> +% catch machi_chain_manager1:trigger_react_to_env(Chmgr) +% end), +% async_no_response. +% +%start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) -> +% proc_lib:spawn_link(fun() -> +% %% The following is only necessary to +% %% make nice process relationships in +% %% 'appmon' and related tools. +% put('$ancestors', [AppendServerPid]), +% put('$initial_call', {x,y,3}), +% link(AppendServerPid), +% run_seq_append_server(Prefix, EpochID, DataDir) +% end). +% +%run_seq_append_server(Prefix, EpochID, DataDir) -> +% true = register(machi_util:make_regname(Prefix), self()), +% run_seq_append_server2(Prefix, EpochID, DataDir). +% +%run_seq_append_server2(Prefix, EpochID, DataDir) -> +% FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, +% case machi_util:increment_max_filenum(DataDir, Prefix) of +% ok -> +% machi_util:info_msg("start: ~p server at file ~w\n", +% [Prefix, FileNum]), +% seq_append_server_loop(DataDir, Prefix, EpochID, FileNum); +% Else -> +% error_logger:error_msg("start: ~p server at file ~w: ~p\n", +% [Prefix, FileNum, Else]), +% exit(Else) +% +% end. +% +%-spec seq_name_hack() -> string(). +%seq_name_hack() -> +% lists:flatten(io_lib:format("~.36B~.36B", +% [element(3,now()), +% list_to_integer(os:getpid())])). +% +%seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) -> +% SequencerNameHack = seq_name_hack(), +% {File, FullPath} = machi_util:make_data_filename( +% DataDir, Prefix, SequencerNameHack, FileNum), +% {ok, FHd} = file:open(FullPath, +% [read, write, raw, binary]), +% CSumPath = machi_util:make_checksum_filename( +% DataDir, Prefix, SequencerNameHack, FileNum), +% {ok, FHc} = file:open(CSumPath, [append, raw, binary]), +% seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum, +% ?MINIMUM_OFFSET). +% +%seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID, +% FileNum, Offset) +% when Offset > ?MAX_FILE_SIZE -> +% ok = file:close(FHd), +% ok = file:close(FHc), +% machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", +% [Prefix, FileNum, Offset]), +% run_seq_append_server2(Prefix, EpochID, DataDir); +%seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID, +% FileNum, Offset) -> +% receive +% {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID} +% when R_EpochID == EpochID -> +% if Chunk /= <<>> -> +% %% Do we want better error handling here than just a bad match crash? +% %% Does the error tuple need to propagate to somewhere? +% ok = try_write_position(FHd, Offset, Chunk); +% true -> +% ok +% end, +% From ! {assignment, Offset, File}, +% Size = iolist_size(Chunk), +% CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), +% ok = file:write(FHc, CSum_info), +% seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, +% FileNum, Offset + Size + Extra); +% {seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG -> +% %% Rare'ish event: send MSG to myself so it doesn't get lost +% %% while we recurse around to pick up a new FileNum. +% self() ! MSG, +% machi_util:info_msg("rollover: ~p server at file ~w offset ~w " +% "by new epoch_id ~W\n", +% [Prefix, FileNum, Offset, R_EpochID, 8]), +% run_seq_append_server2(Prefix, R_EpochID, DataDir); +% {sync_stuff, FromPid, Ref} -> +% file:sync(FHc), +% FromPid ! {sync_finished, Ref}, +% seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, +% FileNum, Offset) +% after 30*1000 -> +% ok = file:close(FHd), +% ok = file:close(FHc), +% machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", +% [Prefix, self(), FileNum, Offset]), +% exit(normal) +% end. +% +%try_write_position(FHd, Offset, Chunk) -> +% ok = case file:pread(FHd, Offset, 1) of %% one byte should be enough right? +% eof -> +% ok; +% {ok, _} -> +% {error, error_written}; +% {error, Reason} -> +% {error, Reason} +% end, +% ok = file:pwrite(FHd, Offset, Chunk), +% ok. +% +%make_listener_regname(BaseName) -> +% list_to_atom(atom_to_list(BaseName) ++ "_listener"). +% +%start_append_server(_,_) -> ok. +%start_listen_server(_,_) -> ok. +% +%%% This is the name of the projection store that is spawned by the +%%% *flu*, for use primarily in testing scenarios. In normal use, we +%%% ought to be using the OTP style of managing processes, via +%%% supervisors, namely via machi_flu_psup.erl, which uses a +%%% *different* naming convention for the projection store name that it +%%% registers. +% +%make_projection_server_regname(BaseName) -> +% list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). +% +% +%%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for +%%% internal storage by the FLU. +% +%-spec encode_csum_file_entry( +% machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) -> +% iolist(). +%encode_csum_file_entry(Offset, Size, TaggedCSum) -> +% Len = 8 + 4 + byte_size(TaggedCSum), +% [<>, +% TaggedCSum]. +% +%%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for +%%% internal storage by the FLU. +% +%-spec encode_csum_file_entry_bin( +% machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) -> +% binary(). +%encode_csum_file_entry_bin(Offset, Size, TaggedCSum) -> +% Len = 8 + 4 + byte_size(TaggedCSum), +% <>. +% +%%% @doc Decode a single `binary()' blob into an +%%% `{Offset,Size,TaggedCSum}' tuple. +%%% +%%% The internal encoding (which is currently exposed to the outside world +%%% via this function and related ones) is: +%%% +%%%
    +%%%
  • 1 byte: record length +%%%
  • +%%%
  • 8 bytes (unsigned big-endian): byte offset +%%%
  • +%%%
  • 4 bytes (unsigned big-endian): chunk size +%%%
  • +%%%
  • all remaining bytes: tagged checksum (1st byte = type tag) +%%%
  • +%%%
+%%% +%%% See `machi.hrl' for the tagged checksum types, e.g., +%%% `?CSUM_TAG_NONE'. +% +%-spec decode_csum_file_entry(binary()) -> +% error | +% {machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}. +%decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) -> +% {Offset, Size, TaggedCSum}; +%decode_csum_file_entry(_Else) -> +% error. +% +%%% @doc Split a `binary()' blob of `checksum_list' data into a list of +%%% unparsed `binary()' blobs, one per entry. +%%% +%%% Decode the unparsed blobs with {@link decode_csum_file_entry/1}, if +%%% desired. +%%% +%%% The return value `TrailingJunk' is unparseable bytes at the end of +%%% the checksum list blob. +% +%-spec split_checksum_list_blob(binary()) -> +% {list(binary()), TrailingJunk::binary()}. +%split_checksum_list_blob(Bin) -> +% split_checksum_list_blob(Bin, []). +% +%split_checksum_list_blob(<>, Acc)-> +% case get(hack_length) of +% Len -> ok; +% _ -> put(hack_different, true) +% end, +% split_checksum_list_blob(Rest, [<>|Acc]); +%split_checksum_list_blob(Rest, Acc) -> +% {lists:reverse(Acc), Rest}. +% +%%% @doc Split a `binary()' blob of `checksum_list' data into a list of +%%% `{Offset,Size,TaggedCSum}' tuples. +% +%-spec split_checksum_list_blob_decode(binary()) -> +% {list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}), +% TrailingJunk::binary()}. +%split_checksum_list_blob_decode(Bin) -> +% split_checksum_list_blob_decode(Bin, []). +% +%split_checksum_list_blob_decode(<>, Acc)-> +% One = <>, +% case decode_csum_file_entry(One) of +% error -> +% split_checksum_list_blob_decode(Rest, Acc); +% DecOne -> +% split_checksum_list_blob_decode(Rest, [DecOne|Acc]) +% end; +%split_checksum_list_blob_decode(Rest, Acc) -> +% {lists:reverse(Acc), Rest}. +% +%check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) -> +% %% TODO: If the client was foolish enough to use +% %% this type of non-checksum, then the client gets +% %% what it deserves wrt data integrity, alas. In +% %% the client-side Chain Replication method, each +% %% server will calculated this independently, which +% %% isn't exactly what ought to happen for best data +% %% integrity checking. In server-side CR, the csum +% %% should be calculated by the head and passed down +% %% the chain together with the value. +% CS = machi_util:checksum_chunk(Chunk), +% machi_util:make_tagged_csum(server_sha, CS); +%check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> +% CS = machi_util:checksum_chunk(Chunk), +% if CS == Client_CSum -> +% machi_util:make_tagged_csum(server_sha, +% Client_CSum); +% true -> +% throw({bad_csum, CS}) +% end. +% +%-ifdef(TEST). +% +%%% Remove "_COMMENTED" string to run the demo/exploratory code. +% +%timing_demo_test_COMMENTED_() -> +% {timeout, 300, fun() -> timing_demo_test2() end}. +% +%%% Demo/exploratory hackery to check relative speeds of dealing with +%%% checksum data in different ways. +%%% +%%% Summary: +%%% +%%% * Use compact binary encoding, with 1 byte header for entry length. +%%% * Because the hex-style code is *far* slower just for enc & dec ops. +%%% * For 1M entries of enc+dec: 0.215 sec vs. 15.5 sec. +%%% * File sorter when sorting binaries as-is is only 30-40% slower +%%% than an in-memory split (of huge binary emulated by file:read_file() +%%% "big slurp") and sort of the same as-is sortable binaries. +%%% * File sorter slows by a factor of about 2.5 if {order, fun compare/2} +%%% function must be used, i.e. because the checksum entry lengths differ. +%%% * File sorter + {order, fun compare/2} is still *far* faster than external +%%% sort by OS X's sort(1) of sortable ASCII hex-style: +%%% 4.5 sec vs. 21 sec. +%%% * File sorter {order, fun compare/2} is faster than in-memory sort +%%% of order-friendly 3-tuple-style: 4.5 sec vs. 15 sec. +% +%timing_demo_test2() -> +% Xs = [random:uniform(1 bsl 32) || _ <- lists:duplicate(1*1000*1000, $x)], +% CSum = <<"123456789abcdef0A">>, +% 17 = byte_size(CSum), +% io:format(user, "\n", []), +% +% %% %% {ok, ZZZ} = file:open("/tmp/foo.hex-style", [write, binary, raw, delayed_write]), +% io:format(user, "Hex-style file entry enc+dec: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {HexUSec, _} = +% timer:tc(fun() -> +% lists:foldl(fun(X, _) -> +% B = encode_csum_file_entry_hex(X, 100, CSum), +% %% file:write(ZZZ, [B, 10]), +% decode_csum_file_entry_hex(list_to_binary(B)) +% end, x, Xs) +% end), +% io:format(user, "~.3f sec\n", [HexUSec / 1000000]), +% %% %% file:close(ZZZ), +% +% io:format(user, "Not-sortable file entry enc+dec: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotSortedUSec, _} = +% timer:tc(fun() -> +% lists:foldl(fun(X, _) -> +% B = encode_csum_file_entry(X, 100, CSum), +% decode_csum_file_entry(list_to_binary(B)) +% end, x, Xs) +% end), +% io:format(user, "~.3f sec\n", [NotSortedUSec / 1000000]), +% +% NotHexList = lists:foldl(fun(X, Acc) -> +% B = encode_csum_file_entry(X, 100, CSum), +% [B|Acc] +% end, [], Xs), +% NotHexBin = iolist_to_binary(NotHexList), +% +% io:format(user, "Split NotHexBin: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotHexBinUSec, SplitRes} = +% timer:tc(fun() -> +% put(hack_length, 29), +% put(hack_different, false), +% {Sorted, _Leftover} = split_checksum_list_blob(NotHexBin), +% io:format(user, " Leftover ~p (hack_different ~p) ", [_Leftover, get(hack_different)]), +% Sorted +% end), +% io:format(user, "~.3f sec\n", [NotHexBinUSec / 1000000]), +% +% io:format(user, "Sort Split results: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {SortSplitUSec, _} = +% timer:tc(fun() -> +% lists:sort(SplitRes) +% %% lists:sort(fun sort_2lines/2, SplitRes) +% end), +% io:format(user, "~.3f sec\n", [SortSplitUSec / 1000000]), +% +% UnsortedName = "/tmp/foo.unsorted", +% SortedName = "/tmp/foo.sorted", +% +% ok = file:write_file(UnsortedName, NotHexList), +% io:format(user, "File Sort Split results: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {FileSortUSec, _} = +% timer:tc(fun() -> +% {ok, FHin} = file:open(UnsortedName, [read, binary]), +% {ok, FHout} = file:open(SortedName, +% [write, binary, delayed_write]), +% put(hack_sorter_sha_ctx, crypto:hash_init(sha)), +% ok = file_sorter:sort(sort_input_fun(FHin, <<>>), +% sort_output_fun(FHout), +% [{format,binary}, +% {header, 1} +% %% , {order, fun sort_2lines/2} +% ]) +% end), +% io:format(user, "~.3f sec\n", [FileSortUSec / 1000000]), +% _SHA = crypto:hash_final(get(hack_sorter_sha_ctx)), +% %% io:format(user, "SHA via (hack_sorter_sha_ctx) = ~p\n", [_SHA]), +% +% io:format(user, "NotHex-Not-sortable tuple list creation: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotHexTupleCreationUSec, NotHexTupleList} = +% timer:tc(fun() -> +% lists:foldl(fun(X, Acc) -> +% B = encode_csum_file_entry_hex( +% X, 100, CSum), +% [B|Acc] +% end, [], Xs) +% end), +% io:format(user, "~.3f sec\n", [NotHexTupleCreationUSec / 1000000]), +% +% io:format(user, "NotHex-Not-sortable tuple list sort: ", []), +% [erlang:garbage_collect(self()) || _ <- lists:seq(1, 4)], +% {NotHexTupleSortUSec, _} = +% timer:tc(fun() -> +% lists:sort(NotHexTupleList) +% end), +% io:format(user, "~.3f sec\n", [NotHexTupleSortUSec / 1000000]), +% +% ok. +% +%sort_2lines(<<_:1/binary, A/binary>>, <<_:1/binary, B/binary>>) -> +% A < B. +% +%sort_input_fun(FH, PrevStuff) -> +% fun(close) -> +% ok; +% (read) -> +% case file:read(FH, 1024*1024) of +% {ok, NewStuff} -> +% AllStuff = if PrevStuff == <<>> -> +% NewStuff; +% true -> +% <> +% end, +% {SplitRes, Leftover} = split_checksum_list_blob(AllStuff), +% {SplitRes, sort_input_fun(FH, Leftover)}; +% eof -> +% end_of_input +% end +% end. +% +%sort_output_fun(FH) -> +% fun(close) -> +% file:close(FH); +% (Stuff) -> +% Ctx = get(hack_sorter_sha_ctx), +% put(hack_sorter_sha_ctx, crypto:hash_update(Ctx, Stuff)), +% ok = file:write(FH, Stuff), +% sort_output_fun(FH) +% end. +% +%encode_csum_file_entry_hex(Offset, Size, TaggedCSum) -> +% OffsetHex = machi_util:bin_to_hexstr(<>), +% SizeHex = machi_util:bin_to_hexstr(<>), +% CSumHex = machi_util:bin_to_hexstr(TaggedCSum), +% [OffsetHex, 32, SizeHex, 32, CSumHex]. +% +%decode_csum_file_entry_hex(<>) -> +% Offset = machi_util:hexstr_to_bin(OffsetHex), +% Size = machi_util:hexstr_to_bin(SizeHex), +% CSum = machi_util:hexstr_to_bin(CSumHex), +% {Offset, Size, CSum}. +% +%-endif. % TEST diff --git a/src/machi_flu_listener.erl b/src/machi_flu_listener.erl index b9ef5cd..6f45822 100644 --- a/src/machi_flu_listener.erl +++ b/src/machi_flu_listener.erl @@ -1,17 +1,23 @@ % 1. start file proxy supervisor % 2. start projection store % 3. start listener --module(machi_flu_protocol). +-module(machi_flu_listener). -behaviour(ranch_protocol). -export([start_link/4]). -export([init/4]). -include("machi.hrl"). +-include("machi_pb.hrl"). +-include("machi_projection.hrl"). -record(state, { pb_mode, - high_clnt + high_clnt, + proj_store, + etstab, + epoch_id, + flu_name }). -define(SERVER_CMD_READ_TIMEOUT, 600 * 1000). @@ -28,7 +34,7 @@ init(Ref, Socket, Transport, _Opts = []) -> loop(Socket, Transport, #state{}). loop(Socket, Transport, S) -> - case Transport:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of + case Transport:recv(Socket, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Bin} -> {RespBin, S2} = case machi_pb:decode_mpb_ll_request(Bin) of @@ -49,7 +55,7 @@ loop(Socket, Transport, S) -> loop(Socket, Transport, S2); {error, SockError} -> lager:error("Socket error ~w", [SockError]), - (catch Transport:close(Socket)), + (catch Transport:close(Socket)) end. make_high_clnt(#state{high_clnt=undefined}=S) -> @@ -119,27 +125,31 @@ do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> {Msg, S}; do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> {-6, S}; -do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag, - CSum, ChunkExtra}, S) -> - {do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S), S}; -do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, - CSum}, S) -> - {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; -do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) -> - {do_server_read_chunk(File, Offset, Size, Opts, S), S}; -do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) -> - {do_server_checksum_listing(File, S), S}; -do_pb_ll_request3({low_list_files, _EpochID}, S) -> - {do_server_list_files(S), S}; -do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> - {do_server_wedge_status(S), S}; -do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) -> - {do_server_delete_migration(File, S), S}; -do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) -> - {do_server_trunc_hack(File, S), S}; -do_pb_ll_request3({low_proj, PCMD}, S) -> - {do_server_proj_request(PCMD, S), S}. +do_pb_ll_request3(Cmd, S) -> + {execute_cmd(Cmd), S}. +%do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag, +% CSum, ChunkExtra}, S) -> +% {do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, +% ChunkExtra, S), S}; +%do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, +% CSum}, S) -> +% {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; +%do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) -> +% {do_server_read_chunk(File, Offset, Size, Opts, S), S}; +%do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) -> +% {do_server_checksum_listing(File, S), S}; +%do_pb_ll_request3({low_list_files, _EpochID}, S) -> +% {do_server_list_files(S), S}; +%do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> +% {do_server_wedge_status(S), S}; +%do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) -> +% {do_server_delete_migration(File, S), S}; +%do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) -> +% {do_server_trunc_hack(File, S), S}; +%do_pb_ll_request3({low_proj, PCMD}, S) -> +% {do_server_proj_request(PCMD, S), S}. +execute_cmd(_Cmd) -> + ok. do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> @@ -176,4 +186,5 @@ do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> Res = machi_cr_client:list_files(Clnt), {Res, S}. +wedge_myself(_, _) -> ok. diff --git a/src/machi_flu_manager.erl b/src/machi_flu_manager.erl index bda854e..fcad49c 100644 --- a/src/machi_flu_manager.erl +++ b/src/machi_flu_manager.erl @@ -26,9 +26,9 @@ %% Public API -export([ - start_link/1 + start_link/1, start/1, - stop/0 + stop/1 ]). %% gen_server callbacks @@ -67,7 +67,7 @@ handle_call(Req, _From, S) -> lager:warning("Unexpected call ~p", [Req]), {reply, unexpected, S}. -handle_info({wedge_myself, EpochId}, S = #state{wedged = true}) -> +handle_info({wedge_myself, _EpochId}, S = #state{wedged = true}) -> lager:debug("Request to wedge myself, but I'm already wedged. Ignoring."), {noreply, S}; handle_info({wedge_myself, EpochId}, S = #state{flu_name = N, @@ -78,15 +78,16 @@ handle_info({wedge_myself, EpochId}, S = #state{flu_name = N, kick_chain_manager(N), {noreply, S#state{wedged=true}}; -handle_info({wedge_state_change, Bool, {NewEpoch, _}}, S = #state{epoch_id = undefined}) -> +handle_info({wedge_state_change, Bool, {NewEpoch, _}}, + S = #state{epoch_id = undefined, etstab=Tid}) -> true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}), {noreply, S#state{wedged = Bool, epoch_id = NewEpoch}}; handle_info({wedge_state_change, Bool, {NewEpoch, _}}, S = #state{epoch_id = E, etstab = Tid}) when NewEpoch >= E -> true = ets:insert(Tid, {epoch, {Bool, NewEpoch}}), {noreply, S#state{wedged = Bool, epoch_id = NewEpoch}}; -handle_info(M = {wedge_state_change, Bool, {NewEpoch, _}}, - S = #state{epoch_id = E, etstab = Tid}) when NewEpoch < E -> +handle_info(M = {wedge_state_change, _Bool, {NewEpoch, _}}, + S = #state{epoch_id = E}) when NewEpoch < E -> lager:debug("Wedge state change message ~p, but my epoch id is higher (~p). Ignoring.", [M, E]), {noreply, S}; @@ -134,12 +135,15 @@ dispatch_append(From, Prefix, Chunk, Csum, Extra) -> try {ok, Filename, Offset} = machi_flu_file_proxy:append(Pid, [{client_csum_tag, Tag}, {client_csum, CS}], - Extra, Chunk) + Extra, Chunk), From ! {assignment, Offset, Filename}, exit(normal) catch - Type:Reason -> + _Type:Reason -> lager:error("Could not append chunk to prefix ~p because ~p", - [Prefix, Reason]) + [Prefix, Reason]), exit(Reason) end. + +make_name(N, Suffix) -> + atom_to_list(N) ++ Suffix. diff --git a/src/machi_flu_metadata_mgr.erl b/src/machi_flu_metadata_mgr.erl index efe76db..59cd1f1 100644 --- a/src/machi_flu_metadata_mgr.erl +++ b/src/machi_flu_metadata_mgr.erl @@ -47,7 +47,7 @@ }). %% This record goes in the ets table where prefix is the key --record(md, {prefix :: string(), +-record(md, {prefix :: string(), %% either a prefix or a filename file_proxy_pid :: undefined|pid(), mref :: undefined|reference(), %% monitor ref for file proxy current_file :: undefined|string(), @@ -186,32 +186,49 @@ compute_worker(Hash) -> build_metadata_mgr_name(N) when is_integer(N) -> list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)). -get_manager_atom(Prefix) -> - build_metadata_mgr_name(compute_worker(compute_hash(Prefix))). +get_manager_atom(Data) -> + build_metadata_mgr_name(compute_worker(compute_hash(Data))). -lookup_md(Tid, Prefix) -> - case ets:lookup(Tid, Prefix) of +lookup_md(Tid, Data) -> + case ets:lookup(Tid, Data) of [] -> not_found; [R] -> R end. -find_or_create_filename(D, Prefix) -> - N = machi_util:read_max_filenum(D, Prefix), - find_or_create_filename(D, Prefix, #md{ prefix = Prefix, next_file_num = N }). +file_exists(D, F) -> + {_, Path} = machi_util:make_data_filename(D, F), + case file:read_file_info(Path) of + {ok, _Info} -> true; + {error, enoent} -> false; + {error, Reason} -> + lager:error("Probing file information for ~p resulted in ~p", [F, Reason]), + {error, Reason} + end. + +find_or_create_filename(D, Data) -> + case file_exists(D, Data) of + true -> + #md{current_file = Data}; + false -> + N = machi_util:read_max_filenum(D, Data), + find_or_create_filename(D, Data, #md{ prefix = Data, next_file_num = N }) + end. find_or_create_filename(D, Prefix, R = #md{ current_file = undefined, next_file_num = 0 }) -> - F = make_filename(Prefix, 0), + {F, _N} = make_filename(Prefix, 0), ok = machi_util:increment_max_filenum(D, Prefix), find_or_create_filename(D, Prefix, R#md{ current_file = F, next_file_num = 1}); find_or_create_filename(D, Prefix, R = #md{ current_file = undefined, next_file_num = N }) -> File = find_file(D, Prefix, N), - File1 = case File of + {File1, _} = case File of not_found -> make_filename(Prefix, N); - _ -> File + _ -> {File, 0} end, {_, Path} = machi_util:make_data_filename(D, File1), - F = maybe_make_new_file(File1, Prefix, N, file:read_file_info(Path)), - R#md{ current_file = F }. + {F, NewN} = maybe_make_new_file(D, File1, Prefix, N, file:read_file_info(Path)), + R#md{ current_file = F, next_file_num = NewN }; +find_or_create_filename(_D, _Prefix, R = #md{ current_file = _F }) -> + R. start_file_proxy(D, Prefix) -> start_file_proxy(D, Prefix, find_or_create_filename(D, Prefix)). @@ -225,26 +242,29 @@ start_file_proxy(_D, _Prefix, R = #md{ file_proxy_pid = _Pid }) -> find_file(D, Prefix, N) -> {_, Path} = machi_util:make_data_filename(D, Prefix, "*", N), + lager:debug("Search path: ~p", [Path]), case filelib:wildcard(Path) of [] -> not_found; [F] -> F; - [F|_Fs] -> F %% XXX FIXME: What to do when there's more than one match? - %% Arbitrarily pick the head for now, I guess. + L = [_|_] -> lists:last(L) %% XXX FIXME: What to do when there's more than one match? + %% Arbitrarily pick the last file for now, I guess. end. -maybe_make_new_file(F, Prefix, N, {ok, #file_info{ size = S }}) when S >= ?MAX_FILE_SIZE -> - lager:info("~p is larger than ~p. Starting new file.", [F, ?MAX_FILE_SIZE]), - make_filename(Prefix, N); -maybe_make_new_file(F, Prefix, N, Err = {error, _Reason}) -> +maybe_make_new_file(D, F, Prefix, N, {ok, #file_info{ size = S }}) when S >= ?MAX_FILE_SIZE -> + lager:info("~p is larger than ~p (~p). Starting new file.", [F, ?MAX_FILE_SIZE, S]), + ok = machi_util:increment_max_filenum(D, Prefix), + make_filename(Prefix, N+1); +maybe_make_new_file(D, F, Prefix, N, Err = {error, _Reason}) -> lager:error("When reading file information about ~p, got ~p! Going to use new file", [F, Err]), - make_filename(Prefix, N); -maybe_make_new_file(F, _Prefix, _N, _Info) -> - F. + ok = machi_util:increment_max_filenum(D, Prefix), + make_filename(Prefix, N+1); +maybe_make_new_file(_D, F, _Prefix, N, _Info) -> + {F, N}. make_filename(Prefix, N) -> {F, _} = machi_util:make_data_filename("", Prefix, something(), N), - F. + {F, N+1}. %% XXX FIXME: Might just be time to generate UUIDs something() ->