diff --git a/prototype/corfurl/.gitignore b/prototype/corfurl/.gitignore new file mode 100644 index 0000000..d712c69 --- /dev/null +++ b/prototype/corfurl/.gitignore @@ -0,0 +1,4 @@ +.eunit +deps +ebin/*.beam +ebin/*.app diff --git a/prototype/corfurl/Makefile b/prototype/corfurl/Makefile new file mode 100644 index 0000000..ef51767 --- /dev/null +++ b/prototype/corfurl/Makefile @@ -0,0 +1,23 @@ +REBAR_BIN := $(shell which rebar) +ifeq ($(REBAR_BIN),) +REBAR_BIN = ./rebar +endif + +.PHONY: rel deps package pkgclean + +all: deps compile + +compile: + $(REBAR_BIN) compile + +deps: + $(REBAR_BIN) get-deps + +clean: + $(REBAR_BIN) clean + +test: deps compile eunit + +eunit: + $(REBAR_BIN) -v skip_deps=true eunit + diff --git a/prototype/corfurl/include/corfurl.hrl b/prototype/corfurl/include/corfurl.hrl new file mode 100644 index 0000000..fa843e3 --- /dev/null +++ b/prototype/corfurl/include/corfurl.hrl @@ -0,0 +1,26 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% 1 byte @ offset 0: 0=unwritten, 1=written, 2=trimmed, 255=corrupt? TODO +%% 8 bytes @ offset 1: logical page number +%% P bytes @ offset 9: page data +%% 1 byte @ offset 9+P: 0=unwritten, 1=written +-define(PAGE_OVERHEAD, (1 + 8 + 1)). + diff --git a/prototype/corfurl/src/corfurl.erl b/prototype/corfurl/src/corfurl.erl new file mode 100644 index 0000000..87d2bb0 --- /dev/null +++ b/prototype/corfurl/src/corfurl.erl @@ -0,0 +1,311 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(corfurl). + +-export([new_simple_projection/4, + new_range/3, + read_projection/2, + save_projection/2]). +-export([append_page/3]). + +-include("corfurl.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). +-endif. + +-type flu_name() :: atom(). +-type flu() :: pid() | flu_name(). +-type flu_chain() :: [flu()]. + +-record(range, { + pn_start :: non_neg_integer(), % start page number + pn_end :: non_neg_integer(), % end page number + chains :: [flu_chain()] + }). + +-record(proj, { % Projection + epoch :: non_neg_integer(), + r :: [#range{}] + }). + +%% append_page(Sequencer, P, Page) -> +%% append_page(Sequencer, P, 1, [Page]). + +%% append_page(Sequencer, P, NumPages, PageList) -> +%% FirstPN = corfurl_sequencer:get(Sequencer, NumPages), +%% [append_single_page(P, LPN, Page) || +%% {LPN, Page} <- lists:zip(lists:seq(FirstPN, FirstPN+NumPages-1), +%% PageList)]. + +append_page(Sequencer, P, Page) -> + append_page(Sequencer, P, Page, 1). + +append_page(Sequencer, P, Page, Retries) when Retries < 50 -> + case corfurl_sequencer:get(Sequencer, 1) of + LPN when is_integer(LPN) -> + case append_single_page(P, LPN, Page) of + ok -> + ok; + X when X == error_written; X == error_trimmed -> + io:format(user, "LPN ~p race lost: ~p\n", [LPN, X]), + append_page(Sequencer, P, Page); + Else -> + exit({todo, ?MODULE, line, ?LINE, Else}) + end; + _ -> + timer:sleep(Retries), % TODO naive + append_page(Sequencer, P, Page, Retries * 2) + end. + +append_single_page(#proj{epoch=Epoch} = P, LPN, Page) -> + Chain = project_to_chain(LPN, P), + append_single_page_to_chain(Chain, Epoch, LPN, Page, 1). + +append_single_page_to_chain([], _Epoch, _LPN, _Page, _Nth) -> + ok; +append_single_page_to_chain([FLU|Rest], Epoch, LPN, Page, Nth) -> + case corfurl_flu:write(flu_pid(FLU), Epoch, LPN, Page) of + ok -> + append_single_page_to_chain(Rest, Epoch, LPN, Page, Nth+1); + error_badepoch -> + %% TODO: Interesting case: there may be cases where retrying with + %% a new epoch & that epoch's projection is just fine (and + %% we'll succeed) and cases where retrying will fail. + %% Figure out what those cases are, then for the + %% destined-to-fail case, try to clean up (via trim?)? + error_badepoch; + error_trimmed -> + %% Whoa, partner, you're movin' kinda fast for a trim. + %% This might've been due to us being too slow and someone + %% else junked us. + %% TODO We should go trim our previously successful writes? + error_trimmed; + error_written when Nth == 1 -> + %% The sequencer lied, or we didn't use the sequencer and + %% guessed and guessed poorly, or someone is accidentally + %% trying to take our page. Shouganai, these things happen. + error_written; + error_written when Nth > 1 -> + %% The likely cause is that another reader has noticed that + %% we haven't finished writing this page in this chain and + %% has repaired the remainder of the chain while we were + %% drinking coffee. Let's double-check. + case corfurl_flu:read(flu_pid(FLU), Epoch, LPN) of + {ok, AlreadyThere} when AlreadyThere =:= Page -> + %% Alright, well, let's go continue the repair/writing, + %% since we agree on the page's value. + append_single_page_to_chain(Rest, Epoch, LPN, Page, Nth+1); + error_badepoch -> + %% TODO: same TODO as the above error_badepoch case. + error_badepoch; + error_overwritten -> + error({impossible, ?MODULE, ?LINE, left_off_here}) + end; + Else -> + %% TODO: corner case +io:format(user, "WTF? Else = ~p\n", [Else]), + Else + end. + +flu_pid(X) when is_pid(X) -> + X; +flu_pid(X) when is_atom(X) -> + ets:lookup_element(flu_pid_tab, X, 1). + +%%%% %%%% %%%% projection utilities %%%% %%%% %%%% + +new_range(Start, End, ChainList) -> + %% TODO: sanity checking of ChainList, Start < End, yadda + #range{pn_start=Start, pn_end=End, chains=list_to_tuple(ChainList)}. + +new_simple_projection(Epoch, Start, End, ChainList) -> + #proj{epoch=Epoch, r=[new_range(Start, End, ChainList)]}. + +make_projection_path(Dir, Epoch) -> + lists:flatten(io_lib:format("~s/~12..0w.proj", [Dir, Epoch])). + +read_projection(Dir, Epoch) -> + case file:read_file(make_projection_path(Dir, Epoch)) of + {ok, Bin} -> + {ok, binary_to_term(Bin)}; % TODO if corrupted? + {error, enoent} -> + error_unwritten; + Else -> + Else % TODO API corner case + end. + +save_projection(Dir, #proj{epoch=Epoch} = P) -> + Path = make_projection_path(Dir, Epoch), + ok = filelib:ensure_dir(Dir ++ "/ignored"), + {_, B, C} = now(), + TmpPath = Path ++ lists:flatten(io_lib:format(".~w.~w.~w", [B, C, node()])), + %% TODO: don't be lazy, do a flush before link when training wheels come off + ok = file:write_file(TmpPath, term_to_binary(P)), + case file:make_link(TmpPath, Path) of + ok -> + file:delete(TmpPath), + ok; + {error, eexist} -> + error_overwritten; + Else -> + Else % TODO API corner case + end. + +project_to_chain(LPN, P) -> + %% TODO fixme + %% TODO something other than round-robin? + [#range{pn_start=Start, pn_end=End, chains=Chains}] = P#proj.r, + if Start =< LPN, LPN =< End -> + I = ((LPN - Start) rem tuple_size(Chains)) + 1, + element(I, Chains) + end. + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +-ifdef(TEST). + +save_read_test() -> + Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read", + Chain = [a,b], + P1 = new_simple_projection(1, 1, 1*100, [Chain]), + + try + filelib:ensure_dir(Dir ++ "/ignored"), + ok = save_projection(Dir, P1), + error_overwritten = save_projection(Dir, P1), + + {ok, P1} = read_projection(Dir, 1), + error_unwritten = read_projection(Dir, 2), + + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +setup_flu_basedir() -> + "/tmp/" ++ atom_to_list(?MODULE) ++ ".". + +setup_flu_dir(N) -> + setup_flu_basedir() ++ integer_to_list(N). + +setup_del_all(NumFLUs) -> + [ok = corfurl_util:delete_dir(setup_flu_dir(N)) || + N <- lists:seq(1, NumFLUs)]. + +setup_basic_flus(NumFLUs, PageSize, NumPages) -> + setup_del_all(NumFLUs), + [begin + element(2, corfurl_flu:start_link(setup_flu_dir(X), + PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD))) + end || X <- lists:seq(1, NumFLUs)]. + +append_test() -> + NumFLUs = 4, + PageSize = 8, + NumPages = 10, + FLUs = [F1, F2, F3, F4] = setup_basic_flus(NumFLUs, PageSize, NumPages), + {ok, Seq} = corfurl_sequencer:start_link(FLUs), + + try + P1 = new_simple_projection(1, 1, 1*100, [[F1, F2], [F3, F4]]), + [begin + Pg = lists:flatten(io_lib:format("~8..0w", [X])), + ok = append_page(Seq, P1, list_to_binary(Pg)) + end || X <- lists:seq(1, 5)], + + ok + after + corfurl_sequencer:stop(Seq), + [corfurl_flu:stop(F) || F <- FLUs], + setup_del_all(NumFLUs) + end. + +forfun_append(0, _Seq, _P, _Page) -> + ok; +forfun_append(N, Seq, P, Page) -> + ok = append_page(Seq, P, Page), + forfun_append(N - 1, Seq, P, Page). + +-ifdef(TIMING_TEST). + +forfun_test_() -> + {timeout, 99999, fun() -> + [forfun(Procs) || Procs <- [10,100,1000,5000]] + end}. + +%%% My MBP, SSD +%%% The 1K and 5K procs shows full-mailbox-scan ickiness +%%% when getting replies from prim_file. :-( + +%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.016815 sec +%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.547976 sec +%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 13.706686 sec +%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 33.516312 sec + +%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.350147 sec +%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.429485 sec +%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.643233 sec +%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 15.686058 sec + +%%%% forfun: 10 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 13.479458 sec +%%%% forfun: 100 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 14.752565 sec +%%%% forfun: 1000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 25.012306 sec +%%%% forfun: 5000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 38.972076 sec + +forfun(NumProcs) -> + io:format(user, "\n", []), + NumFLUs = 4, + PageSize = 8, + %%PageSize = 4096, + NumPages = 200*1000, + PagesPerProc = NumPages div NumProcs, + FLUs = [F1, F2, F3, F4] = setup_basic_flus(NumFLUs, PageSize, NumPages), + {ok, Seq} = corfurl_sequencer:start_link(FLUs), + + try + Chains = [[F1, F2], [F3, F4]], + %%Chains = [[F1], [F2], [F3], [F4]], + P = new_simple_projection(1, 1, NumPages*2, Chains), + Me = self(), + Start = now(), + Ws = [begin + Page = <>, + spawn_link(fun() -> + forfun_append(PagesPerProc, Seq, P, Page), + Me ! {done, self()} + end) + end || X <- lists:seq(1, NumProcs)], + [receive {done, W} -> ok end || W <- Ws], + End = now(), + io:format(user, "forfun: ~p procs writing ~p pages of ~p bytes/page to ~p chains of ~p total FLUs in ~p sec\n", + [NumProcs, NumPages, PageSize, length(Chains), length(lists:flatten(Chains)), timer:now_diff(End, Start) / 1000000]), + ok + after + corfurl_sequencer:stop(Seq), + [corfurl_flu:stop(F) || F <- FLUs], + setup_del_all(NumFLUs) + end. + +-endif. % TIMING_TEST + +-endif. % TEST diff --git a/prototype/corfurl/src/corfurl_flu.erl b/prototype/corfurl/src/corfurl_flu.erl new file mode 100644 index 0000000..6ee0d0b --- /dev/null +++ b/prototype/corfurl/src/corfurl_flu.erl @@ -0,0 +1,471 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(corfurl_flu). + +-behaviour(gen_server). + +-type flu_error() :: 'error_badepoch' | 'error_trimmed' | + 'error_overwritten' | 'error_unwritten'. +-export_type([flu_error/0]). + +%% API +-export([start_link/1, start_link/3, status/1, stop/1]). +-export([write/4, read/3, seal/2, trim/3, fill/3]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("corfurl.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]). +-endif. + +-include_lib("kernel/include/file.hrl"). + +-record(state, { + dir :: string(), + mem_fh :: term(), + min_epoch :: non_neg_integer(), + page_size :: non_neg_integer(), + max_mem :: non_neg_integer(), + max_logical_page :: 'unknown' | non_neg_integer(), + %% TODO: Trim watermark handling is *INCOMPLETE*. The + %% current code is broken but is occasionally correct, + %% like a broken analog watch is correct 2x per day. + trim_watermark :: non_neg_integer(), + trim_count :: non_neg_integer() + }). + +start_link(Dir) -> + start_link(Dir, 8, 64*1024*1024). + +start_link(Dir, PageSize, MaxMem) -> + gen_server:start_link(?MODULE, {Dir, PageSize, MaxMem}, []). + +status(Pid) -> + gen_server:call(Pid, status, infinity). + +stop(Pid) -> + gen_server:call(Pid, stop, infinity). + +write(Pid, Epoch, LogicalPN, PageBin) + when is_integer(LogicalPN), LogicalPN > 0, is_binary(PageBin) -> + gen_server:call(Pid, {write, Epoch, LogicalPN, PageBin}, infinity). + +read(Pid, Epoch, LogicalPN) + when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 -> + gen_server:call(Pid, {read, Epoch, LogicalPN}, infinity). + +seal(Pid, Epoch) when is_integer(Epoch), Epoch > 0 -> + gen_server:call(Pid, {seal, Epoch}, infinity). + +trim(Pid, Epoch, LogicalPN) + when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 -> + gen_server:call(Pid, {trim, Epoch, LogicalPN}, infinity). + +fill(Pid, Epoch, LogicalPN) + when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 -> + gen_server:call(Pid, {fill, Epoch, LogicalPN}, infinity). + +-ifdef(TEST). + +get__mlp(Pid) -> + gen_server:call(Pid, get__mlp, infinity). + +get__min_epoch(Pid) -> + gen_server:call(Pid, get__min_epoch, infinity). + +get__trim_watermark(Pid) -> + gen_server:call(Pid, get__trim_watermark, infinity). + +-endif. % TEST + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +init({Dir, ExpPageSize, ExpMaxMem}) -> + MemFile = memfile_path(Dir), + filelib:ensure_dir(MemFile), + {ok, FH} = file:open(MemFile, [read, write, raw, binary]), + + {_Version, MinEpoch, PageSize, MaxMem, TrimWatermark} = + try + Res = read_hard_state(Dir), + case Res of + {_V, _LE, PS, MM, TW} + when PS =:= ExpPageSize, MM =:= ExpMaxMem -> + Res + end + catch + X:Y -> + io:format("init: caught ~p ~p @ ~p\n", + [X, Y, erlang:get_stacktrace()]), + {no_version_number, 0, ExpPageSize, ExpMaxMem, 0} + end, + State = #state{dir=Dir, mem_fh=FH, min_epoch=MinEpoch, page_size=PageSize, + max_mem=MaxMem, max_logical_page=unknown, + trim_watermark=TrimWatermark, trim_count=0}, + self() ! finish_init, % TODO + {ok, State}. + +handle_call({write, ClientEpoch, _LogicalPN, _PageBin}, _From, + #state{min_epoch=MinEpoch} = State) + when ClientEpoch < MinEpoch -> + {reply, error_badepoch, State}; +handle_call({write, _ClientEpoch, LogicalPN, PageBin}, _From, + #state{max_logical_page=MLPN} = State) -> + case check_write(LogicalPN, PageBin, State) of + {ok, Offset} -> + ok = write_page(Offset, LogicalPN, PageBin, State), + NewMLPN = erlang:max(LogicalPN, MLPN), + {reply, ok, State#state{max_logical_page=NewMLPN}}; + Else -> + {reply, Else, State} + end; + +handle_call({read, ClientEpoch, _LogicalPN}, _From, + #state{min_epoch=MinEpoch} = State) + when ClientEpoch < MinEpoch -> + {reply, error_badepoch, State}; +handle_call({read, _ClientEpoch, LogicalPN}, _From, State) -> + {reply, read_page(LogicalPN, State), State}; + +handle_call({seal, ClientEpoch}, _From, #state{min_epoch=MinEpoch} = State) + when ClientEpoch =< MinEpoch -> + {reply, error_badepoch, State}; +handle_call({seal, ClientEpoch}, _From, #state{max_logical_page=MLPN}=State) -> + NewState = State#state{min_epoch=ClientEpoch}, + ok = write_hard_state(NewState), + {reply, {ok, MLPN}, NewState}; + +handle_call({trim, ClientEpoch, _LogicalPN}, _From, + #state{min_epoch=MinEpoch} = State) + when ClientEpoch < MinEpoch -> + {reply, error_badepoch, State}; +handle_call({trim, _ClientEpoch, LogicalPN}, _From, State) -> + do_trim_or_fill(trim, LogicalPN, State); + +handle_call({fill, ClientEpoch, _LogicalPN}, _From, + #state{min_epoch=MinEpoch} = State) + when ClientEpoch < MinEpoch -> + {reply, error_badepoch, State}; +handle_call({fill, _ClientEpoch, LogicalPN}, _From, State) -> + do_trim_or_fill(fill, LogicalPN, State); + +handle_call(get__mlp, _From, State) -> + {reply, State#state.max_logical_page, State}; +handle_call(get__min_epoch, _From, State) -> + {reply, State#state.min_epoch, State}; +handle_call(get__trim_watermark, _From, State) -> + {reply, State#state.trim_watermark, State}; +handle_call(status, _From, State) -> + L = [{min_epoch, State#state.min_epoch}, + {page_size, State#state.page_size}, + {max_mem, State#state.max_mem}, + {max_logical_page, State#state.max_logical_page}, + {trim_watermark, State#state.trim_watermark}], + {reply, {ok, L}, State}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; + +handle_call(Request, _From, State) -> + Reply = {whaaaaaaaaaaaaaaaaaa, Request}, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(finish_init, State) -> + MLP = find_max_logical_page(State), + State2 = State#state{max_logical_page=MLP}, + ok = write_hard_state(State2), + {noreply, State2}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + ok = write_hard_state(State), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +read_hard_state(Dir) -> + File = hard_state_path(Dir), + case file:read_file(File) of + {ok, Bin} -> + case binary_to_term(Bin) of + T when element(1, T) == v1 -> + T + end; + Else -> + Else + end. + +write_hard_state(#state{min_epoch=MinEpoch, page_size=PageSize, max_mem=MaxMem, + trim_watermark=TrimWatermark} = S) -> + NewPath = hard_state_path(S#state.dir), + TmpPath = NewPath ++ ".tmp", + {ok, FH} = file:open(TmpPath, [write, binary, raw]), + HS = {v1, MinEpoch, PageSize, MaxMem, TrimWatermark}, + ok = file:write(FH, term_to_binary(HS)), + %% ok = file:sync(FH), % TODO uncomment when the training wheels come off + ok = file:close(FH), + ok = file:rename(TmpPath, NewPath). + +memfile_path(Dir) -> + Dir ++ "/memfile". + +hard_state_path(Dir) -> + Dir ++ "/hard-state". + +calc_page_offset(PhysicalPN, #state{page_size=PageSize}) -> + TotalSize = ?PAGE_OVERHEAD + PageSize, + PhysicalPN * TotalSize. + +%% find_max_logical_page(): This is a kludge, based on our naive +%% implementation of not keeping the maximum logical page in hard +%% state. + +find_max_logical_page(S) -> + {ok, FI} = file:read_file_info(memfile_path(S#state.dir)), + find_max_logical_page(0, 0, FI#file_info.size, S). + +find_max_logical_page(MLP, PhysicalPN, FSize, + #state{mem_fh=FH, max_mem=MaxMem}=S) -> + Offset = calc_page_offset(PhysicalPN, S), + if Offset < MaxMem, Offset < FSize -> + case file:pread(FH, Offset, 9) of + {ok, <<1:8/big, LP:64/big>>} -> + find_max_logical_page(erlang:max(MLP, LP), PhysicalPN + 1, + FSize, S); + _ -> + find_max_logical_page(MLP, PhysicalPN + 1, FSize, S) + end; + true -> + MLP + end. + +check_write(LogicalPN, PageBin, + #state{max_mem=MaxMem, page_size=PageSize} = S) -> + Offset = calc_page_offset(LogicalPN, S), + if Offset < MaxMem, byte_size(PageBin) =:= PageSize -> + case check_is_written(Offset, LogicalPN, S) of + false -> + {ok, Offset}; + true -> + error_overwritten + end; + true -> + {bummer, ?MODULE, ?LINE, lpn, LogicalPN, offset, Offset, max_mem, MaxMem, page_size, PageSize} + end. + +check_is_written(Offset, _PhysicalPN, #state{mem_fh=FH}) -> + case file:pread(FH, Offset, 1) of + {ok, <<1:8>>} -> + true; + {ok, <<0:8>>} -> + false; + eof -> + %% We assume that Offset has been bounds-checked + false + end. + +write_page(Offset, LogicalPN, PageBin, #state{mem_fh=FH}) -> + IOList = [<<1:8>>, <>, PageBin, <<1:8>>], + ok = file:pwrite(FH, Offset, IOList). + +read_page(LogicalPN, #state{max_mem=MaxMem, mem_fh=FH, + page_size=PageSize} = S) -> + Offset = calc_page_offset(LogicalPN, S), + if Offset < MaxMem -> + case file:pread(FH, Offset, PageSize + ?PAGE_OVERHEAD) of + {ok, <<1:8, LogicalPN:64/big, Page:PageSize/binary, 1:8>>} -> + {ok, Page}; + {ok, <<1:8, _LogicalPN:64/big, _:PageSize/binary, 0:8>>} -> + io:format("BUMMER: ~s line ~w: incomplete write at ~p\n", + [?MODULE, ?LINE, LogicalPN]), + error_unwritten; + {ok, _} -> + error_unwritten; + eof -> + error_unwritten; + Else -> + io:format("BUMMER: ~s line ~w: ~p\n", + [?MODULE, ?LINE, Else]), + badarg % TODO: better idea + end; + true -> + badarg + end. + +do_trim_or_fill(Op, LogicalPN, + #state{trim_watermark=TrimWatermark, trim_count=TrimCount} = S) -> + case trim_page(Op, LogicalPN, S) of + ok -> + NewS = S#state{trim_watermark=erlang:max( + TrimWatermark, LogicalPN), + trim_count=TrimCount + 1}, + if TrimCount rem 1000 == 0 -> + ok = write_hard_state(NewS); + true -> + ok + end, + {reply, ok, NewS}; + Else -> + {reply, Else, S} + end. + +trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) -> + Offset = calc_page_offset(LogicalPN, S), + if Offset < MaxMem -> + Status = case file:pread(FH, Offset, 1) of + {ok, <<0:8>>} -> + error_unwritten; + {ok, <<1:8>>} -> + error_overwritten; + {ok, <<2:8>>} -> + error_trimmed; + eof -> + error_unwritten; + Else -> + io:format("BUMMER: ~s line ~w: ~p\n", + [?MODULE, ?LINE, Else]), + error_trimmed % TODO + end, + if Status == error_overwritten andalso Op == trim -> + ok = file:pwrite(FH, Offset, <<2:8>>), + ok; + Status == error_unwritten andalso Op == fill -> + ok = file:pwrite(FH, Offset, <<2:8>>), + ok; + true -> + Status + end; + true -> + badarg + end. + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +-ifdef(TEST). + +startstop_test() -> + Dir = "/tmp/flu." ++ os:getpid(), + {ok, P1} = start_link(Dir), + try + {ok, _} = status(P1), + ok = stop(P1), + {'EXIT', _} = (catch stop(P1)), + + {ok, P2} = start_link(Dir), + 0 = get__mlp(P2), + 0 = get__min_epoch(P2), + ok = stop(P2), + + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +basic_test() -> + Dir = "/tmp/flu." ++ os:getpid(), + {ok, P1} = start_link(Dir), + try + Epoch1 = 1, + Epoch2 = 2, + LPN = 1, + Bin1 = <<42:64>>, + Bin2 = <<42042:64>>, + + error_unwritten = read(P1, Epoch1, LPN), + error_unwritten = trim(P1, Epoch1, LPN), + error_unwritten = trim(P1, Epoch1, LPN+77), + + ok = write(P1, Epoch1, LPN, Bin1), + error_overwritten = write(P1, Epoch1, LPN, Bin1), + error_overwritten = fill(P1, Epoch1, LPN), + LPN = get__mlp(P1), + 0 = get__min_epoch(P1), + 0 = get__trim_watermark(P1), + {ok, LPN} = seal(P1, Epoch1), + 1 = get__min_epoch(P1), + + error_overwritten = write(P1, Epoch2, LPN, Bin1), + ok = write(P1, Epoch2, LPN+1, Bin2), + Epoch1 = get__min_epoch(P1), + + {ok, Bin1} = read(P1, Epoch1, LPN), + {ok, Bin2} = read(P1, Epoch2, LPN+1), + error_unwritten = read(P1, Epoch2, LPN+2), + badarg = read(P1, Epoch2, 1 bsl 2982), + + error_badepoch = seal(P1, Epoch1), + {ok, _} = seal(P1, Epoch2), + error_badepoch = seal(P1, Epoch2), + + error_badepoch = read(P1, Epoch1, LPN), + error_badepoch = read(P1, Epoch1, LPN+1), + {ok, Bin1} = read(P1, Epoch2, LPN), + {ok, Bin2} = read(P1, Epoch2, LPN+1), + + error_badepoch = trim(P1, Epoch1, LPN+1), + ok = trim(P1, Epoch2, LPN+1), + error_trimmed = trim(P1, Epoch2, LPN+1), + %% Current watermark processing is broken. But we'll test what's + %% there now. + ExpectedWaterFixMe = LPN+1, + ExpectedWaterFixMe = get__trim_watermark(P1), + + ok = fill(P1, Epoch2, LPN+3), + error_trimmed = fill(P1, Epoch2, LPN+3), + error_trimmed = trim(P1, Epoch2, LPN+3), + + Epoch2 = get__min_epoch(P1), + ok = stop(P1), + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +seal_persistence_test() -> + Dir = "/tmp/flu." ++ os:getpid(), + {ok, P1} = start_link(Dir), + try + 0 = get__min_epoch(P1), + Epoch = 665, + {ok, LPN} = seal(P1, Epoch), + Epoch = get__min_epoch(P1), + ok = stop(P1), + + {ok, P2} = start_link(Dir), + Epoch = get__min_epoch(P2), + + ok = stop(P2), + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +-endif. % TEST diff --git a/prototype/corfurl/src/corfurl_sequencer.erl b/prototype/corfurl/src/corfurl_sequencer.erl new file mode 100644 index 0000000..92426d0 --- /dev/null +++ b/prototype/corfurl/src/corfurl_sequencer.erl @@ -0,0 +1,123 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(corfurl_sequencer). + +-behaviour(gen_server). + +-export([start_link/1, stop/1, get/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +-define(SERVER, ?MODULE). + +start_link(FLUs) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, {FLUs}, []). + +stop(Pid) -> + gen_server:call(Pid, stop, infinity). + +get(Pid, NumPages) -> + gen_server:call(Pid, {get, NumPages}, infinity). + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +init({FLUs}) -> + MLP = get_max_logical_page(FLUs), + {ok, MLP + 1}. + +handle_call({get, NumPages}, _From, MLP) -> + {reply, MLP, MLP + NumPages}; +handle_call(stop, _From, MLP) -> + {stop, normal, ok, MLP}; +handle_call(_Request, _From, MLP) -> + Reply = whaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, + {reply, Reply, MLP}. + +handle_cast(_Msg, MLP) -> + {noreply, MLP}. + +handle_info(_Info, MLP) -> + {noreply, MLP}. + +terminate(_Reason, _MLP) -> + ok. + +code_change(_OldVsn, MLP, _Extra) -> + {ok, MLP}. + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +get_max_logical_page(FLUs) -> + lists:max([proplists:get_value(max_logical_page, Ps, 0) || + FLU <- FLUs, + {ok, Ps} <- [corfurl_flu:status(FLU)]]). + +%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% + +-ifdef(TEST). + +smoke_test() -> + BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".", + PageSize = 8, + NumPages = 500, + NumFLUs = 4, + MyDir = fun(X) -> BaseDir ++ integer_to_list(X) end, + Del = fun() -> [ok = corfurl_util:delete_dir(MyDir(X)) || + X <- lists:seq(1, NumFLUs)] end, + + Del(), + FLUs = [begin + element(2, corfurl_flu:start_link(MyDir(X), + PageSize, NumPages*PageSize)) + end || X <- lists:seq(1, NumFLUs)], + FLUsNums = lists:zip(FLUs, lists:seq(1, NumFLUs)), + + try + [ok = corfurl_flu:write(FLU, 1, PageNum, <<42:(8*8)>>) || + {FLU, PageNum} <- FLUsNums], + MLP0 = NumFLUs, + NumFLUs = get_max_logical_page(FLUs), + + %% Excellent. Now let's start the sequencer and see if it gets + %% the same answer. If yes, then the first get will return MLP1, + %% yadda yadda. + MLP1 = MLP0 + 1, + MLP3 = MLP0 + 3, + MLP4 = MLP0 + 4, + {ok, Sequencer} = start_link(FLUs), + try + MLP1 = get(Sequencer, 2), + MLP3 = get(Sequencer, 1), + MLP4 = get(Sequencer, 1) + after + stop(Sequencer) + end + after + [ok = corfurl_flu:stop(FLU) || FLU <- FLUs], + Del() + end. + +-endif. % TEST diff --git a/prototype/corfurl/src/corfurl_util.erl b/prototype/corfurl/src/corfurl_util.erl new file mode 100644 index 0000000..c88da23 --- /dev/null +++ b/prototype/corfurl/src/corfurl_util.erl @@ -0,0 +1,36 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(corfurl_util). + +-export([delete_dir/1]). + +delete_dir(Dir) -> + %% We don't recursively delete directories, the ok pattern match will fail. + [ok = file:delete(X) || X <- filelib:wildcard(Dir ++ "/*")], + case file:del_dir(Dir) of + ok -> + ok; + {error, enoent} -> + ok; + Else -> + Else + end. +