diff --git a/Makefile b/Makefile
index 134000a..173a803 100644
--- a/Makefile
+++ b/Makefile
@@ -18,7 +18,7 @@ eunit: compile clean-test-btrees
@$(REBAR) eunit skip_deps=true
clean-test-btrees:
- rm -fr .eunit/Btree_*
+ rm -fr .eunit/Btree_* .eunit/simple
plt: compile
$(DIALYZER) --build_plt --output_plt .fractal_btree.plt \
diff --git a/README.md b/README.md
index 5b8f15d..2a4b021 100644
--- a/README.md
+++ b/README.md
@@ -7,18 +7,18 @@ This Erlang-based storage engine provides a scalable alternative to Basho Bitcas
- Operations-friendly "append-only" storage (allows you to back up a live system)
- The cost of merging (evicting stale key/values) is amortized into insertion, so you don't need to schedule merges to happen at off-peak hours.
- Supports range queries (and thus potentially Riak 2i.)
-- Unlike Bitcask and InnoDB, you don't need a boat load of RAM
-- All in 100 lines of pure Erlang code
+- Keys are not kept in memory (unlike Bitcask.)
+- 100% pure Erlang code
Once we're a bit more stable, we'll provide a Riak backend.
## How It Works
-If there are N records, there are in log2(N) levels (each an individual B-tree in a file). Level #0 has 1 record, level #1 has 2 records, #2 has 4 records, and so on. I.e. level #n has 2n records.
+If there are N records, there are log2(N) levels (each an individual B-tree in a file). Level #0 has 1 record, level #1 has 2 records, level #2 has 4 records, level #3 has 8 records, level #4 has 16 records, and so on. I.e. level #n has 2^n records.
In "stable state", each level is either full or empty; so if there are e.g. 20 records stored, then levels #5 and #2 are full; the other ones are empty.
-You can read more about Fractal Trees at [Tokutek](http://www.tokutek.com/2011/11/how-fractal-trees-work-at-mit-today/), a company providing a MySQL backend based on Fractal Trees. I have not tried it, but it looks truly amazing.
+You can read more about Fractal Trees at [Tokutek](http://www.tokutek.com/2011/11/how-fractal-trees-work-at-mit-today/), a company providing a MySQL backend based on Fractal Trees.
### Lookup
Lookup is quite simple: starting at level #0, the sought-for Key is looked up in the B-tree at that level. If nothing is found, the search continues at the next level. So if there are *N* levels, then up to *N* disk-based B-tree lookups are performed. Each lookup is "guarded" by a bloom filter, so that a disk-based search is only done when it is likely to succeed.
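Below is a minimal, in-memory sketch of that cascade. It models a level as a `{BloomSet, GbTree}` pair purely for illustration; the real per-level logic lives in `fractal_btree_level` and reads from an on-disk B-tree.

```erlang
%% Sketch only (in-memory stand-in, not the real on-disk code): each "level"
%% is modelled as {BloomSet, GbTree}. The bloom check decides whether the
%% (normally disk-based) B-tree search is worth doing at all.
lookup(_Key, []) ->
    not_found;
lookup(Key, [{Bloom, Tree} | Rest]) ->
    case sets:is_element(Key, Bloom) of
        false ->
            %% "definitely not at this level": skip the search entirely
            lookup(Key, Rest);
        true ->
            case gb_trees:lookup(Key, Tree) of
                {value, V} -> {ok, V};
                none       -> lookup(Key, Rest)
            end
    end.
```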
@@ -43,7 +43,7 @@ With Fractal B-Trees; back-pressure is provided by the injection mechanism, whic
OK, I've told you a lie. In practice, it is not practical to create a new file for each insert (injection at level #0), so we allow you to define the "top level" to be a number higher than #0; currently defaulting to #6 (32 records). That means you take the amortization "hit" only once for every 32 inserts.
-A further trouble is that merging does in fact not have completely linear I/O complexity, because reading from a small file that was recently written is faster that reading from a file that was written a long time ago (because of OS-level caching); thus doing a merge at level #*N+1* is sometimes more than twice as slow as doing a merge at level #*N*. Because of this, sustained insert pressure may produce a situation where the system blocks while merging, though it does require an extremely high level of inserts. We're considering ways to alleviate this.
+The trouble is that merging does not, in fact, have completely linear I/O complexity, because reading from a small file that was recently written is faster than reading from a file that was written a long time ago (because of OS-level caching); thus a merge at level #*N+1* is often more than twice as slow as a merge at level #*N*. Because of this, sustained insert pressure may produce a situation where the system blocks while merging, though it requires an extremely high rate of inserts. We're considering ways to alleviate this.
Merging can be going on concurrently at each level (in preparation for an injection to the next level), which lets you utilize available multi-core capacity to merge.
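For intuition only (the numbers below are hypothetical and not taken from this project's defaults), the top-level setting just changes how often that amortized cost is paid, and the level count bounds how many concurrent merges can be in flight:

```erlang
%% Back-of-envelope helpers (assumptions, not part of the patch): a top level
%% #T absorbs 2^T puts before a flush, and a data set of N records spans
%% roughly log2(N) levels, each of which can be merging concurrently.
top_level_capacity(T) -> 1 bsl T.                       % puts absorbed per flush
level_count(N) when N > 0 -> trunc(math:log2(N)) + 1.   % ~log2(N) levels

%% e.g. top_level_capacity(5) =:= 32, level_count(1000000) =:= 20.
```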
diff --git a/levelpresence.sh b/levelpresence.sh
deleted file mode 100644
index af9d573..0000000
--- a/levelpresence.sh
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/bin/bash
-
-function periodic() {
- t=0
- while sleep 1 ; do
- let "t=t+1"
- printf "%5d [" "$t"
-
- for ((i=0; i<35; i++)) ; do
- if ! [ -f "A-$i.data" ] ; then
- echo -n " "
- elif ! [ -f "B-$i.data" ] ; then
- echo -n "-"
- elif ! [ -f "X-$i.data" ] ; then
- echo -n "="
- else
- echo -n "*"
- fi
- done
- echo
- done
-}
-
-function dynamic() {
- local old s t start now
- t=0
- start=`date +%s`
- while true ; do
- s=""
- for ((i=0; i<35; i++)) ; do
- if ! [ -f "A-$i.data" ] ; then
- s="$s "
- elif ! [ -f "B-$i.data" ] ; then
- s="$s-"
- elif ! [ -f "X-$i.data" ] ; then
- s="$s="
- else
- s="$s*"
- fi
- done
-
- if [[ "$s" != "$old" ]] ; then
- let "t=t+1"
- now=`date +%s`
- let "now=now-start"
- printf "%5d %6d [%s\n" "$t" "$now" "$s"
- old="$s"
- else
- # Sleep a little bit:
- perl -e 'use Time::HiRes; Time::HiRes::usleep(100000)'
- fi
- done
-}
-
-dynamic
diff --git a/src/fractal_btree.erl b/src/fractal_btree.erl
index 4399324..1aca9ee 100644
--- a/src/fractal_btree.erl
+++ b/src/fractal_btree.erl
@@ -41,7 +41,6 @@ init([Dir]) ->
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel} = open_levels(Dir);
- %% TODO: recover nursery
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
@@ -69,10 +68,7 @@ open_levels(Dir) ->
{?TOP_LEVEL, ?TOP_LEVEL},
Files),
-% error_logger:info_msg("found level files ... {~p,~p}~n", [MinLevel, MaxLevel]),
-
- %% remove old nursery file
- file:delete(filename:join(Dir,"nursery.data")),
+ error_logger:info_msg("found files ... {~p,~p}~n", [MinLevel, MaxLevel]),
TopLevel =
lists:foldl( fun(LevelNo, Prev) ->
@@ -103,8 +99,8 @@ handle_cast(Info,State) ->
%% premature delete -> cleanup
-terminate(_Reason,_State) ->
- % error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]),
+terminate(Reason,State) ->
+ error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]),
% flush_nursery(State),
ok.
@@ -137,6 +133,7 @@ handle_call(close, _From, State) ->
{stop, normal, ok, State2}.
+
do_put(Key, Value, State=#state{ nursery=Tree }) ->
Tree2 = gb_trees:enter(Key, Value, Tree),
TreeSize = gb_trees:size(Tree2),
diff --git a/src/fractal_btree_level.erl b/src/fractal_btree_level.erl
index 660be0d..fe7ba7b 100644
--- a/src/fractal_btree_level.erl
+++ b/src/fractal_btree_level.erl
@@ -85,9 +85,6 @@ initialize(State) ->
BFileName = filename("B",State),
CFileName = filename("C",State),
- %% remove old merge file
- file:delete( filename("X",State)),
-
case file:read_file_info(CFileName) of
{ok, _} ->
@@ -214,14 +211,9 @@ main_loop(State = #state{ next=Next }) ->
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
-
{'EXIT', Parent, Reason} ->
- plain_fsm:parent_EXIT(Reason, State);
- {'EXIT', _, normal} ->
- %% Probably from a merger_pid - which we may have forgotten in the meantime.
- main_loop(State);
- {'EXIT', Pid, Reason} when Pid == State#state.merge_pid ->
- restart_merge_then_loop(State#state{merge_pid=undefined}, Reason)
+ plain_fsm:parent_EXIT(Reason, State)
+
end.
do_lookup(_Key, []) ->
@@ -244,13 +236,6 @@ stop_if_defined(undefined) -> ok;
stop_if_defined(MergePid) when is_pid(MergePid) ->
erlang:exit(MergePid, shutdown).
-
-restart_merge_then_loop(State, Reason) ->
- XFileName = filename("X",State),
- error_logger:warning_msg("Merger appears to have failed (reason: ~p). Removing outfile ~s\n", [Reason, XFileName]),
- file:delete(XFileName),
- check_begin_merge_then_loop(State).
-
begin_merge(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
@@ -261,8 +246,8 @@ begin_merge(State) ->
MergePID = proc_lib:spawn_link(fun() ->
{ok, OutCount} = fractal_btree_merger2:merge(AFileName, BFileName, XFileName,
- 1 bsl (State#state.level + 1),
- State#state.next =:= undefined),
+ 1 bsl (State#state.level + 1)),
+
% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
Owner ! {merge_done, OutCount, XFileName}
diff --git a/src/fractal_btree_merger2.erl b/src/fractal_btree_merger2.erl
index ed6b4cf..bb1f877 100644
--- a/src/fractal_btree_merger2.erl
+++ b/src/fractal_btree_merger2.erl
@@ -1,14 +1,14 @@
-module(fractal_btree_merger2).
%%
-%% Merging two BTrees
+%% Naive merge of two B-trees. A better implementation should iterate over leaf nodes, not individual KVs.
%%
--export([merge/5]).
+-export([merge/4]).
-define(LOCAL_WRITER, true).
-merge(A,B,C, Size, IsLastLevel) ->
+merge(A,B,C, Size) ->
{ok, BT1} = fractal_btree_reader:open(A),
{ok, BT2} = fractal_btree_reader:open(B),
case ?LOCAL_WRITER of
@@ -21,7 +21,7 @@ merge(A,B,C, Size, IsLastLevel) ->
{node, AKVs} = fractal_btree_reader:first_node(BT1),
{node, BKVs} = fractal_btree_reader:first_node(BT2),
- {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0),
+ {ok, Count, Out2} = scan(BT1, BT2, Out, AKVs, BKVs, 0),
%% finish stream tree
ok = fractal_btree_reader:close(BT1),
@@ -37,23 +37,23 @@ merge(A,B,C, Size, IsLastLevel) ->
{ok, Count}.
-scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) ->
+scan(BT1, BT2, Out, [], BKVs, Count) ->
case fractal_btree_reader:next_node(BT1) of
{node, AKVs} ->
- scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
+ scan(BT1, BT2, Out, AKVs, BKVs, Count);
end_of_data ->
- scan_only(BT2, Out, IsLastLevel, BKVs, Count)
+ scan_only(BT2, Out, BKVs, Count)
end;
-scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) ->
+scan(BT1, BT2, Out, AKVs, [], Count) ->
case fractal_btree_reader:next_node(BT2) of
{node, BKVs} ->
- scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
+ scan(BT1, BT2, Out, AKVs, BKVs, Count);
end_of_data ->
- scan_only(BT1, Out, IsLastLevel, AKVs, Count)
+ scan_only(BT1, Out, AKVs, Count)
end;
-scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
+scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
if Key1 < Key2 ->
case ?LOCAL_WRITER of
true ->
@@ -62,7 +62,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
ok = fractal_btree_writer:add(Out2=Out, Key1, Value1)
end,
- scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1);
+ scan(BT1, BT2, Out2, AT, BKVs, Count+1);
Key2 < Key1 ->
case ?LOCAL_WRITER of
@@ -71,37 +71,32 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
false ->
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
end,
- scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1);
+ scan(BT1, BT2, Out2, AKVs, BT, Count+1);
- (delete =:= Value2) and (true =:= IsLastLevel) ->
- scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count);
-
- true ->
+ Key1 == Key2 ->
+ %% TODO: eliminate tombstones, right now they just bubble down
case ?LOCAL_WRITER of
true ->
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key2, Value2}, Out);
false ->
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
end,
- scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1)
+ scan(BT1, BT2, Out2, AT, BT, Count+1)
end.
-scan_only(BT, Out, IsLastLevel, [], Count) ->
+scan_only(BT, Out, [], Count) ->
case fractal_btree_reader:next_node(BT) of
{node, KVs} ->
- scan_only(BT, Out, IsLastLevel, KVs, Count);
+ scan_only(BT, Out, KVs, Count);
end_of_data ->
{ok, Count, Out}
end;
-scan_only(BT, Out, true, [{_,delete}|Rest], Count) ->
- scan_only(BT, Out, true, Rest, Count);
-
-scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) ->
+scan_only(BT, Out, [{Key,Value}|Rest], Count) ->
case ?LOCAL_WRITER of
true ->
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key, Value}, Out);
false ->
ok = fractal_btree_writer:add(Out2=Out, Key, Value)
end,
- scan_only(BT, Out2, IsLastLevel, Rest, Count+1).
+ scan_only(BT, Out2, Rest, Count+1).
diff --git a/test/fractal_btree_drv.erl b/test/fractal_btree_drv.erl
index da3edb4..cbd84ef 100644
--- a/test/fractal_btree_drv.erl
+++ b/test/fractal_btree_drv.erl
@@ -6,7 +6,9 @@
%% API
-export([start_link/0]).
--export([open/1,
+-export([
+ lookup_exist/2,
+ open/1,
put/3,
stop/0]).
@@ -27,6 +29,9 @@ start_link() ->
call(X) ->
gen_server:call(?SERVER, X, infinity).
+lookup_exist(N, K) ->
+ call({lookup_exist, N, K}).
+
open(N) ->
call({open, N}).
@@ -56,7 +61,12 @@ handle_call({put, N, K, V}, _, #state { btrees = D} = State) ->
Other ->
{reply, {error, Other}, State}
end;
+handle_call({lookup_exist, N, K}, _, #state { btrees = D} = State) ->
+ Tree = dict:fetch(N, D),
+ Reply = fractal_btree:lookup(Tree, K),
+ {reply, Reply, State};
handle_call(stop, _, State) ->
+ cleanup_trees(State),
{stop, normal, ok, State};
handle_call(_Request, _From, State) ->
Reply = ok,
@@ -68,8 +78,7 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, State) ->
- cleanup_trees(State),
+terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
diff --git a/test/fractal_btree_merger_tests.erl b/test/fractal_btree_merger_tests.erl
index 1d41582..d0f36e5 100644
--- a/test/fractal_btree_merger_tests.erl
+++ b/test/fractal_btree_merger_tests.erl
@@ -9,10 +9,6 @@
merge_test() ->
- file:delete("test1"),
- file:delete("test2"),
- file:delete("test3"),
-
{ok, BT1} = fractal_btree_writer:open("test1"),
lists:foldl(fun(N,_) ->
ok = fractal_btree_writer:add(BT1, <<N:128>>, <<"data",N:128>>)
@@ -31,9 +27,13 @@ merge_test() ->
ok = fractal_btree_writer:close(BT2),
- {Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000, true]),
+ {Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000]),
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
+ ok = file:delete("test1"),
+ ok = file:delete("test2"),
+ ok = file:delete("test3"),
+
ok.
diff --git a/test/fractal_btree_tests.erl b/test/fractal_btree_tests.erl
index b895c9e..b416fdf 100644
--- a/test/fractal_btree_tests.erl
+++ b/test/fractal_btree_tests.erl
@@ -23,9 +23,10 @@ full_test_() ->
fun () -> ok end,
fun (_) -> ok end,
[{timeout, 120, ?_test(test_proper())},
+ ?_test(test_tree_simple_1()),
?_test(test_tree())]}.
-qc_opts() -> [{numtests, 400}].
+qc_opts() -> [{numtests, 800}].
test_proper() ->
[?assertEqual([], proper:module(?MODULE, qc_opts()))].
@@ -46,17 +47,39 @@ cmd_put_args(#state { open = Open }) ->
{oneof(dict:fetch_keys(Open)), binary(), binary()},
[Name, Key, Value]).
+non_empty_btree(Open) ->
+ ?SUCHTHAT(Name, union(dict:fetch_keys(Open)),
+ dict:size(dict:fetch(Name, Open)) > 0).
+
+cmd_lookup_args(#state { open = Open}) ->
+ ?LET(Name, non_empty_btree(Open),
+ ?LET(Key, oneof(dict:fetch_keys(dict:fetch(Name, Open))),
+ [Name, Key])).
+
+count_dicts(Open) ->
+ Dicts = [ V || {_, V} <- dict:to_list(Open)],
+ lists:sum([dict:size(D) || D <- Dicts]).
+
command(#state { open = Open} = S) ->
frequency(
[ {100, {call, ?SERVER, open, [g_btree_name()]}} ] ++
[ {2000, {call, ?SERVER, put, cmd_put_args(S)}}
- || dict:size(Open) > 0]).
+ || dict:size(Open) > 0] ++
+ [ {1500, {call, ?SERVER, lookup_exist, cmd_lookup_args(S)}}
+ || dict:size(Open) > 0, count_dicts(Open) > 0]).
+precondition(#state { open = _Open }, {call, ?SERVER, lookup_exist,
+ [_Name, _K]}) ->
+ %% No need to quantify since we limit this to validity in the
+ %% command/1 generator
+ true;
precondition(#state { open = Open }, {call, ?SERVER, put, [Name, K, V]}) ->
dict:is_key(Name, Open);
precondition(#state { open = Open }, {call, ?SERVER, open, [Name]}) ->
not (dict:is_key(Name, Open)).
+next_state(S, _Res, {call, ?SERVER, lookup_exist, [_Name, _Key]}) ->
+ S;
next_state(#state { open = Open} = S, _Res,
{call, ?SERVER, put, [Name, Key, Value]}) ->
S#state { open = dict:update(Name,
@@ -67,6 +90,11 @@ next_state(#state { open = Open} = S, _Res,
next_state(#state { open = Open} = S, _Res, {call, ?SERVER, open, [Name]}) ->
S#state { open = dict:store(Name, dict:new(), Open) }.
+postcondition(#state { open = Open },
+ {call, ?SERVER, lookup_exist, [Name, Key]}, {ok, Value}) ->
+ V = dict:fetch(Key, dict:fetch(Name, Open)),
+ io:format("~p == ~p~n", [V, Value]),
+ V == Value;
postcondition(_S, {call, ?SERVER, put, [_Name, _Key, _Value]}, ok) ->
true;
postcondition(_S, {call, ?SERVER, open, [_Name]}, ok) ->
@@ -74,6 +102,13 @@ postcondition(_S, {call, ?SERVER, open, [_Name]}, ok) ->
postcondition(_, _, _) ->
false.
+cleanup_test_trees(#state { open = Open}) ->
+ [cleanup_tree(N) || N <- dict:fetch_keys(Open)].
+
+cleanup_tree(Tree) ->
+ {ok, FileNames} = file:list_dir(Tree),
+ [ok = file:delete(filename:join([Tree, Fname])) || Fname <- FileNames],
+ file:del_dir(Tree).
prop_dict_agree() ->
?FORALL(Cmds, commands(?MODULE),
@@ -82,6 +117,7 @@ prop_dict_agree() ->
fractal_btree_drv:start_link(),
{History,State,Result} = run_commands(?MODULE, Cmds),
fractal_btree_drv:stop(),
+ cleanup_test_trees(State),
?WHENFAIL(io:format("History: ~w\nState: ~w\nResult: ~w\n",
[History,State,Result]),
aggregate(command_names(Cmds), Result =:= ok))
@@ -93,10 +129,15 @@ prop_dict_agree() ->
%% UNIT TESTS -----------------------------------------------------------------
+test_tree_simple_1() ->
+ {ok, Tree} = fractal_btree:open("simple"),
+ ok = fractal_btree:put(Tree, <<>>, <<"data", 77:128>>),
+ {ok, <<"data", 77:128>>} = fractal_btree:lookup(Tree, <<>>).
+
test_tree() ->
-%% application:start(sasl),
+ application:start(sasl),
{ok, Tree} = fractal_btree:open("simple"),
lists:foldl(fun(N,_) ->