Compare commits

..

2 commits

Author SHA1 Message Date
Kresten Krab Thorup
2a205db9fe Use backend API in nursery also 2012-10-22 10:51:08 +02:00
Kresten Krab Thorup
3a43d9235b Separate out hanoid_backend API
First step towards a hanoidb backend API, which
will allow replacing the storage engine with
any ordered storage.

We still need an API for how to choose backend
and perhaps also how to discover the backend
of an existing store.
2012-10-22 02:00:47 +02:00
36 changed files with 749 additions and 1319 deletions

5
.envrc
View file

@ -1,5 +0,0 @@
if ! has nix_direnv_version || ! nix_direnv_version 3.0.4; then
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.4/direnvrc" "sha256-DzlYZ33mWF/Gs8DDeyjr8mnVmQGx7ASYqA5WlxwvBG4="
fi
watch_file devShell.nix shell.nix flake.nix
use flake || use nix

3
.gitignore vendored
View file

@ -3,6 +3,3 @@ deps
*~
.eunit
.project
_build
.direnv

View file

@ -1,8 +0,0 @@
language: erlang
otp_release:
- R16B03
- R15B03
- 17.0
- 18.0

View file

@ -1,12 +1,12 @@
REBAR= rebar3
REBAR= rebar
DIALYZER= dialyzer
.PHONY: plt analyze all deps compile get-deps clean
all: get-deps compile
all: compile
deps: get-deps compile
deps: get-deps
get-deps:
@$(REBAR) get-deps
@ -31,18 +31,13 @@ clean-test-btrees:
plt: compile
$(DIALYZER) --build_plt --output_plt .hanoi.plt \
-pa deps/snappy/ebin \
-pa deps/snappy/ebin \
-pa deps/lz4/ebin \
-pa deps/ebloom/ebin \
-pa deps/plain_fsm/ebin \
deps/plain_fsm/ebin \
--apps erts kernel stdlib ebloom lz4 snappy
--apps kernel stdlib
analyze: compile
$(DIALYZER) --plt .hanoi.plt \
-pa deps/snappy/ebin \
-pa deps/lz4/ebin \
-pa deps/ebloom/ebin \
-pa deps/plain_fsm/ebin \
ebin

View file

@ -1,7 +1,5 @@
# HanoiDB Indexed Key/Value Storage
[![Build Status](https://travis-ci.org/krestenkrab/hanoidb.svg?branch=master)](https://travis-ci.org/krestenkrab/hanoidb)
HanoiDB implements an indexed, key/value storage engine. The primary index is
a log-structured merge tree (LSM-BTree) implemented using "doubling sizes"
persistent ordered sets of key/value pairs, similar is some regards to
@ -82,13 +80,7 @@ Put these values in your `app.config` in the `hanoidb` section
%% Both have same log2(N) worst case, but `fast' is
%% sometimes faster; yielding latency fluctuations.
%%
{merge_strategy, fast | predictable},
%% "Level0" files has 2^N KVs in it, defaulting to 1024.
%% If the database is to contain very small KVs, this is
%% likely too small, and will result in many unnecessary
%% file operations. (Subsequent levels double in size).
{top_level, 10} % 1024 Key/Values
{merge_strategy, fast | predictable}
]},
```

1
TODO
View file

@ -1,4 +1,5 @@
* Phase 1: Minimum viable product (in order of priority)
* lager; check for uses of lager:error/2
* configurable TOP_LEVEL size
* test new snappy compression support
* status and statistics

View file

@ -1,61 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1701282334,
"narHash": "sha256-MxCVrXY6v4QmfTwIysjjaX0XUhqBbxTWWB4HXtDYsdk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "057f9aecfb71c4437d2b27d3323df7f93c010b7e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"utils": "utils"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View file

@ -1,35 +0,0 @@
{
description = "A Helm Chart for OpenLDAP.";
inputs = {
# nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
nixpkgs.url = "github:NixOS/nixpkgs/23.11";
utils.url = "github:numtide/flake-utils";
};
outputs =
{ self, nixpkgs, ... }@inputs:
inputs.utils.lib.eachSystem
[
"x86_64-linux"
"i686-linux"
"aarch64-linux"
"x86_64-darwin"
]
(
system:
let
pkgs = import nixpkgs {
inherit system;
overlays = [ ];
config.allowUnfree = true;
};
in
{
flake-utils.inputs.systems.follows = "system";
formatter = with pkgs; [ nixfmt ];
devShells.default = import ./shell.nix { inherit pkgs; };
DOCKER_BUILDKIT = 1;
}
);
}

View file

@ -27,3 +27,5 @@
-define(REPLY(Ref,Msg), {'$reply', Ref, Msg}).
-define(CAST(From,Msg), {'$cast', From, Msg}).
-type caller() :: { pid(), reference() }.

View file

@ -4,7 +4,7 @@
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [%{d,'DEBUG',true},
%{d,'USE_EBLOOM',true},
{parse_transform, lager_transform},
fail_on_warning,
warn_unused_vars,
warn_export_all,
@ -20,20 +20,18 @@
warn_untyped_record,
% warn_missing_spec,
% strict_validation,
{platform_define, "^R|17", pre18},
debug_info]}.
{xref_checks, [undefined_function_calls]}.
{deps, [ {sext, ".*", {git, "https://github.com/uwiger/sext.git", {branch, "master"}}}
%% , {snappy, "1.*", {git, "https://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.*", {git, "https://github.com/gburd/plain_fsm.git", {branch, "master"}}}
%% , {basho_bench, ".*", {git, "https://github.com/basho/basho_bench.git", {branch, "master"}}}
%% , {ebloom, ".*", {git, "https://github.com/basho/ebloom.git", {branch, "develop"}}}
%% , {bloomerl, ".*", {git, "https://github.com/gburd/bloomerl.git", {branch, "master"}}}
, {lz4, ".*", {git, "https://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
, {triq, ".*", {git, "https://github.com/gburd/triq.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/uwiger/edown.git", {branch, "master"}}}
{deps, [ {sext, ".*", {git, "git://github.com/esl/sext", {branch, "master"}}}
, {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
, {snappy, "1.0.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
, {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}}
, {triq, ".*", {git, "git://github.com/krestenkrab/triq", {branch, "master"}}}
, {lz4, ".*", {git, "git://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/esl/edown.git", {branch, "master"}}}
% , {asciiedoc, "0.1.*", {git, "git://github.com/norton/asciiedoc.git", {branch, "master"}}}
% , {triq, ".*", {git, "git://github.com/krestenkrab/triq.git", {branch, "master"}}}
% , {proper, ".*", {git, "git://github.com/manopapad/proper.git", {branch, "master"}}}

View file

@ -1,16 +0,0 @@
[{<<"lz4">>,
{git,"https://github.com/krestenkrab/erlang-lz4.git",
{ref,"5fd90ca1e2345bdc359ee43d958da87fafb4fd78"}},
0},
{<<"plain_fsm">>,
{git,"https://github.com/gburd/plain_fsm.git",
{ref,"6421158d742956836dfa39fca857422afcf56419"}},
0},
{<<"sext">>,
{git,"https://github.com/uwiger/sext.git",
{ref,"c22486add9cc374dc8138b1f547c0999a1922a65"}},
0},
{<<"triq">>,
{git,"https://github.com/gburd/triq.git",
{ref,"5d4b98e8323eec70aff474a578a1e5ebe9495e70"}},
0}].

View file

@ -1,62 +0,0 @@
{
pkgs ? import <nixpkgs> { },
}:
with pkgs;
mkShell rec {
name = "hanoidb";
packages = with pkgs; [
bashInteractive
gigalixir
inotify-tools
libnotify
];
buildInputs = [
# basics
curl
git
openssh
ripgrep
shellcheck
erlang
erlang-ls
rebar3
erlfmt
# BEAM support
#beam.interpreters.erlangR26
#rebar3
#beam.packages.erlangR26.elixir_1_15
#nodejs-18_x
# elixir-typst support
#pkgs.iconv
# rust support
#cargo
];
shellHook =
let
icon = "f121";
in
''
export PS1="$(echo -e '\u${icon}') {\[$(tput sgr0)\]\[\033[38;5;228m\]\w\[$(tput sgr0)\]\[\033[38;5;15m\]} (${name}) \\$ \[$(tput sgr0)\]"
alias gitc="git -c user.email=greg@burd.me commit --gpg-sign=22931AF7895E82DF ."
# allows mix to work on the local directory
mkdir -p .nix-mix
mkdir -p .nix-hex
export MIX_HOME=$PWD/.nix-mix
export HEX_HOME=$PWD/.nix-hex
export PATH=$MIX_HOME/bin:$PATH
export PATH=$HEX_HOME/bin:$PATH
export LANG=en_US.UTF-8
export ERL_AFLAGS="-kernel shell_history enabled"
'';
}
# NOTES:
# * https://github.com/fbettag/nixos-erlang-flake/blob/main/flake.nix
# * https://discourse.nixos.org/t/installing-different-versions-of-elixir-and-erlang/35765/5
# * https://github.com/fbettag/nixos-erlang-flake/blob/main/flake.nix

View file

@ -0,0 +1,122 @@
%% ----------------------------------------------------------------------------
%%
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
%%
%% Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
%% http://trifork.com/ info@trifork.com
%%
%% Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
%% http://basho.com/ info@basho.com
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% ----------------------------------------------------------------------------
-module(basho_bench_driver_hanoidb).
-record(state, { tree :: hanoidb:hanoidb(),
filename :: string(),
flags :: list(),
sync_interval :: integer() | infinity,
last_sync :: erlang:timestamp() }).
-export([new/1,
run/4]).
-include("hanoidb.hrl").
-include_lib("basho_bench/include/basho_bench.hrl").
-record(key_range, { from_key = <<>> :: binary(),
from_inclusive = true :: boolean(),
to_key :: binary() | undefined,
to_inclusive = false :: boolean(),
limit :: pos_integer() | undefined }).
%% ====================================================================
%% API
%% ====================================================================
new(_Id) ->
%% Make sure bitcask is available
case code:which(hanoidb) of
non_existing ->
?FAIL_MSG("~s requires hanoidb to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
%% Get the target directory
Dir = basho_bench_config:get(hanoidb_dir, "."),
Filename = filename:join(Dir, "test.hanoidb"),
Config = basho_bench_config:get(hanoidb_flags, []),
%% Look for sync interval config
SyncInterval =
case basho_bench_config:get(hanoidb_sync_interval, infinity) of
Value when is_integer(Value) ->
Value;
infinity ->
infinity
end,
%% Get any bitcask flags
case hanoidb:open(Filename, Config) of
{error, Reason} ->
?FAIL_MSG("Failed to open hanoidb in ~s: ~p\n", [Filename, Reason]);
{ok, FBTree} ->
{ok, #state { tree = FBTree,
filename = Filename,
sync_interval = SyncInterval,
last_sync = os:timestamp() }}
end.
run(get, KeyGen, _ValueGen, State) ->
case hanoidb:lookup(State#state.tree, KeyGen()) of
{ok, _Value} ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(put, KeyGen, ValueGen, State) ->
case hanoidb:put(State#state.tree, KeyGen(), ValueGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(delete, KeyGen, _ValueGen, State) ->
case hanoidb:delete(State#state.tree, KeyGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(fold_100, KeyGen, _ValueGen, State) ->
[From,To] = lists:usort([KeyGen(), KeyGen()]),
case hanoidb:fold_range(State#state.tree,
fun(_Key,_Value,Count) ->
Count+1
end,
0,
#key_range{ from_key=From,
to_key=To,
limit=100 }) of
Count when Count >= 0; Count =< 100 ->
{ok,State};
Count ->
{error, {bad_fold_count, Count}}
end.

View file

@ -5,7 +5,7 @@
% author: http://erlang.2086793.n4.nabble.com/gb-trees-fold-td2228614.html
-spec fold(fun((term(), term(), term()) -> term()), term(), gb_trees:tree()) -> term().
-spec fold(fun((term(), term(), term()) -> term()), term(), gb_tree()) -> term().
fold(F, A, {_, T})
when is_function(F, 3) ->
fold_1(F, A, T).

View file

@ -24,13 +24,12 @@
{application, hanoidb,
[
{description, "LSM-tree tiered storage engine with amortized merging"},
{description, ""},
{vsn, "1.3.0"},
{registered, []},
{applications, [
kernel,
stdlib,
plain_fsm
stdlib
]},
{mod, {hanoidb_app, []}},
{env, []}

View file

@ -70,7 +70,6 @@
| {sync_strategy, none | sync | {seconds, pos_integer()}}
| {expiry_secs, non_neg_integer()}
| {spawn_opt, list()}
| {top_level, pos_integer()}
.
%% @doc
@ -210,9 +209,9 @@ receive_fold_range(MRef,PID,Fun,Acc0, Limit) ->
try
{ok, Fun(K,V,Acc0)}
catch
Class:Exception:Stacktrace ->
Class:Exception ->
% TODO ?log("Exception in hanoidb fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
{'EXIT', Class, Exception, Stacktrace}
{'EXIT', Class, Exception, erlang:get_stacktrace()}
end
of
{ok, Acc1} ->
@ -279,26 +278,24 @@ init([Dir, Opts0]) ->
end,
hanoidb_util:ensure_expiry(Opts),
{Top, Nur, Max} =
{Nursery, MaxLevel, TopLevel} =
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel, MinLevel, MaxLevel} = open_levels(Dir, Opts),
{ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MinLevel, MaxLevel, Opts),
{TopLevel, Nursery, MaxLevel};
{ok, TL, ML} = open_levels(Dir, Opts),
{ok, N0} = hanoidb_nursery:recover(Dir, TL, ML, Opts),
{N0, ML, TL};
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
MinLevel = get_opt(top_level, Opts0, ?TOP_LEVEL),
{ok, TopLevel} = hanoidb_level:open(Dir, MinLevel, undefined, Opts, self()),
MaxLevel = MinLevel,
{ok, Nursery} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Opts),
{TopLevel, Nursery, MaxLevel}
{ok, TL} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
ML = ?TOP_LEVEL,
{ok, N0} = hanoidb_nursery:new(Dir, ML, Opts),
{N0, ML, TL}
end,
{ok, #state{ top=Top, dir=Dir, nursery=Nur, opt=Opts, max_level=Max }}.
{ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
open_levels(Dir, Options) ->
{ok, Files} = file:list_dir(Dir),
TopLevel0 = get_opt(top_level, Options, ?TOP_LEVEL),
%% parse file names and find max level
{MinLevel, MaxLevel} =
@ -311,7 +308,7 @@ open_levels(Dir, Options) ->
{MinLevel, MaxLevel}
end
end,
{TopLevel0, TopLevel0},
{?TOP_LEVEL, ?TOP_LEVEL},
Files),
%% remove old nursery data file
@ -326,17 +323,17 @@ open_levels(Dir, Options) ->
{Level, MergeWork}
end,
{undefined, 0},
lists:seq(MaxLevel, MinLevel, -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(MinLevel),
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
% error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
do_merge(TopLevel, WorkPerIter, MaxMerge, MinLevel),
{ok, TopLevel, MinLevel, MaxLevel}.
do_merge(TopLevel, WorkPerIter, MaxMerge),
{ok, TopLevel, MaxLevel}.
do_merge(TopLevel, _Inc, N, _MinLevel) when N =< 0 ->
do_merge(TopLevel, _Inc, N) when N =< 0 ->
ok = hanoidb_level:await_incremental_merge(TopLevel);
do_merge(TopLevel, Inc, N, MinLevel) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel)),
do_merge(TopLevel, Inc, N-Inc, MinLevel).
do_merge(TopLevel, Inc, N) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
do_merge(TopLevel, Inc, N-Inc).
parse_level(FileName) ->
@ -416,8 +413,7 @@ handle_call(close, _From, State=#state{ nursery=undefined }) ->
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
try
ok = hanoidb_nursery:finish(Nursery, Top),
MinLevel = hanoidb_level:level(Top),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
ok = hanoidb_level:close(Top),
{stop, normal, ok, State#state{ nursery=Nursery2 }}
catch
@ -427,10 +423,9 @@ handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_l
end;
handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
TopLevelNumber = hanoidb_level:level(Top),
ok = hanoidb_nursery:destroy(Nursery),
ok = hanoidb_level:destroy(Top),
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=TopLevelNumber }}.
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
-spec do_put(key(), value(), expiry(), #state{}) -> {ok, #state{}}.
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) when Nursery =/= undefined ->
@ -448,12 +443,7 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, State#state{ nursery=Nursery2 }}.
start_app() ->
ok = ensure_started(syntax_tools),
ok = ensure_started(plain_fsm),
ok = ensure_started(?MODULE).
ensure_started(Application) ->
case application:start(Application) of
case application:start(?MODULE) of
ok ->
ok;
{error, {already_started, _}} ->

View file

@ -23,7 +23,7 @@
%% ----------------------------------------------------------------------------
%% smallest levels are 1024 entries
%% smallest levels are 256 entries
-define(TOP_LEVEL, 8).
-define(BTREE_SIZE(Level), (1 bsl (Level))).
-define(FILE_FORMAT, <<"HAN2">>).
@ -48,24 +48,18 @@
-define(KEY_IN_RANGE(Key,Range),
(?KEY_IN_FROM_RANGE(Key,Range) andalso ?KEY_IN_TO_RANGE(Key,Range))).
-ifdef(pre18).
-define(TIMESTAMP, now()).
-else.
-define(TIMESTAMP, erlang:timestamp()).
-endif.
-record(nursery, { log_file :: file:fd(),
dir :: string(),
cache :: gb_trees:tree(binary(), binary()),
cache :: gb_tree(),
total_size=0 :: integer(),
count=0 :: integer(),
last_sync=?TIMESTAMP :: erlang:timestamp(),
min_level :: integer(),
last_sync=now() :: erlang:timestamp(),
max_level :: integer(),
config=[] :: [{atom(), term()}],
step=0 :: integer(),
merge_done=0 :: integer()}).
merge_done=0 :: integer(),
backend = hanoidb_han2_backend :: atom()
}).
-type kventry() :: { key(), expvalue() } | [ kventry() ].
-type key() :: binary().
@ -77,25 +71,3 @@
| value()
| filepos().
-ifdef(USE_EBLOOM).
-define(HANOI_BLOOM_TYPE, ebloom).
-else.
-define(HANOI_BLOOM_TYPE, sbloom).
-endif.
-define(BLOOM_NEW(Size), hanoidb_util:bloom_new(Size, ?HANOI_BLOOM_TYPE)).
-define(BLOOM_TO_BIN(Bloom), hanoidb_util:bloom_to_bin(Bloom)).
-define(BIN_TO_BLOOM(Bin, Fmt), hanoidb_util:bin_to_bloom(Bin, Fmt)).
-define(BLOOM_INSERT(Bloom, Key), hanoidb_util:bloom_insert(Bloom, Key)).
-define(BLOOM_CONTAINS(Bloom, Key), hanoidb_util:bloom_contains(Bloom, Key)).
%% tags used in the on-disk representation
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).

174
src/hanoidb_backend.erl Normal file
View file

@ -0,0 +1,174 @@
-module(hanoidb_backend).
-include("include/hanoidb.hrl").
-include("src/hanoidb.hrl").
-type options() :: [ atom() | { atom(), term() } ].
-type kvexp_entry() :: { Key :: key(), Value :: value(), TimeOut :: expiry() }.
-type batch_reader() :: any().
-type batch_writer() :: any().
-type random_reader() :: any().
-export([merge/7]).
%%%=========================================================================
%%% API
%%%=========================================================================
%% batch_reader and batch_writer are used by the merging logic. A batch_reader
%% must return the values in lexicographical order of the binary keys.
-callback open_batch_reader(File :: string(), Options :: options())
-> {ok, batch_reader()} | { error, term() }.
-callback read_next(batch_reader())
-> { [kvexp_entry(), ...], batch_reader()} | 'done'.
-callback close_batch_reader( batch_reader() )
-> ok | {error, term()}.
-callback open_batch_writer(File :: string(), Options :: options())
-> {ok, batch_writer()} | {error, term()}.
-callback write_next( kvexp_entry() , batch_writer() )
-> {ok, batch_writer()} | {error, term()}.
-callback write_count( batch_writer() ) ->
{ok, non_neg_integer()} | {error, term()}.
-callback close_batch_writer( batch_writer() )
-> ok | {error, term()}.
-callback open_random_reader(File :: string(), Options :: options()) ->
{ok, random_reader()} | {error, term()}.
-callback file_name( random_reader() ) ->
{ok, string()} | {error, term()}.
-callback lookup( Key :: key(), random_reader() ) ->
not_found | {ok, value()}.
-callback range_fold( fun( (key(), value(), term()) -> term() ),
Acc0 :: term(),
Reader :: random_reader(),
Range :: #key_range{} ) ->
{limit, term(), LastKey :: binary()} | {ok, term()}.
-callback close_random_reader(random_reader()) ->
ok | {error, term()}.
-spec merge(atom(), string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
merge(Mod,A,B,C, Size, IsLastLevel, Options) ->
{ok, IXA} = Mod:open_batch_reader(A, Options),
{ok, IXB} = Mod:open_batch_reader(B, Options),
{ok, Out} = Mod:open_batch_writer(C, [{size, Size} | Options]),
scan(Mod,IXA, IXB, Out, IsLastLevel, [], [], {0, none}).
terminate(Mod, Out) ->
{ok, Count} = Mod:write_count( Out ),
ok = Mod:close_batch_writer( Out ),
{ok, Count}.
step(S) ->
step(S, 1).
step({N, From}, Steps) ->
{N-Steps, From}.
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of
none ->
ok;
{PID, Ref} ->
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
end;
scan(Mod, IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
case Mod:read_next(IXA) of
{AKVs, IXA2} ->
scan(Mod, IXA2, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
done ->
ok = Mod:close_batch_reader(IXA),
scan_only(Mod, IXB, Out, IsLastLevel, BKVs, Step)
end;
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
case Mod:read_next(IXB) of
{BKVs, IXB2} ->
scan(Mod, IXA, IXB2, Out, IsLastLevel, AKVs, BKVs, Step);
done ->
ok = Mod:close_batch_reader(IXB),
scan_only(Mod, IXA, Out, IsLastLevel, AKVs, Step)
end;
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}=Entry|AT], [{Key2,_,_}|_]=BKVs, Step)
when Key1 < Key2 ->
case Entry of
{_, ?TOMBSTONE, _} when IsLastLevel ->
scan(Mod, IXA, IXB, Out, true, AT, BKVs, step(Step));
_ ->
{ok, Out3} = Mod:write_next( Entry, Out ),
scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step))
end;
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}|_]=AKVs, [{Key2,_,_}=Entry|BT], Step)
when Key1 > Key2 ->
case Entry of
{_, ?TOMBSTONE, _} when IsLastLevel ->
scan(Mod, IXA, IXB, Out, true, AKVs, BT, step(Step));
_ ->
{ok, Out3} = Mod:write_next( Entry, Out ),
scan(Mod, IXA, IXB, Out3, IsLastLevel, AKVs, BT, step(Step))
end;
scan(Mod, IXA, IXB, Out, IsLastLevel, [_|AT], [Entry|BT], Step) ->
case Entry of
{_, ?TOMBSTONE, _} when IsLastLevel ->
scan(Mod, IXA, IXB, Out, true, AT, BT, step(Step));
_ ->
{ok, Out3} = Mod:write_next( Entry, Out ),
scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BT, step(Step, 2))
end.
scan_only(Mod, IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
ok;
{PID, Ref} ->
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan_only(Mod, IX, Out, IsLastLevel, KVs, {N+HowMany, From})
end;
scan_only(Mod, IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
case Mod:read_next(IX) of
{KVs, IX2} ->
scan_only(Mod, IX2, Out, IsLastLevel, KVs, Step);
done ->
case FromPID of
none ->
ok;
{PID, Ref} ->
PID ! {Ref, step_done}
end,
ok = Mod:close_batch_reader(IX),
terminate(Mod, Out)
end;
scan_only(Mod, IX, Out, true, [{_,?TOMBSTONE,_}|Rest], Step) ->
scan_only(Mod, IX, Out, true, Rest, step(Step));
scan_only(Mod, IX, Out, IsLastLevel, [Entry|Rest], Step) ->
{ok, Out3} = Mod:write_next( Entry, Out ),
scan_only(Mod, IX, Out3, IsLastLevel, Rest, step(Step)).

View file

@ -1,4 +1,4 @@
% The contents of this file are subject to the Erlang Public License, Version
%% The contents of this file are subject to the Erlang Public License, Version
%% 1.1, (the "License"); you may not use this file except in compliance with
%% the License. You should have received a copy of the Erlang Public License
%% along with this software. If not, it can be retrieved via the world wide web
@ -41,11 +41,7 @@
-define(W, 27).
-ifdef(pre18).
-type bitmask() :: array() | any().
-else.
-type bitmask() :: arrays:array() | any().
-endif.
-record(bloom, {
e :: float(), % error probability
@ -208,7 +204,6 @@ bitmask_build(BM) ->
case element(1,BM) of
array -> BM;
sparse_bitmap -> BM;
dense_bitmap -> BM;
dense_bitmap_ets -> hanoidb_dense_bitmap:build(BM)
end.
@ -220,25 +215,16 @@ bitmask_get(I, BM) ->
dense_bitmap -> hanoidb_dense_bitmap:member(I, BM)
end.
-ifdef(pre18).
-spec as_array(bitmask()) -> array().
-else.
-spec as_array(bitmask()) -> arrays:array().
-endif.
as_array(BM) ->
case array:is_array(BM) of
true -> BM
end.
%%%========== Bitarray representation - suitable for sparse arrays ==========
%% bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
-ifdef(pre18).
-spec bitarray_set( non_neg_integer(), array() ) -> array().
-else.
-spec bitarray_set( non_neg_integer(), arrays:array() ) -> arrays:array().
-endif.
bitarray_set(I, A1) ->
A = as_array(A1),
AI = I div ?W,
@ -248,11 +234,7 @@ bitarray_set(I, A1) ->
true -> array:set(AI, V1, A)
end.
-ifdef(pre18).
-spec bitarray_get( non_neg_integer(), array() ) -> boolean().
-else.
-spec bitarray_get( non_neg_integer(), arrays:array() ) -> boolean().
-endif.
bitarray_get(I, A) ->
AI = I div ?W,
V = array:get(AI, A),

View file

@ -79,10 +79,10 @@ start(SendTo) ->
initialize(#state{sendto=SendTo, sendto_ref=MRef}, []),
?log("fold_worker done ~p~n", [self()])
catch
Class:Ex:Stacktrace ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, Stacktrace]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, Stacktrace]),
exit({bad, Class, Ex, Stacktrace})
Class:Ex ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
exit({bad, Class, Ex, erlang:get_stacktrace()})
end
end,
PID = plain_fsm:spawn(?MODULE, F),
@ -102,7 +102,7 @@ initialize(State, PrefixFolders) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, State, fun(S1) -> initialize(S1, PrefixFolders) end);
From, Req, State, fun(S1) -> initialize(S1, PrefixFolders) end);
{'DOWN', MRef, _, _, _} when MRef =:= State#state.sendto_ref ->
ok;
@ -164,7 +164,7 @@ fill_from_inbox(State, Values, Queues, [PID|_]=PIDs, SavePIDs) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end);
From, Req, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end);
{'DOWN', MRef, _, _, _} when MRef =:= State#state.sendto_ref ->
ok;
@ -231,3 +231,5 @@ data_vsn() ->
code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}.

View file

@ -0,0 +1,70 @@
-module(hanoidb_han2_backend).
-include("hanoidb.hrl").
-behavior(hanoidb_backend).
-export([open_random_reader/2, file_name/1, range_fold/4, lookup/2, close_random_reader/1]).
-export([open_batch_reader/2, read_next/1, close_batch_reader/1]).
-export([open_batch_writer/2, write_next/2, write_count/1, close_batch_writer/1]).
open_random_reader(Name, Options) ->
hanoidb_reader:open(Name, [random|Options]).
file_name(Reader) ->
hanoidb_reader:file_name(Reader).
lookup(Key, Reader) ->
hanoidb_reader:lookup(Reader, Key).
range_fold(Fun, Acc, Reader, Range) ->
hanoidb_reader:range_fold(Fun, Acc, Reader, Range).
close_random_reader(Reader) ->
hanoidb_reader:close(Reader).
open_batch_reader(Name, Options) ->
hanoidb_reader:open(Name, [sequential|Options]).
read_next(Reader) ->
case hanoidb_reader:next_node(Reader) of
{node, KVs} ->
{[ unfold(KV) || KV <- KVs], Reader};
end_of_data ->
'done';
{error, _}=Err ->
Err
end.
unfold({Key,{Value, Expiry}}) when is_binary(Value); ?TOMBSTONE =:= Value ->
{Key,Value,Expiry};
unfold({Key,Value}) ->
{Key, Value, infinity}.
close_batch_reader(Reader) ->
hanoidb_reader:close(Reader).
open_batch_writer(Name, Options) ->
hanoidb_writer:init([Name, Options]).
write_next( {Key, Value, infinity}, Writer) ->
{noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, Value}, Writer),
{ok, Writer2};
write_next( {Key, Value, Expiry}, Writer) ->
{noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, {Value, Expiry}}, Writer),
{ok, Writer2}.
write_count( Writer ) ->
case hanoidb_writer:handle_call(count, self(), Writer) of
{ok, Count, _} ->
{ok, Count};
Err ->
{error, Err}
end.
close_batch_writer(Writer) ->
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Writer),
ok.

View file

@ -46,14 +46,29 @@
-export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3,
begin_incremental_merge/2, await_incremental_merge/1, set_max_level/2,
unmerged_count/1, destroy/1, level/1]).
unmerged_count/1, destroy/1]).
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
step_next_ref, step_caller, step_merge_ref,
opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL
a :: undefined | hanoidb_backend:random_reader(),
b :: undefined | hanoidb_backend:random_reader(),
c :: undefined | hanoidb_backend:random_reader(),
next :: pid() | undefined,
dir :: string(),
level :: non_neg_integer(),
inject_done_ref :: reference(),
merge_pid :: pid(),
folding = [] :: list( pid() ),
step_next_ref :: reference() | undefined,
step_caller :: plain_rpc:caller() | undefined,
step_merge_ref :: reference() | undefined,
opts = [] :: list(),
owner :: pid(),
work_in_progress=0 :: non_neg_integer(),
work_done=0 :: non_neg_integer(),
max_level=?TOP_LEVEL :: pos_integer(),
backend = hanoidb_han2_backend :: atom()
}).
@ -90,9 +105,6 @@ open(Dir,Level,Next,Opts,Owner) when Level>0 ->
SpawnOpt),
{ok, PID}.
level(Ref) ->
plain_rpc:call(Ref, level).
lookup(Ref, Key) ->
plain_rpc:call(Ref, {lookup, Key}).
@ -159,9 +171,9 @@ initialize(State) ->
_Result = initialize2(State),
?log(" ** terminated ~p", [_Result])
catch
Class:Ex:Stacktrace when not (Class == exit andalso Ex == normal) ->
?log("crashing ~p:~p ~p~n", [Class,Ex,Stacktrace]),
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,Stacktrace])
Class:Ex when not (Class == exit andalso Ex == normal) ->
?log("crashing ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
end.
initialize2(State) ->
@ -180,6 +192,8 @@ initialize2(State) ->
file:delete(filename("BF",State)),
file:delete(filename("CF",State)),
Mod = State#state.backend,
case file:read_file_info(MFileName) of
{ok, _} ->
@ -191,12 +205,12 @@ initialize2(State) ->
file:delete(BFileName),
ok = file:rename(MFileName, AFileName),
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
case file:read_file_info(CFileName) of
{ok, _} ->
file:rename(CFileName, BFileName),
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
check_begin_merge_then_loop0(init_state(State#state{ a= IXA, b=IXB }));
{error, enoent} ->
@ -206,13 +220,13 @@ initialize2(State) ->
{error, enoent} ->
case file:read_file_info(BFileName) of
{ok, _} ->
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
IXC =
case file:read_file_info(CFileName) of
{ok, _} ->
{ok, C} = hanoidb_reader:open(CFileName, [random|State#state.opts]),
{ok, C} = Mod:open_random_reader(CFileName, [random|State#state.opts]),
C;
{error, enoent} ->
undefined
@ -227,7 +241,7 @@ initialize2(State) ->
case file:read_file_info(AFileName) of
{ok, _} ->
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
main_loop(init_state(State#state{ a=IXA }));
{error, enoent} ->
@ -266,28 +280,28 @@ check_begin_merge_then_loop(State=#state{a=IXA, b=IXB, merge_pid=undefined})
check_begin_merge_then_loop(State) ->
main_loop(State).
main_loop(State = #state{ next=Next }) ->
main_loop(State = #state{ next=Next, backend=Mod }) ->
Parent = plain_fsm:info(parent),
receive
?CALL(From, {lookup, Key})=Req ->
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
not_found ->
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
not_found when Next =:= undefined ->
plain_rpc:send_reply(From, not_found);
{found, Result} ->
plain_rpc:send_reply(From, {ok, Result});
{delegate, DelegatePid} ->
DelegatePid ! Req
not_found ->
Next ! Req
end,
main_loop(State);
?CAST(_From, {lookup, Key, ReplyFun})=Req ->
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
not_found ->
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
not_found when Next =:= undefined ->
ReplyFun(not_found);
{found, Result} ->
ReplyFun({ok, Result});
{delegate, DelegatePid} ->
DelegatePid ! Req
not_found ->
Next ! Req
end,
main_loop(State);
@ -313,7 +327,7 @@ main_loop(State = #state{ next=Next }) ->
plain_rpc:send_reply(From, ok),
case hanoidb_reader:open(ToFileName, [random|State#state.opts]) of
case Mod:open_random_reader(ToFileName, [random|State#state.opts]) of
{ok, BT} ->
if SetPos == #state.b ->
check_begin_merge_then_loop(setelement(SetPos, State, BT));
@ -359,11 +373,6 @@ main_loop(State = #state{ next=Next }) ->
->
do_step(StepFrom, DoneWork, StepSize, State);
%% simply replies the level number
?CALL(From, level) ->
plain_rpc:send_reply(From, State#state.level),
main_loop(State);
{MRef, step_done} when MRef == State#state.step_merge_ref ->
demonitor(MRef, [flush]),
@ -411,13 +420,12 @@ main_loop(State = #state{ next=Next }) ->
%% and we have finished the incremental merge at this level
State2 = reply_step_ok(State),
erlang:demonitor(MRef, [flush]),
main_loop(State2#state{ step_next_ref=undefined });
?CALL(From, close) ->
close_if_defined(State#state.a),
close_if_defined(State#state.b),
close_if_defined(State#state.c),
close_if_defined(Mod, State#state.a),
close_if_defined(Mod, State#state.b),
close_if_defined(Mod, State#state.c),
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
%% this is synchronous all the way down, because our
@ -431,9 +439,9 @@ main_loop(State = #state{ next=Next }) ->
{ok, closing};
?CALL(From, destroy) ->
destroy_if_defined(State#state.a),
destroy_if_defined(State#state.b),
destroy_if_defined(State#state.c),
destroy_if_defined(Mod,State#state.a),
destroy_if_defined(Mod,State#state.b),
destroy_if_defined(Mod,State#state.c),
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
%% this is synchronous all the way down, because our
@ -505,27 +513,27 @@ main_loop(State = #state{ next=Next }) ->
{_, undefined, undefined} ->
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
[ARef|List];
{_, _, undefined} ->
BRef = erlang:make_ref(),
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
[ARef,BRef|List];
{_, _, _} ->
CRef = erlang:make_ref(),
ok = do_range_fold(State#state.c, WorkerPID, CRef, Range),
ok = do_range_fold(Mod,State#state.c, WorkerPID, CRef, Range),
BRef = erlang:make_ref(),
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
[ARef,BRef,CRef|List]
end,
@ -551,9 +559,9 @@ main_loop(State = #state{ next=Next }) ->
undefined ->
main_loop(State2#state{ merge_pid=undefined });
CFile ->
ok = hanoidb_reader:close(CFile),
ok = Mod:close_random_reader(CFile),
ok = file:rename(filename("C", State2), filename("A", State2)),
{ok, AFile} = hanoidb_reader:open(filename("A", State2), [random|State#state.opts]),
{ok, AFile} = Mod:open_random_reader(filename("A", State2), [random|State#state.opts]),
main_loop(State2#state{ a = AFile, c = undefined, merge_pid=undefined })
end;
@ -574,7 +582,7 @@ main_loop(State = #state{ next=Next }) ->
% then, rename M to A, and open it
AFileName = filename("A",State2),
ok = file:rename(MFileName, AFileName),
{ok, AFile} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, AFile} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
% iff there is a C file, then move it to B position
% TODO: consider recovery for this
@ -582,9 +590,9 @@ main_loop(State = #state{ next=Next }) ->
undefined ->
main_loop(State2#state{ a=AFile, b=undefined, merge_pid=undefined });
CFile ->
ok = hanoidb_reader:close(CFile),
ok = Mod:close_random_reader(CFile),
ok = file:rename(filename("C", State2), filename("B", State2)),
{ok, BFile} = hanoidb_reader:open(filename("B", State2), [random|State#state.opts]),
{ok, BFile} = Mod:open_random_reader(filename("B", State2), [random|State#state.opts]),
check_begin_merge_then_loop(State2#state{ a=AFile, b=BFile, c=undefined,
merge_pid=undefined })
end;
@ -641,7 +649,7 @@ main_loop(State = #state{ next=Next }) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, State, fun(S1) -> main_loop(S1) end);
From, Req, State, fun(S1) -> main_loop(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State);
@ -731,9 +739,9 @@ reply_step_ok(State) ->
case State#state.step_caller of
undefined ->
ok;
_ ->
?log("step_ok -> ~p", [State#state.step_caller]),
plain_rpc:send_reply(State#state.step_caller, step_ok)
Caller ->
?log("step_ok -> ~p", [Caller]),
plain_rpc:send_reply(Caller, step_ok)
end,
State#state{ step_caller=undefined }.
@ -747,27 +755,28 @@ total_unmerged(State) ->
do_lookup(_Key, []) ->
do_lookup(_, _Key, []) ->
not_found;
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
{delegate, Pid};
do_lookup(Key, [undefined|Rest]) ->
do_lookup(Key, Rest);
do_lookup(Key, [BT|Rest]) ->
case hanoidb_reader:lookup(BT, Key) of
do_lookup(Mod, Key, [undefined|Rest]) ->
do_lookup(Mod, Key, Rest);
do_lookup(Mod, Key, [BT|Rest]) ->
case Mod:lookup(Key, BT) of
{ok, ?TOMBSTONE} ->
not_found;
{ok, Result} ->
{found, Result};
not_found ->
do_lookup(Key, Rest)
do_lookup(Mod,Key, Rest)
end.
close_if_defined(undefined) -> ok;
close_if_defined(BT) -> hanoidb_reader:close(BT).
close_if_defined(_, undefined) -> ok;
close_if_defined(Mod, BT) -> Mod:close_random_reader(BT).
destroy_if_defined(undefined) -> ok;
destroy_if_defined(BT) -> hanoidb_reader:destroy(BT).
destroy_if_defined(_, undefined) -> ok;
destroy_if_defined(Mod, BT) ->
{ok, Name} = Mod:file_name(BT),
Mod:close_random_reader(BT),
file:delete(Name).
stop_if_defined(undefined) -> ok;
stop_if_defined(MergePid) when is_pid(MergePid) ->
@ -784,26 +793,42 @@ begin_merge(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
XFileName = filename("X",State),
Owner = self(),
?log("starting merge~n", []),
file:delete(XFileName),
MergePID = hanoidb_merger:start(AFileName, BFileName, XFileName,
MergePID = proc_lib:spawn_link(fun() ->
try
?log("merge begun~n", []),
{ok, OutCount} = hanoidb_backend:merge(
State#state.backend,
AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts),
State#state.opts ),
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch
C:E ->
error_logger:error_msg("merge failed ~p:~p ~p~n",
[C,E,erlang:get_stacktrace()]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end),
{ok, MergePID}.
close_and_delete_a_and_b(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
Mod = State#state.backend,
ok = hanoidb_reader:close(State#state.a),
ok = hanoidb_reader:close(State#state.b),
ok = Mod:close_random_reader(State#state.a),
ok = Mod:close_random_reader(State#state.b),
ok = file:delete(AFileName),
ok = file:delete(BFileName),
@ -817,32 +842,33 @@ filename(PFX, State) ->
start_range_fold(FileName, WorkerPID, Range, State) ->
Owner = self(),
Mod = State#state.backend,
PID = proc_lib:spawn( fun() ->
try
?log("start_range_fold ~p on ~p -> ~p", [self(), FileName, WorkerPID]),
erlang:link(WorkerPID),
{ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
do_range_fold2(File, WorkerPID, self(), Range),
{ok, File} = Mod:open_random_reader(FileName, [folding|State#state.opts]),
do_range_fold2(Mod,File, WorkerPID, self(), Range),
erlang:unlink(WorkerPID),
hanoidb_reader:close(File),
Mod:close_random_reader(File),
%% this will release the pinning of the fold file
Owner ! {range_fold_done, self(), FileName},
ok
catch
Class:Ex ->
try throw(42) catch _:_:Stk -> io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,Stk]) end
io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
end
end ),
{ok, PID}.
-spec do_range_fold(BT :: hanoidb_reader:read_file(),
-spec do_range_fold(Mod :: atom(),
BT :: hanoidb_backend:random_reader(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #key_range{} ) -> ok.
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
case hanoidb_reader:range_fold(fun(Key,Value,_) ->
do_range_fold(Mod, BT, WorkerPID, SelfOrRef, Range) ->
case Mod:range_fold(fun(Key,Value,_) ->
WorkerPID ! {level_result, SelfOrRef, Key, Value},
ok
end,
@ -860,12 +886,13 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
-define(FOLD_CHUNK_SIZE, 100).
-spec do_range_fold2(BT :: hanoidb_reader:read_file(),
-spec do_range_fold2(Mod :: atom(),
BT :: hanoidb_reader:read_file(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #key_range{} ) -> ok.
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) ->
do_range_fold2(Mod, BT, WorkerPID, SelfOrRef, Range) ->
try Mod:range_fold(fun(Key,Value,{0,KVs}) ->
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
{?FOLD_CHUNK_SIZE-1, []};
(Key,Value,{N,KVs}) ->

View file

@ -1,35 +0,0 @@
-module(hanoidb_load).
-export([run/1]).
run(Dir) ->
case hanoidb:open(filename:join(Dir, "test.hanoidb"), []) of
{error, Reason} ->
{error, Reason};
{ok, Tree} -> fill_db_timed(Tree, 15 * 60 * 1000)
end.
%% fill_db(Tree) -> fill_db(Tree, 5000).
%% fill_db(Tree, 0) -> hanoidb:close(Tree);
%% fill_db(Tree, N) ->
%% Letter = N rem 26 + $a,
%% Length = rand:uniform(100),
%% Key = << <<Letter/utf8>> || _ <- lists:seq(0, Length) >>,
%% ok = hanoidb:put(Tree, Key, Key),
%% fill_db(Tree, N - 1).
fill_db_timed(Tree, Timeout) ->
erlang:send_after(Timeout, self(), stop),
fill_db_loop(Tree, 0).
fill_db_loop(Tree, N) ->
receive
stop ->
ok
after 0 ->
Key = crypto:strong_rand_bytes(2000),
Letter = N rem 26 + $a,
Length = rand:uniform(100),
Value = << <<Letter/utf8>> || _ <- lists:seq(0, Length) >>,
ok = hanoidb:put(Tree, Key, Value),
fill_db_loop(Tree, N+1)
end.

View file

@ -28,10 +28,9 @@
%% @doc Merging two Indexes
-export([start/6, merge/6]).
-export([merge/6]).
-include("hanoidb.hrl").
-include("include/plain_rpc.hrl").
%% A merger which is inactive for this long will sleep which means that it will
%% close open files, and compress the current bloom filter.
@ -41,27 +40,6 @@
%% merges, so we default to running the entire merge in one process.
-define(LOCAL_WRITER, true).
-spec start(string(), string(), string(), integer(), boolean(), list()) -> pid().
start(A,B,X, Size, IsLastLevel, Options) ->
Owner = self(),
plain_fsm:spawn_link(?MODULE, fun() ->
try
{ok, OutCount} = hanoidb_merger:merge(A, B, X,
Size,
IsLastLevel,
Options),
Owner ! ?CAST(self(),{merge_done, OutCount, X})
catch
C:E:Stacktrace ->
%% this semi-bogus code makes sure we always get a stack trace if merging fails
error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n",
[self(), C,E,Stacktrace, X]),
erlang:raise(C,E,Stacktrace)
end
end).
-spec merge(string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, IXA} = hanoidb_reader:open(A, [sequential|Options]),
@ -69,19 +47,19 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, Out} = hanoidb_writer:init([C, [{size, Size} | Options]]),
AKVs =
case hanoidb_reader:first_node(IXA) of
{kvlist, AKV} -> AKV;
{node, AKV} -> AKV;
none -> []
end,
BKVs =
case hanoidb_reader:first_node(IXB) of
{kvlist, BKV} ->BKV;
{node, BKV} ->BKV;
none -> []
end,
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}).
terminate(Out) ->
{ok, Count, Out1} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _Out2} = hanoidb_writer:handle_call(close, self(), Out1),
{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out),
{ok, Count}.
step(S) ->
@ -94,77 +72,13 @@ hibernate_scan(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(Keep),
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
scan(hanoidb_reader:deserialize(IXA),
hanoidb_reader:deserialize(IXB),
hanoidb_writer:deserialize(Out),
IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
IsLastLevel, AKVs, BKVs, {N+HowMany, From})
end.
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(Keep),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan_only/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
end.
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) ->
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}},
fun({IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2}}) ->
receive_scan(IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan(Keep)
end.
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of
none ->
@ -173,11 +87,20 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/=
PID ! {Ref, step_done}
end,
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID});
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan(Keep)
end;
scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
case hanoidb_reader:next_node(IXA) of
{kvlist, AKVs} ->
{node, AKVs} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(IXA),
@ -186,7 +109,7 @@ scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
case hanoidb_reader:next_node(IXB) of
{kvlist, BKVs} ->
{node, BKVs} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(IXB),
@ -205,37 +128,17 @@ scan(IXA, IXB, Out, IsLastLevel, [{_Key1,_Value1}|AT]=_AKVs, [{Key2,Value2}|IX]=
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
scan(IXA, IXB, Out3, IsLastLevel, AT, IX, step(Step, 2)).
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) ->
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IX, Out, IsLastLevel, KVs, {N, FromPID}},
fun({IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2}}) ->
receive_scan_only(IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IX, Out, IsLastLevel, KVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan_only(Keep)
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From})
end.
scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
@ -244,11 +147,19 @@ scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
PID ! {Ref, step_done}
end,
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID});
receive
{step, From, HowMany} ->
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan_only(Keep)
end;
scan_only(IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
case hanoidb_reader:next_node(IX) of
{kvlist, KVs} ->
{node, KVs} ->
scan_only(IX, Out, IsLastLevel, KVs, Step);
end_of_data ->
case FromPID of

View file

@ -25,46 +25,46 @@
-module(hanoidb_nursery).
-author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/4, recover/5, finish/2, lookup/2, add/4, add/5]).
-export([new/3, recover/4, finish/2, lookup/2, add/4, add/5]).
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
-include("include/hanoidb.hrl").
-include("hanoidb.hrl").
-include_lib("kernel/include/file.hrl").
-spec new(string(), integer(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
%% do incremental merge every this many inserts
%% this value *must* be less than or equal to
%% 2^TOP_LEVEL == ?BTREE_SIZE(?TOP_LEVEL)
-define(INC_MERGE_STEP, ?BTREE_SIZE(MinLevel) div 2).
-define(INC_MERGE_STEP, ?BTREE_SIZE(?TOP_LEVEL)/2).
new(Directory, MinLevel, MaxLevel, Config) ->
new(Directory, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
{ok, File} = file:open(?LOGFILENAME(Directory),
[raw, exclusive, write, delayed_write, append]),
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
min_level=MinLevel, max_level=MaxLevel, config=Config }}.
max_level=MaxLevel, config=Config }}.
recover(Directory, TopLevel, MinLevel, MaxLevel, Config)
when MinLevel =< MaxLevel, is_integer(MinLevel), is_integer(MaxLevel) ->
recover(Directory, TopLevel, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
case file:read_file_info(?LOGFILENAME(Directory)) of
{ok, _} ->
ok = do_recover(Directory, TopLevel, MinLevel, MaxLevel, Config),
new(Directory, MinLevel, MaxLevel, Config);
ok = do_recover(Directory, TopLevel, MaxLevel, Config),
new(Directory, MaxLevel, Config);
{error, enoent} ->
new(Directory, MinLevel, MaxLevel, Config)
new(Directory, MaxLevel, Config)
end.
do_recover(Directory, TopLevel, MinLevel, MaxLevel, Config) ->
do_recover(Directory, TopLevel, MaxLevel, Config) ->
%% repair the log file; storing it in nursery2
LogFileName = ?LOGFILENAME(Directory),
{ok, Nursery} = read_nursery_from_log(Directory, MinLevel, MaxLevel, Config),
{ok, Nursery} = read_nursery_from_log(Directory, MaxLevel, Config),
ok = finish(Nursery, TopLevel),
%% assert log file is gone
{error, enoent} = file:read_file_info(LogFileName),
@ -82,7 +82,7 @@ fill_cache(Transactions, Cache)
when is_list(Transactions) ->
lists:foldl(fun fill_cache/2, Cache, Transactions).
read_nursery_from_log(Directory, MinLevel, MaxLevel, Config) ->
read_nursery_from_log(Directory, MaxLevel, Config) ->
{ok, LogBinary} = file:read_file(?LOGFILENAME(Directory)),
Cache =
case hanoidb_util:decode_crc_data(LogBinary, [], []) of
@ -92,14 +92,14 @@ read_nursery_from_log(Directory, MinLevel, MaxLevel, Config) ->
error_logger:info_msg("ignoring undecypherable bytes in ~p~n", [?LOGFILENAME(Directory)]),
fill_cache(KVs, gb_trees:empty())
end,
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), min_level=MinLevel, max_level=MaxLevel, config=Config }}.
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
%% @doc Add a Key/Value to the nursery
%% @end
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, non_neg_integer() | infinity, pid()) -> {ok, #nursery{}} | {full, #nursery{}}.
do_add(Nursery, Key, Value, infinity, Top) ->
do_add(Nursery, Key, Value, 0, Top);
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) ->
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) when is_integer(KeyExpiryTime) ->
DatabaseExpiryTime = hanoidb:get_opt(expiry_secs, Config),
{Data, Cache2} =
@ -142,12 +142,12 @@ do_sync(File, Nursery) ->
case application:get_env(hanoidb, sync_strategy) of
{ok, sync} ->
file:datasync(File),
os:timestamp();
now();
{ok, {seconds, N}} ->
MicrosSinceLastSync = timer:now_diff(os:timestamp(), Nursery#nursery.last_sync),
if (MicrosSinceLastSync div 1000000) >= N ->
MicrosSinceLastSync = timer:now_diff(now(), Nursery#nursery.last_sync),
if (MicrosSinceLastSync / 1000000) >= N ->
file:datasync(File),
os:timestamp();
now();
true ->
Nursery#nursery.last_sync
end;
@ -175,7 +175,7 @@ lookup(Key, #nursery{cache=Cache}) ->
%% @end
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
count=Count, config=Config, min_level=MinLevel }, TopLevel) ->
count=Count, config=Config, backend=Backend }, TopLevel) ->
hanoidb_util:ensure_expiry(Config),
@ -189,26 +189,27 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
N when N > 0 ->
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(MinLevel)},
{ok, BT} = Backend:open_batch_writer(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
{compress, none} | Config]),
try
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
ok = hanoidb_writer:add(BT, Key, Value),
Acc
end, ok, Cache)
after
ok = hanoidb_writer:close(BT)
end,
BT3 = gb_trees_ext:fold(fun(Key, {Value,Expiry}, BT1) ->
{ok, BT2} = Backend:write_next({Key,Value,Expiry}, BT1),
BT2;
(Key, Value, BT1) ->
{ok, BT2} = Backend:write_next({Key,Value,infinity}, BT1),
BT2
end, BT, Cache),
ok = Backend:close_batch_writer(BT3),
%% Inject the B-Tree (blocking RPC)
ok = hanoidb_level:inject(TopLevel, BTreeFileName),
%% Issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished).
if DoneMerge >= ?BTREE_SIZE(MinLevel) ->
if DoneMerge >= ?BTREE_SIZE(?TOP_LEVEL) ->
ok;
true ->
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel) - DoneMerge)
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL) - DoneMerge)
end;
% {ok, _Nursery2} = do_inc_merge(Nursery, Count, TopLevel);
@ -247,13 +248,13 @@ add(Key, Value, Expiry, Nursery, Top) ->
end.
-spec flush(#nursery{}, pid()) -> {ok, #nursery{}}.
flush(Nursery=#nursery{ dir=Dir, min_level=MinLevel, max_level=MaxLevel, config=Config }, Top) ->
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel, config=Config }, Top) ->
ok = finish(Nursery, Top),
{error, enoent} = file:read_file_info(filename:join(Dir, "nursery.log")),
hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config).
hanoidb_nursery:new(Dir, MaxLevel, Config).
has_room(#nursery{ count=Count, min_level=MinLevel }, N) ->
(Count + N + 1) < ?BTREE_SIZE(MinLevel).
has_room(#nursery{ count=Count }, N) ->
(Count + N + 1) < ?BTREE_SIZE(?TOP_LEVEL).
ensure_space(Nursery, NeededRoom, Top) ->
case has_room(Nursery, NeededRoom) of
@ -303,7 +304,7 @@ transact1(Spec, Nursery1=#nursery{ log_file=File, cache=Cache0, total_size=Total
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count }, length(Spec), Top).
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done, min_level=MinLevel }, N, TopLevel) ->
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
if Step+N >= ?INC_MERGE_STEP ->
hanoidb_level:begin_incremental_merge(TopLevel, Step + N),
{ok, Nursery#nursery{ step=0, merge_done=Done + Step + N }};

View file

@ -33,33 +33,35 @@
-define(ASSERT_WHEN(X), when X).
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
-export([first_node/1,next_node/1]).
-export([first_node/1,next_node/1,file_name/1]).
-export([serialize/1, deserialize/1]).
-record(node, {level :: non_neg_integer(),
members=[] :: list(any()) | binary() }).
members=[] :: list(any()) }).
-record(index, {file :: file:io_device(),
root= none :: #node{} | none,
root :: #node{} | none,
bloom :: term(),
name :: string(),
config=[] :: term() }).
-type read_file() :: #index{}.
-export_type([read_file/0]).
-spec open(Name::string()) -> {ok, read_file()} | {error, any()}.
open(Name) ->
open(Name, [random]).
-type config() :: [sequential | folding | random | {atom(), term()}].
-spec open(Name::string(), config()) -> {ok, read_file()} | {error, any()}.
open(Name, Config) ->
case proplists:get_bool(sequential, Config) of
true ->
ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of
{ok, File} ->
{ok, Pos} = file:position(File, ?FIRST_BLOCK_POS),
{ok, #index{file=File, name=Name, config=Config}};
{error, _}=Err ->
Err
@ -84,7 +86,7 @@ open(Name, Config) ->
{ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8),
{ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4),
{ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize),
{ok, Bloom} = hanoidb_util:bin_to_bloom(BloomData),
Bloom = hanoidb_bloom:decode(BloomData),
%% read in the root node
Root =
@ -98,6 +100,9 @@ open(Name, Config) ->
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
end.
file_name(#index{name=Name}) ->
{ok, Name}.
destroy(#index{file=File, name=Name}) ->
ok = file:close(File),
file:delete(Name).
@ -114,15 +119,11 @@ deserialize({seq_read_file, Index, Position}) ->
fold(Fun, Acc0, #index{file=File}) ->
{ok, Node} = read_node(File,?FIRST_BLOCK_POS),
fold0(File,fun({K,V},Acc) -> Fun(K,V,Acc) end,Node,Acc0).
fold0(File,Fun,#node{level=0, members=BinPage},Acc0) when is_binary(BinPage) ->
Acc1 = vbisect:foldl(fun(K, V, Acc2) -> Fun({K, decode_binary_value(V)}, Acc2) end,Acc0,BinPage),
fold1(File,Fun,Acc1);
fold0(File,Fun,#node{level=0, members=List},Acc0) when is_list(List) ->
fold0(File,Fun,#node{level=0, members=List},Acc0) ->
Acc1 = lists:foldl(Fun,Acc0,List),
fold1(File,Fun,Acc1);
fold0(File,Fun,_InnerNode,Acc0) ->
@ -133,42 +134,27 @@ fold1(File,Fun,Acc0) ->
eof ->
Acc0;
{ok, Node} ->
fold0(File,Fun,Node,Acc0)
fold0(File,Fun,Node,Acc0);
{error,_}=Err ->
exit(Err)
end.
-spec range_fold(fun((binary(),binary(),any()) -> any()), any(), #index{}, #key_range{}) ->
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
{limit, any(), binary()} | {done, any()}.
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
case Range#key_range.from_key =< first_key(Root) of
true ->
{ok, _} = file:position(File, ?FIRST_BLOCK_POS),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
false ->
case find_leaf_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
{ok, {Pos,_}} ->
{ok, _} = file:position(File, Pos),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
{ok, Pos} ->
{ok, _} = file:position(File, Pos),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
none ->
{done, Acc0}
end
end.
first_key(#node{members=Dict}) ->
{_,FirstKey} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, Dict),
FirstKey.
fold_until_stop(Fun,Acc,List) when is_list(List) ->
fold_until_stop2(Fun, {continue, Acc}, List);
fold_until_stop(Fun,Acc0,Bin) when is_binary(Bin) ->
vbisect:fold_until_stop(fun({Key,VBin},Acc1) ->
% io:format("-> DOING ~p,~p~n", [Key,Acc1]),
Fun({Key, decode_binary_value(VBin)}, Acc1)
end,
Acc0,
Bin).
fold_until_stop(Fun,Acc,List) ->
fold_until_stop2(Fun, {continue, Acc}, List).
fold_until_stop2(_Fun,{stop,Result},_) ->
{stopped, Result};
@ -190,8 +176,7 @@ get_value({Value, _TStamp}) ->
get_value(Value) ->
Value.
range_fold_from_here(Fun, Acc0, File, Range, undefined) ->
% io:format("RANGE_FOLD_FROM_HERE(~p,~p)~n", [Acc0,File]),
do_range_fold(Fun, Acc0, File, Range, undefined) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@ -206,19 +191,21 @@ range_fold_from_here(Fun, Acc0, File, Range, undefined) ->
false ->
{continue, Fun(Key, get_value(Value), Acc)}
end;
(_Huh, Acc) ->
% io:format("SKIPPING ~p~n", [_Huh]),
(_, Acc) ->
{continue, Acc}
end,
Acc0,
Members) of
{stopped, Result} -> Result;
{ok, Acc1} ->
range_fold_from_here(Fun, Acc1, File, Range, undefined)
end
do_range_fold(Fun, Acc1, File, Range, undefined)
end;
range_fold_from_here(Fun, Acc0, File, Range, N0) ->
{error,_}=Err ->
Err
end;
do_range_fold(Fun, Acc0, File, Range, N0) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@ -231,7 +218,7 @@ range_fold_from_here(Fun, Acc0, File, Range, N0) ->
({Key,?TOMBSTONE}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
{continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}};
({Key,{?TOMBSTONE,TStamp}}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
case hanoidb_util:has_expired(TStamp) of
case hanoidb_utils:has_expired(TStamp) of
true ->
{continue, {N1,Acc}};
false ->
@ -248,73 +235,57 @@ range_fold_from_here(Fun, Acc0, File, Range, N0) ->
{continue, Acc}
end,
{N0, Acc0},
Members)
of
{stopped, Result} ->
Result;
Members) of
{stopped, Result} -> Result;
{ok, {N2, Acc1}} ->
range_fold_from_here(Fun, Acc1, File, Range, N2)
end
do_range_fold(Fun, Acc1, File, Range, N2)
end;
{error,_}=Err ->
Err
end.
find_leaf_node(_File,_FromKey,#node{level=0},Pos) ->
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
{ok, Pos};
find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_list(Members) ->
lookup_node(File,FromKey,#node{members=Members,level=N},_) ->
case find_start(FromKey, Members) of
{ok, ChildPos} when N==1 ->
{ok, ChildPos};
{ok, ChildPos} ->
recursive_find(File, FromKey, N, ChildPos);
case read_node(File,ChildPos) of
{ok, ChildNode} ->
lookup_node(File,FromKey,ChildNode,ChildPos);
eof ->
none
end;
not_found ->
none
end;
find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_binary(Members) ->
case vbisect:find_geq(FromKey,Members) of
{ok, _, <<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned>>} ->
% io:format("** FIND_LEAF_NODE(~p,~p) -> {~p,~p}~n", [FromKey, N, Pos,Len]),
recursive_find(File, FromKey, N, {Pos,Len});
none ->
% io:format("** FIND_LEAF_NODE(~p,~p) -> none~n", [FromKey, N]),
none
end;
find_leaf_node(_,_,none,_) ->
lookup_node(_,_,none,_) ->
none.
recursive_find(_File,_FromKey,1,ChildPos) ->
{ok, ChildPos};
recursive_find(File,FromKey,N,ChildPos) when N>1 ->
case read_node(File,ChildPos) of
{ok, ChildNode} ->
find_leaf_node(File, FromKey,ChildNode,ChildPos);
eof ->
none
end.
%% used by the merger, needs list value
first_node(#index{file=File}) ->
case read_node(File, ?FIRST_BLOCK_POS) of
{ok, #node{level=0, members=Members}} ->
{kvlist, decode_member_list(Members)};
{node, Members};
eof->
none
end.
%% used by the merger, needs list value
next_node(#index{file=File}=_Index) ->
case next_leaf_node(File) of
{ok, #node{level=0, members=Members}} ->
{kvlist, decode_member_list(Members)};
{node, Members};
% {ok, #node{level=N}} when N>0 ->
% next_node(Index);
eof ->
end_of_data
end.
end_of_data;
decode_member_list(List) when is_list(List) ->
List;
decode_member_list(BinDict) when is_binary(BinDict) ->
vbisect:foldr( fun(Key,Value,Acc) ->
[{Key, decode_binary_value(Value) }|Acc]
end,
[],
BinDict).
{error,_}=Err ->
Err
end.
close(#index{file=undefined}) ->
ok;
@ -323,7 +294,7 @@ close(#index{file=File}) ->
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
case ?BLOOM_CONTAINS(Bloom, Key) of
case hanoidb_bloom:member(Key, Bloom) of
true ->
case lookup_in_node(File, Node, Key) of
not_found ->
@ -341,20 +312,11 @@ lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
end.
lookup_in_node(_File,#node{level=0,members=Members}, Key) ->
find_in_leaf(Key,Members);
lookup_in_node(File,#node{members=Members},Key) when is_binary(Members) ->
case vbisect:find_geq(Key,Members) of
{ok, _Key, <<?TAG_POSLEN32, Pos:64, Size:32>>} ->
% io:format("FOUND ~p @ ~p~n", [_Key, {Pos,Size}]),
case read_node(File,{Pos,Size}) of
{ok, Node} ->
lookup_in_node(File, Node, Key);
eof ->
not_found
end;
none ->
not_found
case lists:keyfind(Key,1,Members) of
false ->
not_found;
{_,Value} ->
{ok, Value}
end;
lookup_in_node(File,#node{members=Members},Key) ->
@ -376,8 +338,8 @@ lookup_in_node(File,#node{members=Members},Key) ->
end),
try plain_rpc:call(PID, read)
catch
Class:Ex:Stacktrace ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,Stacktrace]),
Class:Ex ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
not_found
end;
@ -466,32 +428,8 @@ next_leaf_node(File) ->
hanoidb_util:decode_index_node(0, Data);
{ok, <<Len:32/unsigned, _:16/unsigned>>} ->
{ok, _} = file:position(File, {cur,Len-2}),
next_leaf_node(File)
next_leaf_node(File);
{error,_}=Err ->
Err
end.
find_in_leaf(Key,Bin) when is_binary(Bin) ->
case vbisect:find(Key,Bin) of
{ok, BinValue} ->
{ok, decode_binary_value(BinValue)};
error ->
not_found
end;
find_in_leaf(Key,List) when is_list(List) ->
case lists:keyfind(Key, 1, List) of
{_, Value} ->
{ok, Value};
false ->
not_found
end.
decode_binary_value(<<?TAG_KV_DATA, Value/binary>>) ->
Value;
decode_binary_value(<<?TAG_KV_DATA2, TStamp:32, Value/binary>>) ->
{Value, TStamp};
decode_binary_value(<<?TAG_DELETED>>) ->
?TOMBSTONE;
decode_binary_value(<<?TAG_DELETED2, TStamp:32>>) ->
{?TOMBSTONE, TStamp};
decode_binary_value(<<?TAG_POSLEN32, Pos:64, Len:32>>) ->
{Pos, Len}.

View file

@ -38,25 +38,20 @@
, tstamp/0
, expiry_time/1
, has_expired/1
, ensure_expiry/1
, bloom_type/1
, bloom_new/2
, bloom_to_bin/1
, bin_to_bloom/1
, bin_to_bloom/2
, bloom_insert/2
, bloom_contains/2
]).
, ensure_expiry/1 ]).
-include("src/hanoidb.hrl").
-define(ERLANG_ENCODED, 131).
-define(CRC_ENCODED, 127).
-define(BISECT_ENCODED, 126).
-define(FILE_ENCODING, hanoi2).
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).
-compile({inline, [crc_encapsulate/1, crc_encapsulate_kv_entry/2 ]}).
@ -139,47 +134,18 @@ uncompress(<<?GZIP_COMPRESSION, Data/binary>>) ->
zlib:gunzip(Data).
encode_index_node(KVList, Method) ->
TermData =
case ?FILE_ENCODING of
bisect ->
Binary = vbisect:from_orddict(lists:map(fun binary_encode_kv/1, KVList)),
CRC = erlang:crc32(Binary),
[?BISECT_ENCODED, <<CRC:32>>, Binary];
hanoi2 ->
[ ?TAG_END |
TermData = [ ?TAG_END |
lists:map(fun ({Key,Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
KVList) ]
end,
KVList) ],
{MethodName, OutData} = compress(Method, TermData),
{ok, [MethodName | OutData]}.
decode_index_node(Level, Data) ->
TermData = uncompress(Data),
case decode_kv_list(TermData) of
{ok, KVList} ->
{ok, {node, Level, KVList}};
{bisect, Binary} ->
% io:format("[page level=~p~n", [Level]),
% vbisect:foldl(fun(K,V,_) -> io:format(" ~p -> ~p,~n", [K,V]) end, 0, Binary),
% io:format("]~n",[]),
{ok, {node, Level, Binary}}
end.
binary_encode_kv({Key, {Value,infinity}}) ->
binary_encode_kv({Key,Value});
binary_encode_kv({Key, {?TOMBSTONE, TStamp}}) ->
{Key, <<?TAG_DELETED2, TStamp:32>>};
binary_encode_kv({Key, ?TOMBSTONE}) ->
{Key, <<?TAG_DELETED>>};
binary_encode_kv({Key, {Value, TStamp}}) when is_binary(Value) ->
{Key, <<?TAG_KV_DATA2, TStamp:32, Value/binary>>};
binary_encode_kv({Key, Value}) when is_binary(Value)->
{Key, <<?TAG_KV_DATA, Value/binary>>};
binary_encode_kv({Key, {Pos, Len}}) when Len < 16#ffffffff ->
{Key, <<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned>>}.
{ok, KVList} = decode_kv_list(TermData),
{ok, {node, Level, KVList}}.
-spec crc_encapsulate_kv_entry(binary(), expvalue()) -> iolist().
@ -218,14 +184,7 @@ decode_kv_list(<<?TAG_END, Custom/binary>>) ->
decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
{ok, erlang:term_to_binary(TermData)};
decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
decode_crc_data(Custom, [], []);
decode_kv_list(<<?BISECT_ENCODED, CRC:32/unsigned, Binary/binary>>) ->
CRCTest = erlang:crc32( Binary ),
if CRC == CRCTest ->
{bisect, Binary};
true ->
{bisect, vbisect:from_orddict([])}
end.
decode_crc_data(Custom, [], []).
-spec decode_crc_data(binary(), list(), list()) -> {ok, [kventry()]} | {partial, [kventry()], iolist()}.
decode_crc_data(<<>>, [], Acc) ->
@ -298,50 +257,12 @@ ensure_expiry(Opts) ->
undefined ->
try exit(err)
catch
exit:err:Stacktrace ->
io:format(user, "~p~n", [Stacktrace])
exit:err ->
io:format(user, "~p~n", [erlang:get_stacktrace()])
end,
exit(expiry_secs_not_set);
N when N >= 0 ->
ok
end.
bloom_type({ebloom, _}) ->
ebloom;
bloom_type({sbloom, _}) ->
sbloom.
bloom_new(Size, sbloom) ->
{ok, {sbloom, hanoidb_bloom:bloom(Size, 0.01)}};
bloom_new(Size, ebloom) ->
{ok, Bloom} = ebloom:new(Size, 0.01, Size),
{ok, {ebloom, Bloom}}.
bloom_to_bin({sbloom, Bloom}) ->
hanoidb_bloom:encode(Bloom);
bloom_to_bin({ebloom, Bloom}) ->
ebloom:serialize(Bloom).
bin_to_bloom(GZiped = <<16#1F, 16#8B, _/binary>>) ->
bin_to_bloom(GZiped, sbloom);
bin_to_bloom(TermBin = <<131, _/binary>>) ->
erlang:term_to_binary(TermBin);
bin_to_bloom(Blob) ->
bin_to_bloom(Blob, ebloom).
bin_to_bloom(Binary, sbloom) ->
{ok, {sbloom, hanoidb_bloom:decode(Binary)}};
bin_to_bloom(Binary, ebloom) ->
{ok, Bloom} = ebloom:deserialize(Binary),
{ok, {ebloom, Bloom}}.
bloom_insert({sbloom, Bloom}, Key) ->
{ok, {sbloom, hanoidb_bloom:add(Key, Bloom)}};
bloom_insert({ebloom, Bloom}, Key) ->
ok = ebloom:insert(Bloom, Key),
{ok, {ebloom, Bloom}}.
bloom_contains({sbloom, Bloom}, Key) ->
hanoidb_bloom:member(Key, Bloom);
bloom_contains({ebloom, Bloom}, Key) ->
ebloom:contains(Bloom, Key).

View file

@ -55,9 +55,9 @@
name :: string(),
bloom :: {ebloom, term()} | {sbloom, term()},
bloom :: term(),
block_size = ?NODE_SIZE :: integer(),
compress = none :: none | snappy | gzip | lz4,
compress = none :: none | snappy | gzip, % | lz4,
opts = [] :: list(any()),
value_count = 0 :: integer(),
@ -94,7 +94,7 @@ init([Name, Options]) ->
case do_open(Name, Options, [exclusive]) of
{ok, IdxFile} ->
ok = file:write(IdxFile, ?FILE_FORMAT),
{ok, Bloom} = ?BLOOM_NEW(Size),
Bloom = hanoidb_bloom:bloom(Size),
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
{ok, #state{ name=Name,
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
@ -170,11 +170,11 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State)
exit({bad_position, Position, WrongPosition})
end,
ok = file:close(File),
erlang:term_to_binary( { State#state{ index_file=undefined, bloom=undefined }, ?BLOOM_TO_BIN(Bloom), hanoidb_util:bloom_type(Bloom) } ).
erlang:term_to_binary( { State#state{ index_file=undefined }, hanoidb_bloom:encode(Bloom) } ).
deserialize(Binary) ->
{State, Bin, Type} = erlang:binary_to_term(Binary),
{ok, Bloom} = ?BIN_TO_BLOOM(Bin, Type),
{State, Bin} = erlang:binary_to_term(Binary),
Bloom = hanoidb_bloom:decode(Bin),
{ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
State#state{ bloom=Bloom, index_file=IdxFile }.
@ -188,8 +188,7 @@ do_open(Name, Options, OpenOpts) ->
%% @doc flush pending nodes and write trailer
archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) ->
BloomBin = ?BLOOM_TO_BIN(Bloom),
true = is_binary(BloomBin),
BloomBin = hanoidb_bloom:encode(Bloom),
BloomSize = byte_size(BloomBin),
RootPos =
case LastNodePos of
@ -200,12 +199,12 @@ archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastN
_ ->
LastNodePos
end,
Trailer = [ << 0:32/unsigned>> , BloomBin, << BloomSize:32/unsigned, RootPos:64/unsigned >> ],
Trailer = << 0:32/unsigned, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>,
ok = file:write(IdxFile, Trailer),
ok = file:datasync(IdxFile),
ok = file:close(IdxFile),
{ok, State#state{ index_file=undefined, index_file_pos=undefined, bloom=undefined }};
{ok, State#state{ index_file=undefined, index_file_pos=undefined }};
archive_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos })
when N > 0 ->
@ -222,8 +221,7 @@ append_node(Level, Key, Value, State=#state{ nodes=[] }) ->
append_node(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
when Level < Level2 ->
append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State)
when Bloom /= undefined ->
append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) ->
%% The top-of-stack node is at the level we wish to insert at.
%% Assert that keys are increasing:
@ -238,14 +236,10 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
exit({badarg, Key})
end
end,
NewSize = NodeSize + hanoidb_util:estimate_node_size_increment(List, Key, Value),
{ok,Bloom2} = case Level of
0 ->
?BLOOM_INSERT(Bloom, Key);
_ ->
{ok,Bloom}
end,
NewBloom = hanoidb_bloom:add(Key, Bloom),
{TC1, VC1} =
case Level of
@ -264,7 +258,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
NodeMembers = [{Key, Value} | List],
State2 = State#state{ nodes=[CurrNode#node{members=NodeMembers, size=NewSize} | RestNodes],
value_count=VC1, tombstone_count=TC1, bloom=Bloom2 },
value_count=VC1, tombstone_count=TC1, bloom=NewBloom },
case NewSize >= State#state.block_size of
true ->
@ -273,8 +267,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
{ok, State2}
end.
flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos } = State) ->
flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) ->
OrderedMembers = lists:reverse(NodeMembers),
{ok, BlockData} = hanoidb_util:encode_index_node(OrderedMembers, Compress),

View file

@ -1,280 +0,0 @@
%% ----------------------------------------------------------------------------
%%
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
%%
%% Copyright 2014 (c) Trifork A/S. All Rights Reserved.
%% http://trifork.com/ info@trifork.com
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% ----------------------------------------------------------------------------
-module(vbisect).
-export([from_orddict/1,
from_gb_tree/1,
to_gb_tree/1,
first_key/1,
find/2, find_geq/2,
foldl/3, foldr/3, fold_until_stop/3,
to_orddict/1,
merge/3]).
-define(MAGIC, "vbis").
-type key() :: binary().
-type value() :: binary().
-type bindict() :: binary().
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-spec from_gb_tree(gb_trees:tree()) -> bindict().
from_gb_tree({Count,Node}) when Count =< 16#ffffffff ->
{_BinSize,IOList} = encode_gb_node(Node),
erlang:iolist_to_binary([ <<?MAGIC, Count:32/unsigned >> | IOList ]).
encode_gb_node({Key, Value, Smaller, Bigger}) when is_binary(Key), is_binary(Value) ->
{BinSizeSmaller, IOSmaller} = encode_gb_node(Smaller),
{BinSizeBigger, IOBigger} = encode_gb_node(Bigger),
KeySize = byte_size(Key),
ValueSize = byte_size(Value),
{ 2 + KeySize
+ 4 + ValueSize
+ 4 + BinSizeSmaller
+ BinSizeBigger,
[ << KeySize:16, Key/binary,
BinSizeSmaller:32 >>, IOSmaller,
<< ValueSize:32, Value/binary >> | IOBigger ] };
encode_gb_node(nil) ->
{ 0, [] }.
to_gb_tree(<<?MAGIC, Count:32, Nodes/binary >>) ->
{ Count, to_gb_node(Nodes) }.
to_gb_node( <<>> ) ->
nil;
to_gb_node( << KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary >> ) ->
{Key, Value,
to_gb_node(Smaller),
to_gb_node(Bigger)}.
-spec find(Key::key(), Dict::bindict()) ->
{ ok, value() } | error.
find(Key, <<?MAGIC, _:32, Binary/binary>>) ->
find_node(byte_size(Key), Key, Binary).
find_node(KeySize, Key, <<HereKeySize:16, HereKey:HereKeySize/binary,
BinSizeSmaller:32, _:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
_/binary>> = Bin) ->
if
Key < HereKey ->
Skip = 6 + HereKeySize,
<< _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin,
find_node(KeySize, Key, Smaller);
HereKey < Key ->
Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize,
<< _:Skip/binary, Bigger/binary>> = Bin,
find_node(KeySize, Key, Bigger);
true ->
{ok, Value}
end;
find_node(_, _, <<>>) ->
error.
to_orddict(BinDict) ->
foldr(fun(Key,Value,Acc) ->
[{Key,Value}|Acc]
end,
[],
BinDict).
merge(Fun, BinDict1, BinDict2) ->
OD1 = to_orddict(BinDict1),
OD2 = to_orddict(BinDict2),
OD3 = orddict:merge(Fun, OD1, OD2),
from_orddict(OD3).
-spec first_key( bindict() ) -> binary() | none.
first_key(BinDict) ->
{_, Key} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, BinDict),
Key.
%% @doc Find largest {K,V} where K is smaller than or equal to key.
%% This is good for an inner node where key is the smallest key
%% in the child node.
-spec find_geq(Key::binary(), Binary::binary()) ->
none | {ok, Key::key(), Value::value()}.
find_geq(Key, <<?MAGIC, _:32, Binary/binary>>) ->
find_geq_node(byte_size(Key), Key, Binary, none).
find_geq_node(_, _, <<>>, Else) ->
Else;
find_geq_node(KeySize, Key, <<HereKeySize:16, HereKey:HereKeySize/binary,
BinSizeSmaller:32, _:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
_/binary>> = Bin, Else) ->
if
Key < HereKey ->
Skip = 6 + HereKeySize,
<< _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin,
find_geq_node(KeySize, Key, Smaller, Else);
HereKey < Key ->
Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize,
<< _:Skip/binary, Bigger/binary>> = Bin,
find_geq_node(KeySize, Key, Bigger, {ok, HereKey, Value});
true ->
{ok, HereKey, Value}
end.
-spec foldl(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) ->
term().
foldl(Fun, Acc, <<?MAGIC, _:32, Binary/binary>>) ->
foldl_node(Fun, Acc, Binary).
foldl_node(_Fun, Acc, <<>>) ->
Acc;
foldl_node(Fun, Acc, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
Acc1 = foldl_node(Fun, Acc, Smaller),
Acc2 = Fun(Key, Value, Acc1),
foldl_node(Fun, Acc2, Bigger).
-spec fold_until_stop(function(), term(), bindict()) -> {stopped, term()} | {ok, term()}.
fold_until_stop(Fun, Acc, <<?MAGIC, _:32, Bin/binary>>) ->
fold_until_stop2(Fun, {continue, Acc}, Bin).
fold_until_stop2(_Fun,{stop,Result},_) ->
{stopped, Result};
fold_until_stop2(_Fun,{continue, Acc},<<>>) ->
{ok, Acc};
fold_until_stop2(Fun,{continue, Acc}, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
case fold_until_stop2(Fun, {continue, Acc}, Smaller) of
{stopped, Result} ->
{stopped, Result};
{ok, Acc1} ->
ContinueOrStopAcc = Fun({Key,Value}, Acc1),
fold_until_stop2(Fun, ContinueOrStopAcc, Bigger)
end.
-spec foldr(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) ->
term().
foldr(Fun, Acc, <<?MAGIC, _:32, Binary/binary>>) ->
foldr_node(Fun, Acc, Binary).
foldr_node(_Fun, Acc, <<>>) ->
Acc;
foldr_node(Fun, Acc, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
Acc1 = foldr_node(Fun, Acc, Bigger),
Acc2 = Fun(Key, Value, Acc1),
foldr_node(Fun, Acc2, Smaller).
from_orddict(OrdDict) ->
from_gb_tree(gb_trees:from_orddict(OrdDict)).
-ifdef(TEST).
speed_test_() ->
{timeout, 600,
fun() ->
Start = 100000000000000,
N = 100000,
Keys = lists:seq(Start, Start+N),
KeyValuePairs = lists:map(fun (I) -> {<<I:64/integer>>, <<255:8/integer>>} end,
Keys),
%% Will mostly be unique, if N is bigger than 10000
ReadKeys = [<<(lists:nth(random:uniform(N), Keys)):64/integer>> || _ <- lists:seq(1, 1000)],
B = from_orddict(KeyValuePairs),
time_reads(B, N, ReadKeys)
end}.
geq_test() ->
B = from_orddict([{<<2>>,<<2>>},{<<4>>,<<4>>},{<<6>>,<<6>>},{<<122>>,<<122>>}]),
none = find_geq(<<1>>, B),
{ok, <<2>>, <<2>>} = find_geq(<<2>>, B),
{ok, <<2>>, <<2>>} = find_geq(<<3>>, B),
{ok, <<4>>, <<4>>} = find_geq(<<5>>, B),
{ok, <<6>>, <<6>>} = find_geq(<<100>>, B),
{ok, <<122>>, <<122>>} = find_geq(<<150>>, B),
true.
time_reads(B, Size, ReadKeys) ->
Parent = self(),
spawn(
fun() ->
Runs = 20,
Timings =
lists:map(
fun (_) ->
StartTime = now(),
find_many(B, ReadKeys),
timer:now_diff(now(), StartTime)
end, lists:seq(1, Runs)),
Rps = 1000000 / ((lists:sum(Timings) / length(Timings)) / 1000),
error_logger:info_msg("Average over ~p runs, ~p keys in dict~n"
"Average fetch ~p keys: ~p us, max: ~p us~n"
"Average fetch 1 key: ~p us~n"
"Theoretical sequential RPS: ~w~n",
[Runs, Size, length(ReadKeys),
lists:sum(Timings) / length(Timings),
lists:max(Timings),
(lists:sum(Timings) / length(Timings)) / length(ReadKeys),
trunc(Rps)]),
Parent ! done
end),
receive done -> ok after 1000 -> ok end.
-spec find_many(bindict(), [key()]) -> non_neg_integer().
find_many(B, Keys) ->
lists:foldl(fun (K, N) ->
case find(K, B) of
{ok, _} -> N+1;
error -> N
end
end,
0, Keys).
-endif.

View file

@ -25,6 +25,8 @@
%% @doc Drive a set of LSM BTrees
-module(hanoidb_drv).
-ifdef(QC_PROPER).
-behaviour(gen_server).
%% API
@ -141,3 +143,4 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,6 +24,8 @@
-module(hanoidb_merger_tests).
-ifdef(QC_PROPER).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
@ -61,3 +63,4 @@ merge_test() ->
ok.
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,6 +24,8 @@
-module(hanoidb_tests).
-ifdef(QC_PROPER).
-include("include/hanoidb.hrl").
-include("src/hanoidb.hrl").
@ -47,15 +49,9 @@
next_state/3, postcondition/3,
precondition/2]).
-ifdef(pre18).
-define(OTP_DICT, dict()).
-else.
-define(OTP_DICT, dict:dict()).
-endif.
-record(tree, { elements = dict:new() :: ?OTP_DICT }).
-record(state, { open = dict:new() :: ?OTP_DICT,
closed = dict:new() :: ?OTP_DICT}).
-record(tree, { elements = dict:new() :: dict() }).
-record(state, { open = dict:new() :: dict(),
closed = dict:new() :: dict()}).
-define(SERVER, hanoidb_drv).
full_test_() ->
@ -67,21 +63,13 @@ full_test_() ->
?_test(test_tree_simple_5())
]}.
longer_tree_test_() ->
{setup,
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 300, ?_test(test_tree())}
]}.
longer_qc_test_() ->
longer_test_() ->
{setup,
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 300, ?_test(test_tree())},
{timeout, 120, ?_test(test_qc())}
]}.
@ -440,3 +428,4 @@ dict_range_query(Dict, Range) ->
[{K, V} || {K, V} <- dict:to_list(Dict),
?KEY_IN_RANGE(K, Range)].
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,6 +24,8 @@
-module(hanoidb_writer_tests).
-ifdef(QC_PROPER).
-ifdef(TEST).
-ifdef(TEST).
-ifdef(TRIQ).
@ -62,9 +64,9 @@ simple_test() ->
simple1_test() ->
file:delete("testdata"),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 102},{expiry_secs, 0}]),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 1024},{expiry_secs, 0}]),
Max = 102,
Max = 1024,
Seq = lists:seq(0, Max),
{Time1,_} = timer:tc(
@ -78,16 +80,15 @@ simple1_test() ->
end,
[]),
error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
% error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
{ok, IN} = hanoidb_reader:open("testdata", [{expiry_secs,0}]),
Middle = Max div 2,
io:format("LOOKING UP ~p~n", [<<Middle:128>>]),
{ok, <<"valuevalue/", Middle:128>>} = hanoidb_reader:lookup(IN, <<Middle:128>>),
{Time2,Count} = timer:tc(
fun() -> hanoidb_reader:fold(fun(_Key, <<"valuevalue/", N:128>>, N) ->
fun() -> hanoidb_reader:fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
N+1
end,
0,
@ -95,13 +96,12 @@ simple1_test() ->
end,
[]),
io:format("time to scan: ~p/sec~n", [1000000/(Time2 div Max)]),
% error_logger:info_msg("time to scan: ~p/sec~n", [1000000/(Time2/Max)]),
Max = Count-1,
{Time3,{done,Count2}} = timer:tc(
fun() -> hanoidb_reader:range_fold(fun(_Key, <<"valuevalue/", N:128>>, N) ->
% io:format("[~p]~n", N),
fun() -> hanoidb_reader:range_fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
N+1
end,
0,
@ -110,13 +110,12 @@ simple1_test() ->
end,
[]),
% error_logger:info_msg("time to range_fold: ~p/sec~n", [1000000/(Time3/Max)]),
%error_logger:info_msg("time to range_fold: ~p/sec~n", [1000000/(Time3 div Max)]),
io:format("count2=~p~n", [Count2]),
% error_logger:info_msg("count2=~p~n", [Count2]),
Max = Count2-1,
ok = hanoidb_reader:close(IN).
-endif. %% -ifdef(QC_PROPER).

View file

@ -1,83 +0,0 @@
#!/bin/bash
## ----------------------------------------------------------------------------
##
## hanoi: LSM-trees (Log-Structured Merge Trees) Indexed Storage
##
## Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
## http://trifork.com/ info@trifork.com
##
## Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
## http://basho.com/ info@basho.com
##
## This file is provided to you under the Apache License, Version 2.0 (the
## "License"); you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
##
## ----------------------------------------------------------------------------
function periodic() {
t=0
while sleep 1 ; do
let "t=t+1"
printf "%5d [" "$t"
for ((i=0; i<35; i++)) ; do
if ! [ -f "A-$i.data" ] ; then
echo -n " "
elif ! [ -f "B-$i.data" ] ; then
echo -n "-"
elif ! [ -f "C-$i.data" ] ; then
echo -n "#"
elif ! [ -f "X-$i.data" ] ; then
echo -n "="
else
echo -n "*"
fi
done
echo
done
}
function dynamic() {
local old s t start now
t=0
start=`date +%s`
while true ; do
s=""
for ((i=0; i<35; i++)) ; do
if ! [ -f "A-$i.data" ] ; then
s="$s "
elif ! [ -f "B-$i.data" ] ; then
s="$s-"
elif ! [ -f "C-$i.data" ] ; then
s="$s="
elif ! [ -f "X-$i.data" ] ; then
s="$s%"
else
s="$s*"
fi
done
if [[ "$s" != "$old" ]] ; then
let "t=t+1"
now=`date +%s`
let "now=now-start"
printf "%5d %6d [%s\n" "$t" "$now" "$s"
old="$s"
else
# Sleep a little bit:
perl -e 'use Time::HiRes; Time::HiRes::usleep(100000)'
fi
done
}
dynamic

View file

@ -48,13 +48,13 @@ function periodic() {
}
merge_diff() {
SA=`stat -c %s A-${ID}.data 2> /dev/null`
SB=`stat -c %s B-${ID}.data 2> /dev/null`
SX=`stat -c %s X-${ID}.data 2> /dev/null`
SA=`ls -l A-${ID}.data 2> /dev/null | awk '{print $5}'`
SB=`ls -l B-${ID}.data 2> /dev/null | awk '{print $5}'`
SX=`ls -l X-${ID}.data 2> /dev/null | awk '{print $5}'`
if [ \( -n "$SA" \) -a \( -n "$SB" \) -a \( -n "$SX" \) ]; then
export RES=`expr ${SX:-0} / \( $SA + $SB \)`
export RES=`expr ${SX}0 / \( $SA + $SB \)`
else
export RES=""
export RES="?"
fi
}
@ -64,7 +64,7 @@ function dynamic() {
start=`date +%s`
while true ; do
s=""
for ((i=10; i<22; i++)) ; do
for ((i=8; i<22; i++)) ; do
if [ -f "C-$i.data" ] ; then
s="${s}C"
else

View file

@ -1,66 +0,0 @@
#!/usr/bin/env bash
merge_diff() {
SA=`stat -c %s A-${ID}.data 2> /dev/null`
SB=`stat -c %s B-${ID}.data 2> /dev/null`
SX=`stat -c %s X-${ID}.data 2> /dev/null`
if [ \( -n "$SA" \) -a \( -n "$SB" \) -a \( -n "$SX" \) ]; then
export RES="$(expr ${SX:-0} / \( $SA + $SB \))"
else
export RES=0
fi
if [[ $RES -eq 0 ]]; then
export RES="⋈"
fi
}
function dynamic() {
local old s t start now
t=0
start=`date +%s`
while true ; do
s=""
for ((i=10; i<22; i++)) ; do
if [ -f "C-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "B-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "A-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "X-$i.data" ] ; then
export ID="$i"
merge_diff
s="${s}$RES"
elif [ -f "M-$i.data" ] ; then
s="${s}M"
else
s="$s "
fi
s="$s|"
done
if [[ "$s" != "$old" ]] ; then
let "t=t+1"
now=`date +%s`
let "now=now-start"
free=`df -m . 2> /dev/null | tail -1 | awk '{print $4}'`
used=`du -m 2> /dev/null | awk '{print $1}' `
printf "%5d %6d [%s\n" "$t" "$now" "$s ${used}MB (${free}MB free)"
old="$s"
else
# Sleep a little bit:
sleep 1
fi
done
}
dynamic