Add fledgling log implementation based on CORFU papers

This commit is contained in:
Scott Lystig Fritchie 2014-02-16 14:57:24 +09:00
parent 2bf76b5727
commit 72bf329e1c
7 changed files with 994 additions and 0 deletions

4
prototype/corfurl/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
.eunit
deps
ebin/*.beam
ebin/*.app

View file

@ -0,0 +1,23 @@
# Prefer a rebar found on $PATH; fall back to a copy in this directory.
REBAR_BIN := $(shell which rebar)
ifeq ($(REBAR_BIN),)
REBAR_BIN = ./rebar
endif

# Every target below is a command, not a file.  Declaring them phony keeps
# e.g. `make test' and `make deps' working when a directory of the same
# name exists.  (rel/package/pkgclean are kept from the original list.)
.PHONY: all compile deps clean test eunit rel package pkgclean

all: deps compile

compile:
	$(REBAR_BIN) compile

deps:
	$(REBAR_BIN) get-deps

clean:
	$(REBAR_BIN) clean

test: deps compile eunit

# Run only this project's EUnit tests, not those of the dependencies.
eunit:
	$(REBAR_BIN) -v skip_deps=true eunit

View file

@ -0,0 +1,26 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% On-disk layout of one page slot, where P is the page size in bytes:
%% 1 byte @ offset 0: 0=unwritten, 1=written, 2=trimmed, 255=corrupt? TODO
%% 8 bytes @ offset 1: logical page number
%% P bytes @ offset 9: page data
%% 1 byte @ offset 9+P: 0=unwritten, 1=written
%% Bookkeeping bytes per slot: leading status byte + 8-byte logical page
%% number + trailing status byte.
-define(PAGE_OVERHEAD, (1 + 8 + 1)).

View file

@ -0,0 +1,311 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl).
-export([new_simple_projection/4,
new_range/3,
read_projection/2,
save_projection/2]).
-export([append_page/3]).
-include("corfurl.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.
%% Symbolic name of a FLU; flu_pid/1 resolves it through the flu_pid_tab
%% ETS table.
-type flu_name() :: atom().
%% A FLU server, addressed either by pid or by symbolic name.
-type flu() :: pid() | flu_name().
%% An ordered chain of FLUs; a page is written to its members front to back.
-type flu_chain() :: [flu()].
%% A contiguous span of logical page numbers and the chains that store it.
-record(range, {
          pn_start :: non_neg_integer(),          % start page number
          pn_end   :: non_neg_integer(),          % end page number
          %% Tuple whose elements are flu_chain() values.  new_range/3
          %% converts the caller's list with list_to_tuple/1 and
          %% project_to_chain/2 indexes it with element/2, so the field
          %% holds a tuple, not a list.
          chains   :: tuple()
         }).
%% Mapping from logical page numbers to FLU chains, tagged with an epoch.
-record(proj, { % Projection
%% Epoch number sent to the FLUs along with every write.
epoch :: non_neg_integer(),
%% Ranges covered by this projection.  project_to_chain/2 currently
%% pattern-matches on a list of exactly one range.
r :: [#range{}]
}).
%% append_page(Sequencer, P, Page) ->
%% append_page(Sequencer, P, 1, [Page]).
%% append_page(Sequencer, P, NumPages, PageList) ->
%% FirstPN = corfurl_sequencer:get(Sequencer, NumPages),
%% [append_single_page(P, LPN, Page) ||
%% {LPN, Page} <- lists:zip(lists:seq(FirstPN, FirstPN+NumPages-1),
%% PageList)].
%% @doc Append Page to the log laid out by projection P.
%%
%% Asks Sequencer for one logical page number (LPN) and writes Page to the
%% chain that P assigns to that LPN.  Returns `ok' once the page is stored.
%% If the LPN turns out to be taken or trimmed already, a fresh LPN is
%% requested and the append is retried.
append_page(Sequencer, P, Page) ->
    append_page(Sequencer, P, Page, 1).

%% Retries doubles as the back-off sleep (in milliseconds) used when the
%% sequencer does not answer with an integer.  There is deliberately no
%% clause for Retries >= 50: the caller crashes with function_clause.
append_page(Sequencer, P, Page, Retries) when Retries < 50 ->
    case corfurl_sequencer:get(Sequencer, 1) of
        LPN when is_integer(LPN) ->
            case append_single_page(P, LPN, Page) of
                ok ->
                    ok;
                %% corfurl_flu reports an already-written page as
                %% error_overwritten (see corfurl_flu:flu_error()); the
                %% previous match on error_written could never succeed, so
                %% a lost race fell through to the exit below.
                X when X =:= error_overwritten; X =:= error_trimmed ->
                    io:format(user, "LPN ~p race lost: ~p\n", [LPN, X]),
                    append_page(Sequencer, P, Page);
                Else ->
                    exit({todo, ?MODULE, line, ?LINE, Else})
            end;
        _ ->
            timer:sleep(Retries),                   % TODO naive
            append_page(Sequencer, P, Page, Retries * 2)
    end.
%% Write Page at LPN to every FLU of the chain that projection Proj assigns
%% to LPN, starting with the first chain member.
append_single_page(#proj{epoch=Epoch} = Proj, LPN, Page) ->
    append_single_page_to_chain(project_to_chain(LPN, Proj),
                                Epoch, LPN, Page, 1).
%% Write Page at LPN to each FLU of the chain, front to back.
%%
%% Nth is the 1-based chain position of the FLU about to be written.
%% Returns `ok' when every FLU accepted (or already held) the page,
%% otherwise the first non-ok answer from a FLU.
%%
%% An already-written page is reported by corfurl_flu as
%% `error_overwritten'; the clauses below used to match `error_written',
%% which the FLU never returns, so they were unreachable.
append_single_page_to_chain([], _Epoch, _LPN, _Page, _Nth) ->
    ok;
append_single_page_to_chain([FLU|Rest], Epoch, LPN, Page, Nth) ->
    case corfurl_flu:write(flu_pid(FLU), Epoch, LPN, Page) of
        ok ->
            append_single_page_to_chain(Rest, Epoch, LPN, Page, Nth+1);
        error_badepoch ->
            %% TODO: Interesting case: there may be cases where retrying with
            %%       a new epoch & that epoch's projection is just fine (and
            %%       we'll succeed) and cases where retrying will fail.
            %%       Figure out what those cases are, then for the
            %%       destined-to-fail case, try to clean up (via trim?)?
            error_badepoch;
        error_trimmed ->
            %% Whoa, partner, you're movin' kinda fast for a trim.
            %% This might've been due to us being too slow and someone
            %% else junked us.
            %% TODO We should go trim our previously successful writes?
            error_trimmed;
        error_overwritten when Nth == 1 ->
            %% The sequencer lied, or we didn't use the sequencer and
            %% guessed and guessed poorly, or someone is accidentally
            %% trying to take our page.  Shouganai, these things happen.
            error_overwritten;
        error_overwritten when Nth > 1 ->
            %% The likely cause is that another reader has noticed that
            %% we haven't finished writing this page in this chain and
            %% has repaired the remainder of the chain while we were
            %% drinking coffee.  Let's double-check.
            %% Any read result not listed here crashes with case_clause.
            case corfurl_flu:read(flu_pid(FLU), Epoch, LPN) of
                {ok, AlreadyThere} when AlreadyThere =:= Page ->
                    %% Alright, well, let's go continue the repair/writing,
                    %% since we agree on the page's value.
                    append_single_page_to_chain(Rest, Epoch, LPN, Page, Nth+1);
                error_badepoch ->
                    %% TODO: same TODO as the above error_badepoch case.
                    error_badepoch;
                error_overwritten ->
                    error({impossible, ?MODULE, ?LINE, left_off_here})
            end;
        Else ->
            %% TODO: corner case
            io:format(user, "WTF? Else = ~p\n", [Else]),
            Else
    end.
%% Resolve a flu() to the term handed to corfurl_flu calls.
%% A pid is returned unchanged; an atom is looked up in the flu_pid_tab
%% ETS table.
%% NOTE(review): position 1 is the default ETS key position, so this lookup
%% may hand back the name itself rather than a pid.  The table is not
%% created anywhere in this module -- confirm its layout.
flu_pid(Pid) when is_pid(Pid) ->
    Pid;
flu_pid(Name) when is_atom(Name) ->
    ets:lookup_element(flu_pid_tab, Name, 1).
%%%% %%%% %%%% projection utilities %%%% %%%% %%%%
%% @doc Build a #range{} covering logical pages Start..End (inclusive, as
%% tested by project_to_chain/2).  ChainList is a list of FLU chains; it is
%% stored as a tuple so that a chain can be picked by index.
new_range(Start, End, ChainList) ->
    %% TODO: sanity checking of ChainList, Start < End, yadda
    Chains = list_to_tuple(ChainList),
    #range{pn_start=Start, pn_end=End, chains=Chains}.
%% @doc Build a projection for Epoch that consists of a single range
%% Start..End served by the chains in ChainList.
new_simple_projection(Epoch, Start, End, ChainList) ->
    OnlyRange = new_range(Start, End, ChainList),
    #proj{epoch=Epoch, r=[OnlyRange]}.
%% File name of the stored projection for Epoch inside Dir: the epoch is
%% zero-padded to 12 digits and given a ".proj" suffix.
make_projection_path(Dir, Epoch) ->
    DeepChars = io_lib:format("~s/~12..0w.proj", [Dir, Epoch]),
    lists:flatten(DeepChars).
%% @doc Load the projection stored for Epoch under Dir.
%% Returns `{ok, Term}' with the decoded file contents, `error_unwritten'
%% when no such file exists, or the raw file:read_file/1 error otherwise.
read_projection(Dir, Epoch) ->
    ProjFile = make_projection_path(Dir, Epoch),
    case file:read_file(ProjFile) of
        {error, enoent} ->
            error_unwritten;
        {ok, Encoded} ->
            {ok, binary_to_term(Encoded)};          % TODO if corrupted?
        Other ->
            Other                                   % TODO API corner case
    end.
%% @doc Store projection P under Dir, write-once per epoch.
%%
%% The term is first written to a uniquely named temporary file and then
%% hard-linked to its final name; the link fails if the final name already
%% exists, which is what makes the store write-once.
%%
%% Returns `ok', `error_overwritten' when a projection for this epoch is
%% already stored, or the raw file:make_link/2 error otherwise.
save_projection(Dir, #proj{epoch=Epoch} = P) ->
    Path = make_projection_path(Dir, Epoch),
    ok = filelib:ensure_dir(Dir ++ "/ignored"),
    {_, B, C} = now(),
    TmpPath = Path ++ lists:flatten(io_lib:format(".~w.~w.~w", [B, C, node()])),
    %% TODO: don't be lazy, do a flush before link when training wheels come off
    ok = file:write_file(TmpPath, term_to_binary(P)),
    LinkResult = file:make_link(TmpPath, Path),
    %% The temporary name is of no use after the link attempt, whatever its
    %% outcome.  Previously it was removed only on success, so every
    %% rejected save left a stray file behind in Dir.
    file:delete(TmpPath),
    case LinkResult of
        ok ->
            ok;
        {error, eexist} ->
            error_overwritten;
        Else ->
            Else                                    % TODO API corner case
    end.
%% Pick the chain responsible for LPN: chains are assigned round-robin by
%% the page's distance from the start of the range.  The projection must
%% hold exactly one range (badmatch otherwise), and an LPN outside that
%% range crashes with if_clause because the `if' has no fallback branch.
project_to_chain(LPN, P) ->
    %% TODO fixme
    %% TODO something other than round-robin?
    [#range{pn_start=First, pn_end=Last, chains=ChainTuple}] = P#proj.r,
    if First =< LPN, LPN =< Last ->
            Slot = (LPN - First) rem tuple_size(ChainTuple),
            element(Slot + 1, ChainTuple)
    end.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
-ifdef(TEST).
%% Round-trip a projection through save_projection/2 and read_projection/2,
%% and check that a second save of the same epoch is rejected.
save_read_test() ->
Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read",
Chain = [a,b],
P1 = new_simple_projection(1, 1, 1*100, [Chain]),
try
filelib:ensure_dir(Dir ++ "/ignored"),
ok = save_projection(Dir, P1),
%% Same epoch again: projections are write-once.
error_overwritten = save_projection(Dir, P1),
{ok, P1} = read_projection(Dir, 1),
%% Epoch 2 was never saved.
error_unwritten = read_projection(Dir, 2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
%% Common path prefix of the per-FLU test directories; setup_flu_dir/1
%% appends the FLU number.
setup_flu_basedir() ->
    ModName = atom_to_list(?MODULE),
    "/tmp/" ++ ModName ++ ".".
%% Data directory used by test FLU number N.
setup_flu_dir(N) ->
    Suffix = integer_to_list(N),
    setup_flu_basedir() ++ Suffix.
%% Delete the data directories of test FLUs 1..NumFLUs.  Crashes with
%% badmatch if any directory cannot be removed.
setup_del_all(NumFLUs) ->
    Remove = fun(N) -> ok = corfurl_util:delete_dir(setup_flu_dir(N)) end,
    lists:map(Remove, lists:seq(1, NumFLUs)).
%% Start NumFLUs fresh FLU servers (old data directories are wiped first)
%% and return the second element of each start_link result, i.e. the pid
%% when the start succeeded.
setup_basic_flus(NumFLUs, PageSize, NumPages) ->
    setup_del_all(NumFLUs),
    %% NOTE(review): corfurl_flu sizes a page slot as
    %% PageSize + ?PAGE_OVERHEAD, whereas this multiplies the two.  The
    %% result is a larger memory limit than NumPages slots need -- confirm
    %% whether the generous limit is intended.
    MaxMem = NumPages * (PageSize * ?PAGE_OVERHEAD),
    [element(2, corfurl_flu:start_link(setup_flu_dir(X), PageSize, MaxMem))
     || X <- lists:seq(1, NumFLUs)].
%% End-to-end smoke test: four FLUs arranged as two chains of two, a
%% sequencer, and five appended 8-byte pages.
append_test() ->
NumFLUs = 4,
PageSize = 8,
NumPages = 10,
FLUs = [F1, F2, F3, F4] = setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
try
P1 = new_simple_projection(1, 1, 1*100, [[F1, F2], [F3, F4]]),
[begin
%% Each page is the zero-padded 8-character decimal form of X.
Pg = lists:flatten(io_lib:format("~8..0w", [X])),
ok = append_page(Seq, P1, list_to_binary(Pg))
end || X <- lists:seq(1, 5)],
ok
after
%% Always stop the servers and wipe the data directories.
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
%% Append Page to the log the given number of times; every append must
%% return ok or the caller crashes with badmatch.
forfun_append(0, _Seq, _P, _Page) ->
    ok;
forfun_append(Remaining, Seq, P, Page) ->
    ok = append_page(Seq, P, Page),
    forfun_append(Remaining - 1, Seq, P, Page).
-ifdef(TIMING_TEST).
%% EUnit generator: run the append benchmark with 10, 100, 1000 and 5000
%% writer processes under a 99999 second timeout.
forfun_test_() ->
    Body = fun() ->
                   [forfun(Procs) || Procs <- [10,100,1000,5000]]
           end,
    {timeout, 99999, Body}.
%%% My MBP, SSD
%%% The 1K and 5K procs shows full-mailbox-scan ickiness
%%% when getting replies from prim_file. :-(
%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.016815 sec
%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 10.547976 sec
%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 13.706686 sec
%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 2 chains of 4 total FLUs in 33.516312 sec
%%% forfun: 10 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.350147 sec
%%% forfun: 100 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.429485 sec
%%% forfun: 1000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 5.643233 sec
%%% forfun: 5000 procs writing 200000 pages of 8 bytes/page to 4 chains of 4 total FLUs in 15.686058 sec
%%%% forfun: 10 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 13.479458 sec
%%%% forfun: 100 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 14.752565 sec
%%%% forfun: 1000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 25.012306 sec
%%%% forfun: 5000 procs writing 200000 pages of 4096 bytes/page to 2 chains of 4 total FLUs in 38.972076 sec
%% Benchmark: NumProcs linked processes together append NumPages pages to
%% four FLUs, then the elapsed wall-clock time is printed.
forfun(NumProcs) ->
io:format(user, "\n", []),
NumFLUs = 4,
PageSize = 8,
%%PageSize = 4096,
NumPages = 200*1000,
%% Integer division: any remainder pages are simply not written.
PagesPerProc = NumPages div NumProcs,
FLUs = [F1, F2, F3, F4] = setup_basic_flus(NumFLUs, PageSize, NumPages),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
try
Chains = [[F1, F2], [F3, F4]],
%%Chains = [[F1], [F2], [F3], [F4]],
P = new_simple_projection(1, 1, NumPages*2, Chains),
Me = self(),
Start = now(),
Ws = [begin
%% Each worker writes its own number X as the page payload.
Page = <<X:(PageSize*8)>>,
spawn_link(fun() ->
forfun_append(PagesPerProc, Seq, P, Page),
Me ! {done, self()}
end)
end || X <- lists:seq(1, NumProcs)],
%% Wait for every worker, in spawn order.
[receive {done, W} -> ok end || W <- Ws],
End = now(),
io:format(user, "forfun: ~p procs writing ~p pages of ~p bytes/page to ~p chains of ~p total FLUs in ~p sec\n",
[NumProcs, NumPages, PageSize, length(Chains), length(lists:flatten(Chains)), timer:now_diff(End, Start) / 1000000]),
ok
after
corfurl_sequencer:stop(Seq),
[corfurl_flu:stop(F) || F <- FLUs],
setup_del_all(NumFLUs)
end.
-endif. % TIMING_TEST
-endif. % TEST

View file

@ -0,0 +1,471 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_flu).
-behaviour(gen_server).
%% Error atoms a FLU may send back instead of a success reply.
-type flu_error() :: 'error_badepoch' | 'error_trimmed' |
'error_overwritten' | 'error_unwritten'.
-export_type([flu_error/0]).
%% API
-export([start_link/1, start_link/3, status/1, stop/1]).
-export([write/4, read/3, seal/2, trim/3, fill/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("corfurl.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]).
-endif.
-include_lib("kernel/include/file.hrl").
%% gen_server state of one FLU.
-record(state, {
%% Directory holding the "memfile" and "hard-state" files.
dir :: string(),
%% Raw file handle of the memfile, opened in init/1.
mem_fh :: term(),
%% Requests carrying an epoch below this value get error_badepoch;
%% raised by seal.
min_epoch :: non_neg_integer(),
%% Payload bytes per page.
page_size :: non_neg_integer(),
%% Upper bound (exclusive) on the byte offset of any page slot.
max_mem :: non_neg_integer(),
%% Highest logical page number seen; 'unknown' until the finish_init
%% scan has run.
max_logical_page :: 'unknown' | non_neg_integer(),
%% TODO: Trim watermark handling is *INCOMPLETE*. The
%% current code is broken but is occasionally correct,
%% like a broken analog watch is correct 2x per day.
trim_watermark :: non_neg_integer(),
%% Number of successful trim/fill operations since start; used to decide
%% when to persist the hard state.
trim_count :: non_neg_integer()
}).
%% @doc Start a FLU on Dir with the defaults: 8-byte pages and a 64 MiB
%% memory limit.
start_link(Dir) ->
    start_link(Dir, 8, 64*1024*1024).

%% @doc Start a FLU whose files live in Dir, with PageSize payload bytes per
%% page and MaxMem as the limit on page offsets.
start_link(Dir, PageSize, MaxMem) ->
    InitArg = {Dir, PageSize, MaxMem},
    gen_server:start_link(?MODULE, InitArg, []).
%% @doc Ask the FLU for `{ok, Proplist}' describing its current settings.
%% Blocks until the server answers.
status(Server) ->
    gen_server:call(Server, status, infinity).
%% @doc Ask the FLU to shut down; replies `ok'.
stop(Server) ->
    gen_server:call(Server, stop, infinity).
%% @doc Write the binary PageBin as logical page LogicalPN under Epoch.
%% Replies `ok' or an error term from the server.
%% NOTE(review): unlike read/3, trim/3 and fill/3 the guard does not check
%% Epoch -- confirm whether that is intentional.
write(Server, Epoch, LogicalPN, PageBin)
  when is_integer(LogicalPN), LogicalPN > 0, is_binary(PageBin) ->
    Request = {write, Epoch, LogicalPN, PageBin},
    gen_server:call(Server, Request, infinity).
%% @doc Read logical page LogicalPN under Epoch.  Both numbers must be
%% positive integers.
read(Server, Epoch, LogicalPN)
  when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
    Request = {read, Epoch, LogicalPN},
    gen_server:call(Server, Request, infinity).
%% @doc Seal the FLU at Epoch (a positive integer).  On success the server
%% replies `{ok, MaxLogicalPage}'.
seal(Server, Epoch) when is_integer(Epoch), Epoch > 0 ->
    Request = {seal, Epoch},
    gen_server:call(Server, Request, infinity).
%% @doc Trim logical page LogicalPN under Epoch.  Both numbers must be
%% positive integers.
trim(Server, Epoch, LogicalPN)
  when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
    Request = {trim, Epoch, LogicalPN},
    gen_server:call(Server, Request, infinity).
%% @doc Fill logical page LogicalPN under Epoch.  Both numbers must be
%% positive integers.
fill(Server, Epoch, LogicalPN)
  when is_integer(Epoch), Epoch > 0, is_integer(LogicalPN), LogicalPN > 0 ->
    Request = {fill, Epoch, LogicalPN},
    gen_server:call(Server, Request, infinity).
-ifdef(TEST).
%% Test helper: fetch the server's max_logical_page field.
get__mlp(Server) ->
    gen_server:call(Server, get__mlp, infinity).
%% Test helper: fetch the server's min_epoch field.
get__min_epoch(Server) ->
    gen_server:call(Server, get__min_epoch, infinity).
%% Test helper: fetch the server's trim_watermark field.
get__trim_watermark(Server) ->
    gen_server:call(Server, get__trim_watermark, infinity).
-endif. % TEST
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
%% gen_server callback.  Opens (creating if needed) the memfile under Dir,
%% loads the persisted hard state when it matches the requested page size
%% and memory limit, and schedules the max-logical-page scan.
init({Dir, ExpPageSize, ExpMaxMem}) ->
MemFile = memfile_path(Dir),
filelib:ensure_dir(MemFile),
{ok, FH} = file:open(MemFile, [read, write, raw, binary]),
{_Version, MinEpoch, PageSize, MaxMem, TrimWatermark} =
try
Res = read_hard_state(Dir),
%% Anything but a 5-tuple with matching sizes (including the
%% {error, _} returned when no hard-state file exists yet) raises
%% case_clause and lands in the catch below.
case Res of
{_V, _LE, PS, MM, TW}
when PS =:= ExpPageSize, MM =:= ExpMaxMem ->
Res
end
catch
X:Y ->
%% NOTE(review): erlang:get_stacktrace/0 is deprecated in newer OTP
%% releases -- confirm the target OTP version.
io:format("init: caught ~p ~p @ ~p\n",
[X, Y, erlang:get_stacktrace()]),
%% Fall back to a fresh state: epoch 0, watermark 0.
{no_version_number, 0, ExpPageSize, ExpMaxMem, 0}
end,
State = #state{dir=Dir, mem_fh=FH, min_epoch=MinEpoch, page_size=PageSize,
max_mem=MaxMem, max_logical_page=unknown,
trim_watermark=TrimWatermark, trim_count=0},
%% Defer the memfile scan to handle_info(finish_init, ...).
self() ! finish_init, % TODO
{ok, State}.
%% gen_server callback: serves every synchronous request.  For write, read,
%% trim and fill a guard clause first rejects epochs below min_epoch with
%% error_badepoch; the following clause does the work.

%% write: stale epoch.
handle_call({write, ClientEpoch, _LogicalPN, _PageBin}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
%% write: store the page if check_write/3 allows it and track the highest
%% logical page number written.
handle_call({write, _ClientEpoch, LogicalPN, PageBin}, _From,
#state{max_logical_page=MLPN} = State) ->
case check_write(LogicalPN, PageBin, State) of
{ok, Offset} ->
ok = write_page(Offset, LogicalPN, PageBin, State),
NewMLPN = erlang:max(LogicalPN, MLPN),
{reply, ok, State#state{max_logical_page=NewMLPN}};
Else ->
{reply, Else, State}
end;
%% read: stale epoch.
handle_call({read, ClientEpoch, _LogicalPN}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({read, _ClientEpoch, LogicalPN}, _From, State) ->
{reply, read_page(LogicalPN, State), State};
%% seal: the new epoch must be strictly greater than the current minimum
%% (note =< here, versus < in the other guards).
handle_call({seal, ClientEpoch}, _From, #state{min_epoch=MinEpoch} = State)
when ClientEpoch =< MinEpoch ->
{reply, error_badepoch, State};
%% seal: raise min_epoch, persist it, and report the max logical page.
handle_call({seal, ClientEpoch}, _From, #state{max_logical_page=MLPN}=State) ->
NewState = State#state{min_epoch=ClientEpoch},
ok = write_hard_state(NewState),
{reply, {ok, MLPN}, NewState};
%% trim: stale epoch.
handle_call({trim, ClientEpoch, _LogicalPN}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({trim, _ClientEpoch, LogicalPN}, _From, State) ->
do_trim_or_fill(trim, LogicalPN, State);
%% fill: stale epoch.
handle_call({fill, ClientEpoch, _LogicalPN}, _From,
#state{min_epoch=MinEpoch} = State)
when ClientEpoch < MinEpoch ->
{reply, error_badepoch, State};
handle_call({fill, _ClientEpoch, LogicalPN}, _From, State) ->
do_trim_or_fill(fill, LogicalPN, State);
%% Introspection requests used by the tests.
handle_call(get__mlp, _From, State) ->
{reply, State#state.max_logical_page, State};
handle_call(get__min_epoch, _From, State) ->
{reply, State#state.min_epoch, State};
handle_call(get__trim_watermark, _From, State) ->
{reply, State#state.trim_watermark, State};
%% status: selected state fields as a proplist.
handle_call(status, _From, State) ->
L = [{min_epoch, State#state.min_epoch},
{page_size, State#state.page_size},
{max_mem, State#state.max_mem},
{max_logical_page, State#state.max_logical_page},
{trim_watermark, State#state.trim_watermark}],
{reply, {ok, L}, State};
%% stop: terminate normally; terminate/2 persists the hard state.
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
%% Unknown request: echo it back in a tagged tuple.
handle_call(Request, _From, State) ->
Reply = {whaaaaaaaaaaaaaaaaaa, Request},
{reply, Reply, State}.
%% gen_server callback: the FLU has no asynchronous requests; every cast is
%% ignored.
handle_cast(_Ignored, St) ->
    {noreply, St}.
%% gen_server callback.  `finish_init' (sent by init/1 to itself) scans the
%% memfile for the highest logical page number, records it and persists the
%% hard state.  Any other message is ignored.
handle_info(finish_init, State) ->
    Scanned = State#state{max_logical_page=find_max_logical_page(State)},
    ok = write_hard_state(Scanned),
    {noreply, Scanned};
handle_info(_Other, State) ->
    {noreply, State}.
%% gen_server callback: persist the hard state one last time.  Crashes with
%% badmatch if that fails.
terminate(_Why, State) ->
    ok = write_hard_state(State).
%% gen_server callback: the state needs no conversion on code upgrade.
code_change(_FromVsn, St, _Opts) ->
    {ok, St}.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
%% Load the persisted hard state from Dir.
%% Returns the stored tuple when its first element is `v1', or the
%% file:read_file/1 error when the file cannot be read.  Any other stored
%% term raises case_clause.
read_hard_state(Dir) ->
    case file:read_file(hard_state_path(Dir)) of
        {ok, Encoded} ->
            case binary_to_term(Encoded) of
                HardState when element(1, HardState) =:= v1 ->
                    HardState
            end;
        ReadError ->
            ReadError
    end.
%% Persist the durable part of the state as a `v1' tuple.  The data goes to
%% "<hard-state>.tmp" first and is then renamed over the real file.
%% Returns ok; any file error crashes with badmatch.
write_hard_state(#state{dir=Dir, min_epoch=MinEpoch, page_size=PageSize,
                        max_mem=MaxMem, trim_watermark=TrimWatermark}) ->
    FinalPath = hard_state_path(Dir),
    ScratchPath = FinalPath ++ ".tmp",
    Payload = term_to_binary({v1, MinEpoch, PageSize, MaxMem, TrimWatermark}),
    {ok, Fd} = file:open(ScratchPath, [write, binary, raw]),
    ok = file:write(Fd, Payload),
    %% TODO: file:sync(Fd) here when the training wheels come off
    ok = file:close(Fd),
    ok = file:rename(ScratchPath, FinalPath).
%% Path of the page storage file inside BaseDir.
memfile_path(BaseDir) ->
    BaseDir ++ "/memfile".
%% Path of the persisted hard-state file inside BaseDir.
hard_state_path(BaseDir) ->
    BaseDir ++ "/hard-state".
%% Byte offset of page slot PageNum in the memfile: each slot occupies the
%% page payload plus ?PAGE_OVERHEAD bookkeeping bytes.
calc_page_offset(PageNum, #state{page_size=PageSize}) ->
    PageNum * (?PAGE_OVERHEAD + PageSize).
%% find_max_logical_page(): a kludge that exists because this naive
%% implementation does not keep the maximum logical page in hard state.
%% It walks the memfile slot by slot and returns the largest logical page
%% number found in a slot whose status byte says "written" (0 if none).
find_max_logical_page(S) ->
    {ok, #file_info{size=FileSize}} =
        file:read_file_info(memfile_path(S#state.dir)),
    find_max_logical_page(0, 0, FileSize, S).

%% Scan slot Slot and onwards while its offset is below both the memory
%% limit and the file size; MaxSoFar carries the best answer so far.
find_max_logical_page(MaxSoFar, Slot, FileSize,
                      #state{mem_fh=FH, max_mem=MaxMem}=S) ->
    Offset = calc_page_offset(Slot, S),
    case Offset < MaxMem andalso Offset < FileSize of
        false ->
            MaxSoFar;
        true ->
            %% Status byte plus the 8-byte logical page number.
            NewMax = case file:pread(FH, Offset, 9) of
                         {ok, <<1:8/big, LP:64/big>>} ->
                             erlang:max(MaxSoFar, LP);
                         _ ->
                             MaxSoFar
                     end,
            find_max_logical_page(NewMax, Slot + 1, FileSize, S)
    end.
%% Decide whether PageBin may be written as logical page LogicalPN.
%%
%% Returns `{ok, Offset}' when the slot is free, `error_overwritten' when
%% it already holds a page, `error_trimmed' when it has been trimmed or
%% filled, and a `{bummer, ...}' tuple when the offset is beyond the memory
%% limit or PageBin is not exactly one page long.
check_write(LogicalPN, PageBin,
            #state{max_mem=MaxMem, page_size=PageSize} = S) ->
    Offset = calc_page_offset(LogicalPN, S),
    if Offset < MaxMem, byte_size(PageBin) =:= PageSize ->
            case check_is_written(Offset, LogicalPN, S) of
                false ->
                    {ok, Offset};
                true ->
                    error_overwritten;
                trimmed ->
                    error_trimmed
            end;
       true ->
            {bummer, ?MODULE, ?LINE, lpn, LogicalPN, offset, Offset, max_mem, MaxMem, page_size, PageSize}
    end.

%% Inspect the status byte of the slot at Offset.
%%
%% Returns `false' for an unwritten slot (including one beyond the end of
%% the file), `true' for a written slot and `trimmed' for a slot marked by
%% trim or fill (status byte 2).  The trimmed case used to be missing, so a
%% write to a trimmed page crashed the FLU with case_clause instead of
%% answering error_trimmed.  Other status bytes still crash on purpose.
check_is_written(Offset, _PhysicalPN, #state{mem_fh=FH}) ->
    case file:pread(FH, Offset, 1) of
        {ok, <<1:8>>} ->
            true;
        {ok, <<0:8>>} ->
            false;
        {ok, <<2:8>>} ->
            trimmed;
        eof ->
            %% We assume that Offset has been bounds-checked
            false
    end.
%% Store one complete page slot at Offset: status byte 1, the 64-bit
%% big-endian logical page number, the payload, and the trailing byte 1.
write_page(Offset, LogicalPN, PageBin, #state{mem_fh=FH}) ->
    Header = <<1:8, LogicalPN:64/big>>,
    Trailer = <<1:8>>,
    ok = file:pwrite(FH, Offset, [Header, PageBin, Trailer]).
%% Read logical page LogicalPN from the memfile.
%%
%% Returns `{ok, Page}' for a completely written page whose stored page
%% number matches, `error_trimmed' for a slot marked by trim or fill,
%% `error_unwritten' for an empty, partial or otherwise unrecognised slot,
%% and `badarg' when the offset is beyond the memory limit or the read
%% itself fails.
read_page(LogicalPN, #state{max_mem=MaxMem, mem_fh=FH,
                            page_size=PageSize} = S) ->
    Offset = calc_page_offset(LogicalPN, S),
    if Offset < MaxMem ->
            case file:pread(FH, Offset, PageSize + ?PAGE_OVERHEAD) of
                {ok, <<1:8, LogicalPN:64/big, Page:PageSize/binary, 1:8>>} ->
                    {ok, Page};
                {ok, <<1:8, _LogicalPN:64/big, _:PageSize/binary, 0:8>>} ->
                    %% Leading byte set, trailing byte not: torn write.
                    io:format("BUMMER: ~s line ~w: incomplete write at ~p\n",
                              [?MODULE, ?LINE, LogicalPN]),
                    error_unwritten;
                {ok, <<2:8, _/binary>>} ->
                    %% Status byte 2 is what trim_page/3 writes.  A fill at
                    %% the end of the file leaves only that single byte, so
                    %% the rest of the slot may be short.  This case used to
                    %% be reported as error_unwritten.
                    error_trimmed;
                {ok, _} ->
                    error_unwritten;
                eof ->
                    error_unwritten;
                Else ->
                    io:format("BUMMER: ~s line ~w: ~p\n",
                              [?MODULE, ?LINE, Else]),
                    badarg                          % TODO: better idea
            end;
       true ->
            badarg
    end.
%% Shared body of the trim and fill requests; returns a gen_server reply
%% tuple.  On success the trim watermark is raised to LogicalPN if that is
%% higher, the trim counter is bumped, and the hard state is persisted
%% whenever the counter value before the bump is a multiple of 1000.
do_trim_or_fill(Op, LogicalPN,
                #state{trim_watermark=Watermark, trim_count=Count} = S) ->
    case trim_page(Op, LogicalPN, S) of
        ok ->
            Updated = S#state{trim_watermark=erlang:max(Watermark, LogicalPN),
                              trim_count=Count + 1},
            case Count rem 1000 of
                0 -> ok = write_hard_state(Updated);
                _ -> ok
            end,
            {reply, ok, Updated};
        Failure ->
            {reply, Failure, S}
    end.
%% Mark the slot of LogicalPN as trimmed (status byte 2) if Op allows it:
%% `trim' applies only to a written slot, `fill' only to an unwritten one.
%% Returns `ok' when the mark was written, otherwise an atom describing the
%% slot's current status, or `badarg' when the offset is beyond the memory
%% limit.
trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) ->
    Offset = calc_page_offset(LogicalPN, S),
    if Offset < MaxMem ->
            Status = case file:pread(FH, Offset, 1) of
                         {ok, <<0:8>>} -> error_unwritten;
                         {ok, <<1:8>>} -> error_overwritten;
                         {ok, <<2:8>>} -> error_trimmed;
                         eof           -> error_unwritten;
                         Else ->
                             io:format("BUMMER: ~s line ~w: ~p\n",
                                       [?MODULE, ?LINE, Else]),
                             error_trimmed          % TODO
                     end,
            case {Status, Op} of
                {error_overwritten, trim} ->
                    ok = file:pwrite(FH, Offset, <<2:8>>);
                {error_unwritten, fill} ->
                    ok = file:pwrite(FH, Offset, <<2:8>>);
                _ ->
                    Status
            end;
       true ->
            badarg
    end.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
-ifdef(TEST).
%% A FLU can be started, queried, stopped, and started again on the same
%% directory.
startstop_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = start_link(Dir),
try
{ok, _} = status(P1),
ok = stop(P1),
%% A second stop must fail: the server is already gone.
{'EXIT', _} = (catch stop(P1)),
{ok, P2} = start_link(Dir),
%% Nothing was written or sealed, so both values are still 0.
0 = get__mlp(P2),
0 = get__min_epoch(P2),
ok = stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
%% Walk one FLU through write, read, seal, trim and fill and check the
%% reply of every step.
basic_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = start_link(Dir),
try
Epoch1 = 1,
Epoch2 = 2,
LPN = 1,
Bin1 = <<42:64>>,
Bin2 = <<42042:64>>,
%% Nothing has been written yet.
error_unwritten = read(P1, Epoch1, LPN),
error_unwritten = trim(P1, Epoch1, LPN),
error_unwritten = trim(P1, Epoch1, LPN+77),
%% Pages are write-once, and a written page cannot be filled.
ok = write(P1, Epoch1, LPN, Bin1),
error_overwritten = write(P1, Epoch1, LPN, Bin1),
error_overwritten = fill(P1, Epoch1, LPN),
LPN = get__mlp(P1),
0 = get__min_epoch(P1),
0 = get__trim_watermark(P1),
%% Sealing at Epoch1 raises the minimum epoch to Epoch1.
{ok, LPN} = seal(P1, Epoch1),
1 = get__min_epoch(P1),
error_overwritten = write(P1, Epoch2, LPN, Bin1),
ok = write(P1, Epoch2, LPN+1, Bin2),
Epoch1 = get__min_epoch(P1),
{ok, Bin1} = read(P1, Epoch1, LPN),
{ok, Bin2} = read(P1, Epoch2, LPN+1),
error_unwritten = read(P1, Epoch2, LPN+2),
%% A page number far beyond the memory limit.
badarg = read(P1, Epoch2, 1 bsl 2982),
%% Seal requires a strictly newer epoch.
error_badepoch = seal(P1, Epoch1),
{ok, _} = seal(P1, Epoch2),
error_badepoch = seal(P1, Epoch2),
%% After sealing at Epoch2, Epoch1 requests are rejected.
error_badepoch = read(P1, Epoch1, LPN),
error_badepoch = read(P1, Epoch1, LPN+1),
{ok, Bin1} = read(P1, Epoch2, LPN),
{ok, Bin2} = read(P1, Epoch2, LPN+1),
error_badepoch = trim(P1, Epoch1, LPN+1),
ok = trim(P1, Epoch2, LPN+1),
error_trimmed = trim(P1, Epoch2, LPN+1),
%% Current watermark processing is broken. But we'll test what's
%% there now.
ExpectedWaterFixMe = LPN+1,
ExpectedWaterFixMe = get__trim_watermark(P1),
%% Filling an unwritten page marks it trimmed.
ok = fill(P1, Epoch2, LPN+3),
error_trimmed = fill(P1, Epoch2, LPN+3),
error_trimmed = trim(P1, Epoch2, LPN+3),
Epoch2 = get__min_epoch(P1),
ok = stop(P1),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
%% The epoch set by seal/2 must survive a stop and restart of the FLU.
seal_persistence_test() ->
    Dir = "/tmp/flu." ++ os:getpid(),
    {ok, P1} = start_link(Dir),
    try
        0 = get__min_epoch(P1),
        Epoch = 665,
        %% The reported max logical page is not checked here; the leading
        %% underscore silences the unused-variable warning.
        {ok, _LPN} = seal(P1, Epoch),
        Epoch = get__min_epoch(P1),
        ok = stop(P1),

        %% Restart on the same directory: the sealed epoch is read back
        %% from the hard-state file.
        {ok, P2} = start_link(Dir),
        Epoch = get__min_epoch(P2),
        ok = stop(P2),
        ok
    after
        ok = corfurl_util:delete_dir(Dir)
    end.
-endif. % TEST

View file

@ -0,0 +1,123 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_sequencer).
-behaviour(gen_server).
-export([start_link/1, stop/1, get/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(SERVER, ?MODULE).
%% @doc Start the sequencer, locally registered under the module name.
%% FLUs are the servers queried for the highest page written so far.
start_link(FLUs) ->
    RegName = {local, ?SERVER},
    gen_server:start_link(RegName, ?MODULE, {FLUs}, []).
%% @doc Ask the sequencer to shut down; replies `ok'.
stop(Server) ->
    gen_server:call(Server, stop, infinity).
%% @doc Reserve NumPages consecutive logical page numbers and return the
%% first one.
get(Server, NumPages) ->
    Request = {get, NumPages},
    gen_server:call(Server, Request, infinity).
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
%% gen_server callback.  The server state is the next logical page number
%% to hand out: one past the highest page any FLU reports.
init({FLUs}) ->
    Highest = get_max_logical_page(FLUs),
    {ok, Highest + 1}.
%% gen_server callback.  The state is the next logical page number to issue.
%%   {get, NumPages} - reply with the next number and advance by NumPages
%%   stop            - reply ok and terminate normally
%%   anything else   - reply with a placeholder atom, state unchanged
handle_call({get, NumPages}, _From, NextLPN) ->
    {reply, NextLPN, NextLPN + NumPages};
handle_call(stop, _From, NextLPN) ->
    {stop, normal, ok, NextLPN};
handle_call(_Unknown, _From, NextLPN) ->
    {reply, whaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa, NextLPN}.
%% gen_server callback: casts are ignored.
handle_cast(_Ignored, NextLPN) ->
    {noreply, NextLPN}.
%% gen_server callback: stray messages are ignored.
handle_info(_Ignored, NextLPN) ->
    {noreply, NextLPN}.
%% gen_server callback: nothing to clean up.
terminate(_Why, _NextLPN) ->
    ok.
%% gen_server callback: the state needs no conversion on code upgrade.
code_change(_FromVsn, NextLPN, _Opts) ->
    {ok, NextLPN}.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
%% Highest max_logical_page reported by any of the FLUs.
%%
%% FLUs whose status call does not answer `{ok, Proplist}' are skipped, and
%% a missing max_logical_page entry counts as 0.  The result is seeded with
%% 0 so that an empty FLU list (or one where every FLU was skipped) yields
%% 0 instead of crashing in lists:max/1.
get_max_logical_page(FLUs) ->
    Reported = [proplists:get_value(max_logical_page, Ps, 0) ||
                   FLU <- FLUs,
                   {ok, Ps} <- [corfurl_flu:status(FLU)]],
    lists:max([0 | Reported]).
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
-ifdef(TEST).
%% Write one page to each of four FLUs (page N on FLU N), then check that
%% the sequencer starts handing out numbers right after the highest page.
smoke_test() ->
BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".",
PageSize = 8,
NumPages = 500,
NumFLUs = 4,
MyDir = fun(X) -> BaseDir ++ integer_to_list(X) end,
Del = fun() -> [ok = corfurl_util:delete_dir(MyDir(X)) ||
X <- lists:seq(1, NumFLUs)] end,
%% Start from clean directories.
Del(),
FLUs = [begin
element(2, corfurl_flu:start_link(MyDir(X),
PageSize, NumPages*PageSize))
end || X <- lists:seq(1, NumFLUs)],
%% Pair FLU number N with page number N.
FLUsNums = lists:zip(FLUs, lists:seq(1, NumFLUs)),
try
[ok = corfurl_flu:write(FLU, 1, PageNum, <<42:(8*8)>>) ||
{FLU, PageNum} <- FLUsNums],
MLP0 = NumFLUs,
NumFLUs = get_max_logical_page(FLUs),
%% Excellent. Now let's start the sequencer and see if it gets
%% the same answer. If yes, then the first get will return MLP1,
%% yadda yadda.
MLP1 = MLP0 + 1,
MLP3 = MLP0 + 3,
MLP4 = MLP0 + 4,
{ok, Sequencer} = start_link(FLUs),
try
%% The first get reserves two pages, so the next one starts at MLP3.
MLP1 = get(Sequencer, 2),
MLP3 = get(Sequencer, 1),
MLP4 = get(Sequencer, 1)
after
stop(Sequencer)
end
after
[ok = corfurl_flu:stop(FLU) || FLU <- FLUs],
Del()
end.
-endif. % TEST

View file

@ -0,0 +1,36 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_util).
-export([delete_dir/1]).
%% @doc Delete every entry directly inside Dir, then Dir itself.
%% Returns `ok' when the directory is gone afterwards (including when it
%% never existed), or the file:del_dir/1 error otherwise.
delete_dir(Dir) ->
    %% Not recursive: deleting a subdirectory entry fails the ok match.
    Entries = filelib:wildcard(Dir ++ "/*"),
    lists:foreach(fun(Entry) -> ok = file:delete(Entry) end, Entries),
    case file:del_dir(Dir) of
        {error, enoent} ->
            ok;
        Other ->
            Other
    end.