diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 03221c6..e78dd11 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -266,11 +266,6 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> -define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee! -define(CSumSpace, ((1*2)+(20*2))). % hexencodingwhee! -decode_epoch_id(EpochIDHex) -> - <> = - machi_util:hexstr_to_bin(EpochIDHex), - {EpochNum, EpochCSum}. - net_server_loop(Sock, S) -> case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Bin} -> @@ -680,117 +675,11 @@ do_pb_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> {error, bad_arg} end. -net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> - %% TODO: Add testing control knob to adjust this timeout and/or inject - %% timeout condition. - case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of - {ok, Line} -> - %% machi_util:verb("Got: ~p\n", [Line]), - PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - ?CSumSpace - - 8 - 8 - 1, - %% FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1, - CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1, - %% WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - ?CSumSpace - %% - 16 - 8 - 1, - DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1, - case Line of - %% For normal use - <<"A ", - EpochIDHex:(?EpochIDSpace)/binary, - CSumHex:(?CSumSpace)/binary, - LenHex:8/binary, ExtraHex:8/binary, - Prefix:PrefixLenLF/binary, "\n">> -> - _EpochID = decode_epoch_id(EpochIDHex), - do_net_server_append(FluName, Sock, CSumHex, - LenHex, ExtraHex, Prefix); - <<"R ", _/binary>> -> - %% EpochIDHex:(?EpochIDSpace)/binary, - %% OffsetHex:16/binary, LenHex:8/binary, - %% File:FileLenLF/binary, "\n">> -> - %% EpochID = decode_epoch_id(EpochIDHex), - delme; %%do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, - %% EpochID, S); - <<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> -> - _EpochID = decode_epoch_id(EpochIDHex), - delme; %% do_net_server_listing(Sock, DataDir, S); - <<"C ", - EpochIDHex:(?EpochIDSpace)/binary, - File:CSumFileLenLF/binary, "\n">> -> - _EpochID = decode_epoch_id(EpochIDHex), - do_net_server_checksum_listing(Sock, File, DataDir, S); - <<"QUIT\n">> -> - catch gen_tcp:close(Sock), - exit(normal); - <<"QUIT\r\n">> -> - catch gen_tcp:close(Sock), - exit(normal); - %% For "internal" replication only. - %% <<"W-repl ", - %% EpochIDHex:(?EpochIDSpace)/binary, - %% CSumHex:(?CSumSpace)/binary, - %% OffsetHex:16/binary, LenHex:8/binary, - %% File:WriteFileLenLF/binary, "\n">> -> - %% _EpochID = decode_epoch_id(EpochIDHex), - %% do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, - %% File, DataDir, - %% <<"fixme1">>, false, <<"fixme2">>); - %% For data migration only. - <<"DEL-migration ", - EpochIDHex:(?EpochIDSpace)/binary, - File:DelFileLenLF/binary, "\n">> -> - _EpochID = decode_epoch_id(EpochIDHex), - do_net_server_delete_migration_only(Sock, File, DataDir); - %% For erasure coding hackityhack - <<"TRUNC-hack--- ", - EpochIDHex:(?EpochIDSpace)/binary, - File:DelFileLenLF/binary, "\n">> -> - _EpochID = decode_epoch_id(EpochIDHex), - do_net_server_truncate_hackityhack(Sock, File, DataDir); - <<"PROJ ", LenHex:8/binary, "\n">> -> - do_projection_command(Sock, LenHex, S); - <<"WEDGE-STATUS\n">> -> - do_wedge_status(FluName, Sock); - <<"PUT ", _/binary>>=PutLine -> - http_server_hack(FluName, PutLine, Sock, S); - <<"GET ", _/binary>>=PutLine -> - http_server_hack(FluName, PutLine, Sock, S); - <<"PROTOCOL-BUFFERS\n">> -> - ok = gen_tcp:send(Sock, <<"OK\n">>), - ok = inet:setopts(Sock, ?PB_PACKET_OPTS), - {ok, Proj} = machi_projection_store:read_latest_projection( - S#state.proj_store, private), - Ps = [P_srvr || - {_, P_srvr} <- orddict:to_list( - Proj#projection_v1.members_dict)], - machi_pb_server:run_loop(Sock, Ps); - _ -> - machi_util:verb("Else Got: ~p\n", [Line]), - io:format(user, "TODO: Else Got: ~p\n", [Line]), - gen_tcp:send(Sock, "ERROR SYNTAX\n"), - catch gen_tcp:close(Sock), - exit(normal) - end, - net_server_loop_old(Sock, S); - _ -> - catch gen_tcp:close(Sock), - exit(normal) - end. - append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) -> Pid = write_server_get_pid(Prefix, DataDir, LinkPid), Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra}, exit(normal). -do_net_server_append(FluName, Sock, CSumHex, LenHex, ExtraHex, Prefix) -> - %% TODO: robustify against other invalid path characters such as NUL - case sanitize_file_string(Prefix) of - ok -> - do_net_server_append2(FluName, Sock, CSumHex, - LenHex, ExtraHex, Prefix); - _ -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>) - end. - sanitize_file_string(Str) -> case re:run(Str, "/") of nomatch -> @@ -799,193 +688,6 @@ sanitize_file_string(Str) -> error end. -do_net_server_append2(FluName, Sock, CSumHex, LenHex, ExtraHex, Prefix) -> - <> = machi_util:hexstr_to_bin(LenHex), - <> = machi_util:hexstr_to_bin(ExtraHex), - ClientCSum = machi_util:hexstr_to_bin(CSumHex), - ok = inet:setopts(Sock, [{packet, raw}]), - {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000), - try - CSum = case ClientCSum of - <> -> - %% TODO: If the client was foolish enough to use - %% this type of non-checksum, then the client gets - %% what it deserves wrt data integrity, alas. In - %% the client-side Chain Replication method, each - %% server will calculated this independently, which - %% isn't exactly what ought to happen for best data - %% integrity checking. In server-side CR, the csum - %% should be calculated by the head and passed down - %% the chain together with the value. - CS = machi_util:checksum_chunk(Chunk), - machi_util:make_tagged_csum(server_sha, CS); - <> -> - CS = machi_util:checksum_chunk(Chunk), - if CS == ClientCS -> - ClientCSum; - true -> - throw({bad_csum, CS}) - end; - _ -> - ClientCSum - end, - FluName ! {seq_append, self(), Prefix, Chunk, CSum, Extra} - catch - throw:{bad_csum, _CS} -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-CHECKSUM\n">>), - exit(normal); - error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]) - end, - receive - {assignment, Offset, File} -> - OffsetHex = machi_util:bin_to_hexstr(<>), - Out = io_lib:format("OK ~s ~s ~s\n", [CSumHex, OffsetHex, File]), - ok = gen_tcp:send(Sock, Out); - wedged -> - ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>) - after 10*1000 -> - ok = gen_tcp:send(Sock, "TIMEOUT\n") - end. - -do_wedge_status(FluName, Sock) -> - FluName ! {wedge_status, self()}, - Reply = receive - {wedge_status_reply, Bool, EpochId} -> - BoolHex = if Bool == false -> <<"00">>; - Bool == true -> <<"01">> - end, - case EpochId of - undefined -> - EpochHex = machi_util:int_to_hexstr(0, 32), - CSumHex = machi_util:bin_to_hexstr(<<0:(20*8)/big>>); - {Epoch, EpochCSum} -> - EpochHex = machi_util:int_to_hexstr(Epoch, 32), - CSumHex = machi_util:bin_to_hexstr(EpochCSum) - end, - [<<"OK ">>, BoolHex, 32, EpochHex, 32, CSumHex, 10] - after 30*1000 -> - <<"give_it_up\n">> - end, - ok = gen_tcp:send(Sock, Reply). - - -do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun, - EpochID, Wedged_p, CurrentEpochId) -> - case {Wedged_p, sanitize_file_string(FileBin)} of - {false, ok} -> - do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, - DataDir, FileOpts, DoItFun, - EpochID, Wedged_p, CurrentEpochId); - {true, _} -> - ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); - {_, __} -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) - end. - -do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun, - EpochID, Wedged_p, CurrentEpochId) -> - NoSuchFileFun = fun(Sck) -> - ok = gen_tcp:send(Sck, <<"ERROR NO-SUCH-FILE\n">>) - end, - BadIoFun = fun(Sck) -> - ok = gen_tcp:send(Sck, <<"ERROR BAD-IO\n">>) - end, - do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun, - EpochID, Wedged_p, CurrentEpochId, - NoSuchFileFun, BadIoFun). - -do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun, - EpochID, Wedged_p, CurrentEpochId, - NoSuchFileFun, BadIoFun) -> - <> = machi_util:hexstr_to_bin(OffsetHex), - <> = machi_util:hexstr_to_bin(LenHex), - {_, Path} = machi_util:make_data_filename(DataDir, FileBin), - OptsHasWrite = lists:member(write, FileOpts), - OptsHasRead = lists:member(read, FileOpts), - case file:open(Path, FileOpts) of - {ok, FH} -> - try - DoItFun(FH, Offset, Len) - catch - throw:{bad_csum, _CS} -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-CHECKSUM\n">>) - after - file:close(FH) - end; - {error, enoent} when OptsHasWrite -> - do_net_server_readwrite_common( - Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun, - EpochID, Wedged_p, CurrentEpochId); - {error, enoent} when OptsHasRead -> - ok = NoSuchFileFun(Sock); - _Else -> - ok = BadIoFun(Sock) - end. - - -perhaps_do_net_server_ec_read(Sock, FH) -> - case file:pread(FH, 0, ?MINIMUM_OFFSET) of - {ok, Bin} when byte_size(Bin) == ?MINIMUM_OFFSET -> - decode_and_reply_net_server_ec_read(Sock, Bin); - {ok, _AnythingElse} -> - ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n"); - _AnythingElse -> - ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n") - end. - -decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) -> - decode_and_reply_net_server_ec_read_version_a(Sock, Rest); -decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) -> - ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>). - -decode_and_reply_net_server_ec_read_version_a(Sock, Rest) -> - %% <> = Rest, - HdrLen = 80 - 2 - 4 - 1, - <> = Rest, - <> = machi_util:hexstr_to_bin(BodyLenHex), - <> = Rest2, - ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]). - -do_net_server_checksum_listing(Sock, File, DataDir, S) -> - {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), - case {Wedged_p, sanitize_file_string(File)} of - {true, _} -> - ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); - {false, ok} -> - do_net_server_checksum_listing2(Sock, File, DataDir); - _ -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) - end. - -do_net_server_checksum_listing2(Sock, File, DataDir) -> - ok = sync_checksum_file(File), - - CSumPath = machi_util:make_checksum_filename(DataDir, File), - case file:open(CSumPath, [read, raw, binary]) of - {ok, FH} -> - {ok, FI} = file:read_file_info(CSumPath), - Len = FI#file_info.size, - LenHex = list_to_binary(machi_util:bin_to_hexstr(<>)), - %% Client has option of line-by-line with "." terminator, - %% or using the offset in the OK message to slurp things - %% down by exact byte size. - ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]), - do_net_copy_bytes(FH, Sock), - ok = file:close(FH), - ok = gen_tcp:send(Sock, ".\n"); - {error, enoent} -> - ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n"); - _ -> - ok = gen_tcp:send(Sock, "ERROR\n") - end. - sync_checksum_file(File) -> Prefix = re:replace(File, "\\..*", "", [{return, binary}]), case write_server_find_pid(Prefix) of @@ -1009,59 +711,6 @@ sync_checksum_file(File) -> end end. -do_net_copy_bytes(FH, Sock) -> - case file:read(FH, 1024*1024) of - {ok, Bin} -> - ok = gen_tcp:send(Sock, Bin), - do_net_copy_bytes(FH, Sock); - eof -> - ok - end. - -do_net_server_delete_migration_only(Sock, File, DataDir) -> - case sanitize_file_string(File) of - ok -> - do_net_server_delete_migration_only2(Sock, File, DataDir); - _ -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) - end. - -do_net_server_delete_migration_only2(Sock, File, DataDir) -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:delete(Path) of - ok -> - ok = gen_tcp:send(Sock, "OK\n"); - {error, enoent} -> - ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n"); - _ -> - ok = gen_tcp:send(Sock, "ERROR\n") - end. - -do_net_server_truncate_hackityhack(Sock, File, DataDir) -> - case sanitize_file_string(File) of - ok -> - do_net_server_truncate_hackityhack2(Sock, File, DataDir); - _ -> - ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) - end. - -do_net_server_truncate_hackityhack2(Sock, File, DataDir) -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:open(Path, [read, write, binary, raw]) of - {ok, FH} -> - try - {ok, ?MINIMUM_OFFSET} = file:position(FH, ?MINIMUM_OFFSET), - ok = file:truncate(FH), - ok = gen_tcp:send(Sock, "OK\n") - after - file:close(FH) - end; - {error, enoent} -> - ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n"); - _ -> - ok = gen_tcp:send(Sock, "ERROR\n") - end. - write_server_get_pid(Prefix, DataDir, LinkPid) -> case write_server_find_pid(Prefix) of undefined -> @@ -1161,49 +810,6 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> exit(normal) end. -do_projection_command(Sock, LenHex, S) -> - try - Len = machi_util:hexstr_to_int(LenHex), - ok = inet:setopts(Sock, [{packet, raw}]), - {ok, ProjCmdBin} = gen_tcp:recv(Sock, Len), - ok = inet:setopts(Sock, [{packet, line}]), - ProjCmdM = machi_pb:decode_mpb_ll_request(ProjCmdBin), - {ID, ProjCmd} = machi_pb_wrap:unmake_projection_req(ProjCmdM), - ProjOp = element(1, ProjCmd), - put(hack, ProjCmd), - Res = handle_projection_command(ProjCmd, S), - ResM = machi_pb_wrap:make_projection_resp(ID, ProjOp, Res), - ResBin = machi_pb:encode_mpb_ll_response(ResM), - ResLenHex = machi_util:int_to_hexbin(iolist_size(ResBin), 32), - ok = gen_tcp:send(Sock, [<<"OK ">>, ResLenHex, <<"\n">>, ResBin]) - catch - What:Why -> - io:format(user, "OOPS ~p\n", [get(hack)]), - io:format(user, "OOPS ~p ~p ~p\n", [What, Why, erlang:get_stacktrace()]), - WHA = list_to_binary(io_lib:format("TODO-YOLO.~w:~w-~w", - [What, Why, erlang:get_stacktrace()])), - _ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>])) - end. - -handle_projection_command({get_latest_epochid, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epochid(ProjStore, ProjType); -handle_projection_command({read_latest_projection, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read_latest_projection(ProjStore, ProjType); -handle_projection_command({read_projection, ProjType, Epoch}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read(ProjStore, ProjType, Epoch); -handle_projection_command({write_projection, ProjType, Proj}, - #state{proj_store=ProjStore}) -> - machi_projection_store:write(ProjStore, ProjType, Proj); -handle_projection_command({get_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_all_projections(ProjStore, ProjType); -handle_projection_command({list_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType). - make_listener_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_listener"). @@ -1274,41 +880,8 @@ http_server_hack_put(Sock, G, FluName, MyURI) -> ok = gen_tcp:send(Sock, <<"HTTP/1.0 499 TIMEOUT\r\n\r\n">>) end. -http_server_hack_get(Sock, _G, _FluName, MyURI, S) -> - DataDir = S#state.data_dir, - {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), - EpochID = <<"unused">>, - NoSuchFileFun = fun(Sck) -> - ok = gen_tcp:send(Sck, "HTTP/1.0 455 NOT-WRITTEN\r\n\r\n") - end, - BadIoFun = fun(Sck) -> - ok = gen_tcp:send(Sck, "HTTP/1.0 466 BAD-IO\r\n\r\n") - end, - DoItFun = fun(FH, Offset, Len) -> - case file:pread(FH, Offset, Len) of - {ok, Bytes} when byte_size(Bytes) == Len -> - Hdrs = io_lib:format("HTTP/1.0 200 OK\r\nContent-Length: ~w\r\n\r\n", [Len]), - gen_tcp:send(Sock, [Hdrs, Bytes]); - {ok, Bytes} -> - machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n", - [Len, size(Bytes), Bytes, Offset]), - ok = gen_tcp:send(Sock, "HTTP/1.0 455 PARTIAL-READ\r\n\r\n"); - eof -> - ok = gen_tcp:send(Sock, "HTTP/1.0 455 NOT-WRITTEN\r\n\r\n"); - _Else2 -> - machi_util:verb("Else2 ~p ~p ~P\n", - [Offset, Len, _Else2, 20]), - ok = gen_tcp:send(Sock, "HTTP/1.0 466 ERROR BAD-READ\r\n\r\n") - end - end, - [File, OptsBin] = binary:split(MyURI, <<"?">>), - Opts = split_uri_options(OptsBin), - OffsetHex = machi_util:int_to_hexstr(proplists:get_value(offset, Opts), 64), - LenHex = machi_util:int_to_hexstr(proplists:get_value(size, Opts), 32), - do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, File, DataDir, - [read, binary, raw], DoItFun, - EpochID, Wedged_p, CurrentEpochId, - NoSuchFileFun, BadIoFun). +http_server_hack_get(Sock, _G, _FluName, _MyURI, _S) -> + ok = gen_tcp:send(Sock, <<"TODO BROKEN FEATURE see old commits\r\n">>). http_harvest_headers(Sock) -> ok = inet:setopts(Sock, [{packet, httph}]),