Compare commits

2 commits

Author: Kresten Krab Thorup
SHA1:   de75b5ed3d
Date:   2014-04-02 23:51:23 +02:00

    Improve types/specs

    This patch also removes array representations
    from the bloom implementation, as it is unused.

Author: Kresten Krab Thorup
SHA1:   0d5ec3738e
Date:   2014-04-02 23:48:38 +02:00

    Move basho_bench driver to util
34 changed files with 461 additions and 1273 deletions

.envrc

@@ -1,5 +0,0 @@
if ! has nix_direnv_version || ! nix_direnv_version 3.0.4; then
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.4/direnvrc" "sha256-DzlYZ33mWF/Gs8DDeyjr8mnVmQGx7ASYqA5WlxwvBG4="
fi
watch_file devShell.nix shell.nix flake.nix
use flake || use nix

.gitignore

@@ -3,6 +3,3 @@ deps
*~
.eunit
.project
_build
.direnv

.travis.yml

@@ -1,8 +0,0 @@
language: erlang
otp_release:
- R16B03
- R15B03
- 17.0
- 18.0

Makefile

@@ -1,12 +1,49 @@
REBAR= rebar3
REBAR= rebar
DIALYZER= dialyzer
DEPS = $(CURDIR)/deps
DIALYZER_OPTS = # -Wunderspecs
# List dependencies that should be included in a cached dialyzer PLT file.
DIALYZER_DEPS = deps/lz4/ebin \
deps/snappy/ebin \
deps/plain_fsm/ebin
DEPS_PLT = hanoi.plt
ERLANG_DIALYZER_APPS = asn1 \
compiler \
crypto \
edoc \
erts \
eunit \
gs \
hipe \
inets \
kernel \
mnesia \
observer \
public_key \
runtime_tools \
ssl \
stdlib \
syntax_tools \
tools \
webtool \
xmerl
.PHONY: plt analyze all deps compile get-deps clean
all: get-deps compile
all: compile
deps: get-deps compile
deps: get-deps
get-deps:
@$(REBAR) get-deps
@@ -31,18 +68,13 @@ clean-test-btrees:
plt: compile
$(DIALYZER) --build_plt --output_plt .hanoi.plt \
-pa deps/snappy/ebin \
-pa deps/snappy/ebin \
-pa deps/lz4/ebin \
-pa deps/ebloom/ebin \
-pa deps/plain_fsm/ebin \
deps/plain_fsm/ebin \
--apps erts kernel stdlib ebloom lz4 snappy
--apps kernel stdlib
analyze: compile
$(DIALYZER) --plt .hanoi.plt \
-pa deps/snappy/ebin \
-pa deps/lz4/ebin \
-pa deps/ebloom/ebin \
-pa deps/plain_fsm/ebin \
ebin
@@ -52,5 +84,22 @@ analyze-nospec: compile
--no_spec \
ebin
# Only include local PLT if we have deps that we are going to analyze
ifeq ($(strip $(DIALYZER_DEPS)),)
dialyzer: ~/.dialyzer_plt compile
@dialyzer $(DIALYZER_OPTS) -r ebin
else
dialyzer: ~/.dialyzer_plt $(DEPS_PLT) compile
@dialyzer $(DIALYZER_OPTS) --plts ~/.dialyzer_plt $(DEPS_PLT) -r ebin
$(DEPS_PLT):
@dialyzer --build_plt $(DIALYZER_DEPS) --output_plt $(DEPS_PLT)
endif
~/.dialyzer_plt:
@echo "ERROR: Missing ~/.dialyzer_plt. Please wait while a new PLT is compiled."
dialyzer --build_plt --apps $(ERLANG_DIALYZER_APPS)
@echo "now try your build again"
repl:
erl -pz deps/*/ebin -pa ebin

README.md

@@ -1,7 +1,5 @@
# HanoiDB Indexed Key/Value Storage
[![Build Status](https://travis-ci.org/krestenkrab/hanoidb.svg?branch=master)](https://travis-ci.org/krestenkrab/hanoidb)
HanoiDB implements an indexed, key/value storage engine. The primary index is
a log-structured merge tree (LSM-BTree) implemented using "doubling sizes"
persistent ordered sets of key/value pairs, similar is some regards to
@@ -82,13 +80,7 @@ Put these values in your `app.config` in the `hanoidb` section
%% Both have same log2(N) worst case, but `fast' is
%% sometimes faster; yielding latency fluctuations.
%%
{merge_strategy, fast | predictable},
%% "Level0" files has 2^N KVs in it, defaulting to 1024.
%% If the database is to contain very small KVs, this is
%% likely too small, and will result in many unnecessary
%% file operations. (Subsequent levels double in size).
{top_level, 10} % 1024 Key/Values
{merge_strategy, fast | predictable}
]},
```
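For orientation, here is a minimal `app.config` stanza assembled from options that appear in this diff (a sketch; the values are illustrative, and `sync_strategy`/`expiry_secs` come from the `src/hanoidb.erl` and `src/hanoidb_nursery.erl` hunks further down, not from the README):

```erlang
[{hanoidb, [
    {merge_strategy, fast},         %% or: predictable (same log2(N) worst case)
    {sync_strategy, {seconds, 1}},  %% datasync at most once per second
    {expiry_secs, 0}                %% must be set; hanoidb_util:ensure_expiry/1
                                    %% exits with expiry_secs_not_set otherwise
]}].
```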

TODO

@@ -1,4 +1,5 @@
* Phase 1: Minimum viable product (in order of priority)
* lager; check for uses of lager:error/2
* configurable TOP_LEVEL size
* test new snappy compression support
* status and statistics

flake.lock

@@ -1,61 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1701282334,
"narHash": "sha256-MxCVrXY6v4QmfTwIysjjaX0XUhqBbxTWWB4HXtDYsdk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "057f9aecfb71c4437d2b27d3323df7f93c010b7e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"utils": "utils"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

flake.nix

@@ -1,35 +0,0 @@
{
description = "A Helm Chart for OpenLDAP.";
inputs = {
# nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
nixpkgs.url = "github:NixOS/nixpkgs/23.11";
utils.url = "github:numtide/flake-utils";
};
outputs =
{ self, nixpkgs, ... }@inputs:
inputs.utils.lib.eachSystem
[
"x86_64-linux"
"i686-linux"
"aarch64-linux"
"x86_64-darwin"
]
(
system:
let
pkgs = import nixpkgs {
inherit system;
overlays = [ ];
config.allowUnfree = true;
};
in
{
flake-utils.inputs.systems.follows = "system";
formatter = with pkgs; [ nixfmt ];
devShells.default = import ./shell.nix { inherit pkgs; };
DOCKER_BUILDKIT = 1;
}
);
}

rebar.config

@@ -4,7 +4,7 @@
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [%{d,'DEBUG',true},
%{d,'USE_EBLOOM',true},
{parse_transform, lager_transform},
fail_on_warning,
warn_unused_vars,
warn_export_all,
@@ -20,20 +20,17 @@
warn_untyped_record,
% warn_missing_spec,
% strict_validation,
{platform_define, "^R|17", pre18},
debug_info]}.
{xref_checks, [undefined_function_calls]}.
{deps, [ {sext, ".*", {git, "https://github.com/uwiger/sext.git", {branch, "master"}}}
%% , {snappy, "1.*", {git, "https://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.*", {git, "https://github.com/gburd/plain_fsm.git", {branch, "master"}}}
%% , {basho_bench, ".*", {git, "https://github.com/basho/basho_bench.git", {branch, "master"}}}
%% , {ebloom, ".*", {git, "https://github.com/basho/ebloom.git", {branch, "develop"}}}
%% , {bloomerl, ".*", {git, "https://github.com/gburd/bloomerl.git", {branch, "master"}}}
, {lz4, ".*", {git, "https://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
, {triq, ".*", {git, "https://github.com/gburd/triq.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/uwiger/edown.git", {branch, "master"}}}
{deps, [ {sext, ".*", {git, "git://github.com/esl/sext", {branch, "master"}}}
, {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
, {snappy, "1.1.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
% , {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}}
, {lz4, ".*", {git, "git://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/esl/edown.git", {branch, "master"}}}
% , {asciiedoc, "0.1.*", {git, "git://github.com/norton/asciiedoc.git", {branch, "master"}}}
% , {triq, ".*", {git, "git://github.com/krestenkrab/triq.git", {branch, "master"}}}
% , {proper, ".*", {git, "git://github.com/manopapad/proper.git", {branch, "master"}}}

rebar.lock

@@ -1,16 +0,0 @@
[{<<"lz4">>,
{git,"https://github.com/krestenkrab/erlang-lz4.git",
{ref,"5fd90ca1e2345bdc359ee43d958da87fafb4fd78"}},
0},
{<<"plain_fsm">>,
{git,"https://github.com/gburd/plain_fsm.git",
{ref,"6421158d742956836dfa39fca857422afcf56419"}},
0},
{<<"sext">>,
{git,"https://github.com/uwiger/sext.git",
{ref,"c22486add9cc374dc8138b1f547c0999a1922a65"}},
0},
{<<"triq">>,
{git,"https://github.com/gburd/triq.git",
{ref,"5d4b98e8323eec70aff474a578a1e5ebe9495e70"}},
0}].

shell.nix

@@ -1,62 +0,0 @@
{
pkgs ? import <nixpkgs> { },
}:
with pkgs;
mkShell rec {
name = "hanoidb";
packages = with pkgs; [
bashInteractive
gigalixir
inotify-tools
libnotify
];
buildInputs = [
# basics
curl
git
openssh
ripgrep
shellcheck
erlang
erlang-ls
rebar3
erlfmt
# BEAM support
#beam.interpreters.erlangR26
#rebar3
#beam.packages.erlangR26.elixir_1_15
#nodejs-18_x
# elixir-typst support
#pkgs.iconv
# rust support
#cargo
];
shellHook =
let
icon = "f121";
in
''
export PS1="$(echo -e '\u${icon}') {\[$(tput sgr0)\]\[\033[38;5;228m\]\w\[$(tput sgr0)\]\[\033[38;5;15m\]} (${name}) \\$ \[$(tput sgr0)\]"
alias gitc="git -c user.email=greg@burd.me commit --gpg-sign=22931AF7895E82DF ."
# allows mix to work on the local directory
mkdir -p .nix-mix
mkdir -p .nix-hex
export MIX_HOME=$PWD/.nix-mix
export HEX_HOME=$PWD/.nix-hex
export PATH=$MIX_HOME/bin:$PATH
export PATH=$HEX_HOME/bin:$PATH
export LANG=en_US.UTF-8
export ERL_AFLAGS="-kernel shell_history enabled"
'';
}
# NOTES:
# * https://github.com/fbettag/nixos-erlang-flake/blob/main/flake.nix
# * https://discourse.nixos.org/t/installing-different-versions-of-elixir-and-erlang/35765/5
# * https://github.com/fbettag/nixos-erlang-flake/blob/main/flake.nix

src/gb_trees_ext.erl

@@ -5,7 +5,7 @@
% author: http://erlang.2086793.n4.nabble.com/gb-trees-fold-td2228614.html
-spec fold(fun((term(), term(), term()) -> term()), term(), gb_trees:tree()) -> term().
-spec fold(fun((term(), term(), term()) -> term()), term(), gb_tree()) -> term().
fold(F, A, {_, T})
when is_function(F, 3) ->
fold_1(F, A, T).
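A quick sanity-check of the `fold/3` above (hypothetical usage; the module name `gb_trees_ext` is taken from the call site in `src/hanoidb_nursery.erl` further down):

```erlang
T = gb_trees:insert(2, b, gb_trees:insert(1, a, gb_trees:empty())),
%% Sum the keys: 1 + 2 + 0 = 3.
3 = gb_trees_ext:fold(fun(K, _V, Acc) -> K + Acc end, 0, T).
```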

src/hanoidb.app.src

@@ -24,13 +24,12 @@
{application, hanoidb,
[
{description, "LSM-tree tiered storage engine with amortized merging"},
{description, ""},
{vsn, "1.3.0"},
{registered, []},
{applications, [
kernel,
stdlib,
plain_fsm
stdlib
]},
{mod, {hanoidb_app, []}},
{env, []}

src/hanoidb.erl

@@ -70,7 +70,6 @@
| {sync_strategy, none | sync | {seconds, pos_integer()}}
| {expiry_secs, non_neg_integer()}
| {spawn_opt, list()}
| {top_level, pos_integer()}
.
%% @doc
@@ -210,9 +209,9 @@ receive_fold_range(MRef,PID,Fun,Acc0, Limit) ->
try
{ok, Fun(K,V,Acc0)}
catch
Class:Exception:Stacktrace ->
Class:Exception ->
% TODO ?log("Exception in hanoidb fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
{'EXIT', Class, Exception, Stacktrace}
{'EXIT', Class, Exception, erlang:get_stacktrace()}
end
of
{ok, Acc1} ->
@@ -279,26 +278,24 @@ init([Dir, Opts0]) ->
end,
hanoidb_util:ensure_expiry(Opts),
{Top, Nur, Max} =
{Nursery, MaxLevel, TopLevel} =
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel, MinLevel, MaxLevel} = open_levels(Dir, Opts),
{ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MinLevel, MaxLevel, Opts),
{TopLevel, Nursery, MaxLevel};
{ok, TL, ML} = open_levels(Dir, Opts),
{ok, N0} = hanoidb_nursery:recover(Dir, TL, ML, Opts),
{N0, ML, TL};
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
MinLevel = get_opt(top_level, Opts0, ?TOP_LEVEL),
{ok, TopLevel} = hanoidb_level:open(Dir, MinLevel, undefined, Opts, self()),
MaxLevel = MinLevel,
{ok, Nursery} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Opts),
{TopLevel, Nursery, MaxLevel}
end,
{ok, #state{ top=Top, dir=Dir, nursery=Nur, opt=Opts, max_level=Max }}.
{ok, TL} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
ML = ?TOP_LEVEL,
{ok, N0} = hanoidb_nursery:new(Dir, ML, Opts),
{N0, ML, TL}
end,
{ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
open_levels(Dir, Options) ->
{ok, Files} = file:list_dir(Dir),
TopLevel0 = get_opt(top_level, Options, ?TOP_LEVEL),
%% parse file names and find max level
{MinLevel, MaxLevel} =
@@ -311,7 +308,7 @@ open_levels(Dir, Options) ->
{MinLevel, MaxLevel}
end
end,
{TopLevel0, TopLevel0},
{?TOP_LEVEL, ?TOP_LEVEL},
Files),
%% remove old nursery data file
@@ -326,17 +323,17 @@ open_levels(Dir, Options) ->
{Level, MergeWork}
end,
{undefined, 0},
lists:seq(MaxLevel, MinLevel, -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(MinLevel),
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
% error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
do_merge(TopLevel, WorkPerIter, MaxMerge, MinLevel),
{ok, TopLevel, MinLevel, MaxLevel}.
do_merge(TopLevel, WorkPerIter, MaxMerge),
{ok, TopLevel, MaxLevel}.
do_merge(TopLevel, _Inc, N, _MinLevel) when N =< 0 ->
do_merge(TopLevel, _Inc, N) when N =< 0 ->
ok = hanoidb_level:await_incremental_merge(TopLevel);
do_merge(TopLevel, Inc, N, MinLevel) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel)),
do_merge(TopLevel, Inc, N-Inc, MinLevel).
do_merge(TopLevel, Inc, N) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
do_merge(TopLevel, Inc, N-Inc).
parse_level(FileName) ->
@@ -416,8 +413,7 @@ handle_call(close, _From, State=#state{ nursery=undefined }) ->
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
try
ok = hanoidb_nursery:finish(Nursery, Top),
MinLevel = hanoidb_level:level(Top),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
ok = hanoidb_level:close(Top),
{stop, normal, ok, State#state{ nursery=Nursery2 }}
catch
@@ -427,10 +423,9 @@ handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_l
end;
handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
TopLevelNumber = hanoidb_level:level(Top),
ok = hanoidb_nursery:destroy(Nursery),
ok = hanoidb_level:destroy(Top),
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=TopLevelNumber }}.
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
-spec do_put(key(), value(), expiry(), #state{}) -> {ok, #state{}}.
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) when Nursery =/= undefined ->
@@ -448,12 +443,7 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, State#state{ nursery=Nursery2 }}.
start_app() ->
ok = ensure_started(syntax_tools),
ok = ensure_started(plain_fsm),
ok = ensure_started(?MODULE).
ensure_started(Application) ->
case application:start(Application) of
case application:start(?MODULE) of
ok ->
ok;
{error, {already_started, _}} ->

src/hanoidb.hrl

@@ -23,7 +23,7 @@
%% ----------------------------------------------------------------------------
%% smallest levels are 1024 entries
%% smallest levels are 256 entries
-define(TOP_LEVEL, 8).
-define(BTREE_SIZE(Level), (1 bsl (Level))).
-define(FILE_FORMAT, <<"HAN2">>).
@@ -48,20 +48,12 @@
-define(KEY_IN_RANGE(Key,Range),
(?KEY_IN_FROM_RANGE(Key,Range) andalso ?KEY_IN_TO_RANGE(Key,Range))).
-ifdef(pre18).
-define(TIMESTAMP, now()).
-else.
-define(TIMESTAMP, erlang:timestamp()).
-endif.
-record(nursery, { log_file :: file:fd(),
dir :: string(),
cache :: gb_trees:tree(binary(), binary()),
cache :: gb_tree(),
total_size=0 :: integer(),
count=0 :: integer(),
last_sync=?TIMESTAMP :: erlang:timestamp(),
min_level :: integer(),
last_sync=now() :: erlang:timestamp(),
max_level :: integer(),
config=[] :: [{atom(), term()}],
step=0 :: integer(),
@@ -77,25 +69,3 @@
| value()
| filepos().
-ifdef(USE_EBLOOM).
-define(HANOI_BLOOM_TYPE, ebloom).
-else.
-define(HANOI_BLOOM_TYPE, sbloom).
-endif.
-define(BLOOM_NEW(Size), hanoidb_util:bloom_new(Size, ?HANOI_BLOOM_TYPE)).
-define(BLOOM_TO_BIN(Bloom), hanoidb_util:bloom_to_bin(Bloom)).
-define(BIN_TO_BLOOM(Bin, Fmt), hanoidb_util:bin_to_bloom(Bin, Fmt)).
-define(BLOOM_INSERT(Bloom, Key), hanoidb_util:bloom_insert(Bloom, Key)).
-define(BLOOM_CONTAINS(Bloom, Key), hanoidb_util:bloom_contains(Bloom, Key)).
%% tags used in the on-disk representation
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).
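As a quick check of the sizing macros in this file: `?BTREE_SIZE(Level)` is `1 bsl Level`, so the two `?TOP_LEVEL` values mentioned in the first hunk work out as follows (a sketch):

```erlang
(1 bsl 10).  %% => 1024 entries when ?TOP_LEVEL is 10
(1 bsl 8).   %% => 256 entries when ?TOP_LEVEL is 8
```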

src/hanoidb_bloom.erl

@@ -1,4 +1,4 @@
% The contents of this file are subject to the Erlang Public License, Version
%% The contents of this file are subject to the Erlang Public License, Version
%% 1.1, (the "License"); you may not use this file except in compliance with
%% the License. You should have received a copy of the Erlang Public License
%% along with this software. If not, it can be retrieved via the world wide web
@@ -41,11 +41,9 @@
-define(W, 27).
-ifdef(pre18).
-type bitmask() :: array() | any().
-else.
-type bitmask() :: arrays:array() | any().
-endif.
-type bitmask() :: {sparse_bitmap, non_neg_integer(), list()}
| hanoidb_dense_bitmap:bitmap()
.
-record(bloom, {
e :: float(), % error probability
@@ -117,16 +115,19 @@ sbf(N, E, S, R) when is_number(N), N > 0,
%% Returns number of elements
%%
-spec size( #bloom{} | #sbf{} ) -> non_neg_integer().
size(#bloom{size=Size}) -> Size;
size(#sbf{size=Size}) -> Size.
%% Returns capacity
%%
-spec capacity( #bloom{} | #sbf{} ) -> non_neg_integer() | infinity.
capacity(#bloom{n=N}) -> N;
capacity(#sbf{}) -> infinity.
%% Test for membership
%%
-spec member( any(), #bloom{} | #sbf{} ) -> boolean().
member(Elem, #bloom{mb=Mb}=B) ->
Hashes = make_hashes(Mb, Elem),
hash_member(Hashes, B);
@@ -193,76 +194,42 @@ bitmask_new(LogN) ->
hanoidb_dense_bitmap:new(1 bsl LogN)
end.
bitmask_set(I, BM) ->
case element(1,BM) of
array -> bitarray_set(I, as_array(BM));
% array -> bitarray_set(I, as_array(BM));
sparse_bitmap -> hanoidb_sparse_bitmap:set(I, BM);
dense_bitmap_ets -> hanoidb_dense_bitmap:set(I, BM);
dense_bitmap ->
dense_bitmap_term ->
%% Surprise - we need to mutate a built representation:
hanoidb_dense_bitmap:set(I, hanoidb_dense_bitmap:unbuild(BM))
end.
%%% Convert to external form.
-spec bitmask_build( bitmask() ) -> {sparse_bitmap, non_neg_integer(), list()}
| {dense_bitmap_term, tuple()}.
bitmask_build(BM) ->
case element(1,BM) of
array -> BM;
sparse_bitmap -> BM;
dense_bitmap -> BM;
dense_bitmap_ets -> hanoidb_dense_bitmap:build(BM)
dense_bitmap_ets -> hanoidb_dense_bitmap:build(BM);
_ -> BM
end.
-spec bitmask_get( non_neg_integer(), bitmask() ) -> boolean().
bitmask_get(I, BM) ->
case element(1,BM) of
array -> bitarray_get(I, as_array(BM));
sparse_bitmap -> hanoidb_sparse_bitmap:member(I, BM);
dense_bitmap_ets -> hanoidb_dense_bitmap:member(I, BM);
dense_bitmap -> hanoidb_dense_bitmap:member(I, BM)
sparse_bitmap -> hanoidb_sparse_bitmap:member(I, BM);
dense_bitmap_ets -> hanoidb_dense_bitmap:member(I, BM);
dense_bitmap_term -> hanoidb_dense_bitmap:member(I, BM)
end.
-ifdef(pre18).
-spec as_array(bitmask()) -> array().
-else.
-spec as_array(bitmask()) -> arrays:array().
-endif.
as_array(BM) ->
case array:is_array(BM) of
true -> BM
end.
%%%========== Bitarray representation - suitable for sparse arrays ==========
%% bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
-ifdef(pre18).
-spec bitarray_set( non_neg_integer(), array() ) -> array().
-else.
-spec bitarray_set( non_neg_integer(), arrays:array() ) -> arrays:array().
-endif.
bitarray_set(I, A1) ->
A = as_array(A1),
AI = I div ?W,
V = array:get(AI, A),
V1 = V bor (1 bsl (I rem ?W)),
if V =:= V1 -> A; % The bit is already set
true -> array:set(AI, V1, A)
end.
-ifdef(pre18).
-spec bitarray_get( non_neg_integer(), array() ) -> boolean().
-else.
-spec bitarray_get( non_neg_integer(), arrays:array() ) -> boolean().
-endif.
bitarray_get(I, A) ->
AI = I div ?W,
V = array:get(AI, A),
(V band (1 bsl (I rem ?W))) =/= 0.
%%%^^^^^^^^^^ Bitarray representation - suitable for sparse arrays ^^^^^^^^^^
-spec encode( #bloom{} | #sbf{} ) -> binary().
encode(Bloom) ->
zlib:gzip(term_to_binary(bloom_build(Bloom))).
-spec decode( binary() ) -> #bloom{} | #sbf{}.
decode(Bin) ->
binary_to_term(zlib:gunzip(Bin)).
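Pulling the exported pieces of this module together, a hedged usage sketch (`bloom/1` and `add/2` are assumed from their call sites in `src/hanoidb_writer.erl` and `src/hanoidb_util.erl` below; they are not shown in this file's hunks):

```erlang
B0 = hanoidb_bloom:bloom(1024),
B1 = hanoidb_bloom:add(<<"key">>, B0),
true = hanoidb_bloom:member(<<"key">>, B1),
Bin = hanoidb_bloom:encode(B1),   %% zlib:gzip(term_to_binary(...)) per encode/1 above
true = hanoidb_bloom:member(<<"key">>, hanoidb_bloom:decode(Bin)).
```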

src/hanoidb_dense_bitmap.erl

@@ -5,6 +5,11 @@
-define(REPR_NAME, dense_bitmap).
-type bitmap() :: {dense_bitmap_ets, non_neg_integer()|undefined, non_neg_integer()|undefined, ets:tid()}
| {dense_bitmap_term, tuple() }.
-export_type([ bitmap/0 ]).
-spec new( non_neg_integer() ) -> bitmap().
new(N) ->
Tab = ets:new(dense_bitmap, [private, set]),
Width = 1 + (N-1) div ?BITS_PER_CELL,
@@ -28,14 +33,18 @@ set(I, {dense_bitmap_ets, _,_, Tab}=DBM) ->
build({dense_bitmap_ets, _, _, Tab}) ->
[Row] = ets:lookup(Tab, ?REPR_NAME),
ets:delete(Tab),
Row.
{dense_bitmap_term, Row};
build({dense_bitmap_term, _}=Value) ->
Value.
unbuild(Row) when element(1,Row)==?REPR_NAME ->
-spec unbuild( {dense_bitmap_term, any()} ) -> bitmap().
unbuild({dense_bitmap_term, Row}) when element(1,Row)==?REPR_NAME ->
Tab = ets:new(dense_bitmap, [private, set]),
ets:insert(Tab, Row),
{dense_bitmap_ets, undefined, undefined, Tab}.
member(I, Row) when element(1,Row)==?REPR_NAME ->
-spec member( non_neg_integer(), bitmap() ) -> boolean().
member(I, {dense_bitmap_term, Row}) when element(1,Row)==?REPR_NAME ->
Cell = 2 + I div ?BITS_PER_CELL,
BitInCell = I rem ?BITS_PER_CELL,
CellValue = element(Cell, Row),
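
Taken together, the lifecycle of a dense bitmap per the code above is roughly this (a sketch; the assumption that `set/2` returns the bitmap follows from how `bitmask_set/2` in `src/hanoidb_bloom.erl` uses it):

```erlang
BM0   = hanoidb_dense_bitmap:new(1024),       %% mutable, ETS-backed
BM1   = hanoidb_dense_bitmap:set(3, BM0),
Built = hanoidb_dense_bitmap:build(BM1),      %% -> {dense_bitmap_term, Row}; ETS table deleted
true  = hanoidb_dense_bitmap:member(3, Built).
```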

src/hanoidb_fold_worker.erl

@@ -79,10 +79,10 @@ start(SendTo) ->
initialize(#state{sendto=SendTo, sendto_ref=MRef}, []),
?log("fold_worker done ~p~n", [self()])
catch
Class:Ex:Stacktrace ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, Stacktrace]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, Stacktrace]),
exit({bad, Class, Ex, Stacktrace})
Class:Ex ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
exit({bad, Class, Ex, erlang:get_stacktrace()})
end
end,
PID = plain_fsm:spawn(?MODULE, F),
@@ -231,3 +231,5 @@ data_vsn() ->
code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}.

src/hanoidb_level.erl

@@ -46,7 +46,7 @@
-export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3,
begin_incremental_merge/2, await_incremental_merge/1, set_max_level/2,
unmerged_count/1, destroy/1, level/1]).
unmerged_count/1, destroy/1]).
-include_lib("kernel/include/file.hrl").
@@ -90,9 +90,6 @@ open(Dir,Level,Next,Opts,Owner) when Level>0 ->
SpawnOpt),
{ok, PID}.
level(Ref) ->
plain_rpc:call(Ref, level).
lookup(Ref, Key) ->
plain_rpc:call(Ref, {lookup, Key}).
@@ -159,9 +156,9 @@ initialize(State) ->
_Result = initialize2(State),
?log(" ** terminated ~p", [_Result])
catch
Class:Ex:Stacktrace when not (Class == exit andalso Ex == normal) ->
?log("crashing ~p:~p ~p~n", [Class,Ex,Stacktrace]),
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,Stacktrace])
Class:Ex when not (Class == exit andalso Ex == normal) ->
?log("crashing ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
end.
initialize2(State) ->
@@ -359,11 +356,6 @@ main_loop(State = #state{ next=Next }) ->
->
do_step(StepFrom, DoneWork, StepSize, State);
%% simply replies the level number
?CALL(From, level) ->
plain_rpc:send_reply(From, State#state.level),
main_loop(State);
{MRef, step_done} when MRef == State#state.step_merge_ref ->
demonitor(MRef, [flush]),
@@ -411,7 +403,6 @@ main_loop(State = #state{ next=Next }) ->
%% and we have finished the incremental merge at this level
State2 = reply_step_ok(State),
erlang:demonitor(MRef, [flush]),
main_loop(State2#state{ step_next_ref=undefined });
?CALL(From, close) ->
@@ -784,20 +775,33 @@ begin_merge(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
XFileName = filename("X",State),
Owner = self(),
?log("starting merge~n", []),
file:delete(XFileName),
MergePID = hanoidb_merger:start(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts),
MergePID = proc_lib:spawn_link(fun() ->
try
?log("merge begun~n", []),
{ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts ),
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch
C:E ->
error_logger:error_msg("merge failed ~p:~p ~p~n",
[C,E,erlang:get_stacktrace()]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end),
{ok, MergePID}.
close_and_delete_a_and_b(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
@@ -831,8 +835,7 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
ok
catch
Class:Ex ->
try throw(42) catch _:_:Stk -> io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,Stk]) end
io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
end
end ),
{ok, PID}.

src/hanoidb_load.erl

@@ -1,35 +0,0 @@
-module(hanoidb_load).
-export([run/1]).
run(Dir) ->
case hanoidb:open(filename:join(Dir, "test.hanoidb"), []) of
{error, Reason} ->
{error, Reason};
{ok, Tree} -> fill_db_timed(Tree, 15 * 60 * 1000)
end.
%% fill_db(Tree) -> fill_db(Tree, 5000).
%% fill_db(Tree, 0) -> hanoidb:close(Tree);
%% fill_db(Tree, N) ->
%% Letter = N rem 26 + $a,
%% Length = rand:uniform(100),
%% Key = << <<Letter/utf8>> || _ <- lists:seq(0, Length) >>,
%% ok = hanoidb:put(Tree, Key, Key),
%% fill_db(Tree, N - 1).
fill_db_timed(Tree, Timeout) ->
erlang:send_after(Timeout, self(), stop),
fill_db_loop(Tree, 0).
fill_db_loop(Tree, N) ->
receive
stop ->
ok
after 0 ->
Key = crypto:strong_rand_bytes(2000),
Letter = N rem 26 + $a,
Length = rand:uniform(100),
Value = << <<Letter/utf8>> || _ <- lists:seq(0, Length) >>,
ok = hanoidb:put(Tree, Key, Value),
fill_db_loop(Tree, N+1)
end.

src/hanoidb_merger.erl

@@ -28,10 +28,9 @@
%% @doc Merging two Indexes
-export([start/6, merge/6]).
-export([merge/6]).
-include("hanoidb.hrl").
-include("include/plain_rpc.hrl").
%% A merger which is inactive for this long will sleep which means that it will
%% close open files, and compress the current bloom filter.
@@ -41,27 +40,6 @@
%% merges, so we default to running the entire merge in one process.
-define(LOCAL_WRITER, true).
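Stripped to its essentials, the hibernation pattern the comment above refers to looks like this (a sketch, not the module's exact code; `Resume` stands in for the scan continuations in the hunks below):

```erlang
hibernate(State, Resume) ->
    %% Compress the merge state, drop live references, and block
    %% until the owner asks for more work.
    Keep = zlib:gzip(erlang:term_to_binary(State)),
    erlang:garbage_collect(),
    receive
        {step, From, HowMany} ->
            Resume(erlang:binary_to_term(zlib:gunzip(Keep)), From, HowMany)
    end.
```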
-spec start(string(), string(), string(), integer(), boolean(), list()) -> pid().
start(A,B,X, Size, IsLastLevel, Options) ->
Owner = self(),
plain_fsm:spawn_link(?MODULE, fun() ->
try
{ok, OutCount} = hanoidb_merger:merge(A, B, X,
Size,
IsLastLevel,
Options),
Owner ! ?CAST(self(),{merge_done, OutCount, X})
catch
C:E:Stacktrace ->
%% this semi-bogus code makes sure we always get a stack trace if merging fails
error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n",
[self(), C,E,Stacktrace, X]),
erlang:raise(C,E,Stacktrace)
end
end).
-spec merge(string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, IXA} = hanoidb_reader:open(A, [sequential|Options]),
@@ -69,19 +47,19 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, Out} = hanoidb_writer:init([C, [{size, Size} | Options]]),
AKVs =
case hanoidb_reader:first_node(IXA) of
{kvlist, AKV} -> AKV;
{node, AKV} -> AKV;
none -> []
end,
BKVs =
case hanoidb_reader:first_node(IXB) of
{kvlist, BKV} ->BKV;
{node, BKV} ->BKV;
none -> []
end,
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}).
terminate(Out) ->
{ok, Count, Out1} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _Out2} = hanoidb_writer:handle_call(close, self(), Out1),
{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out),
{ok, Count}.
step(S) ->
@@ -94,77 +72,13 @@ hibernate_scan(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(Keep),
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
scan(hanoidb_reader:deserialize(IXA),
hanoidb_reader:deserialize(IXB),
hanoidb_writer:deserialize(Out),
IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
IsLastLevel, AKVs, BKVs, {N+HowMany, From})
end.
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(Keep),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan_only/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
end.
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) ->
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}},
fun({IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2}}) ->
receive_scan(IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan(Keep)
end.
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of
none ->
@@ -173,11 +87,20 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/=
PID ! {Ref, step_done}
end,
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID});
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan(Keep)
end;
scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
case hanoidb_reader:next_node(IXA) of
{kvlist, AKVs} ->
{node, AKVs} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(IXA),
@@ -186,7 +109,7 @@ scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
case hanoidb_reader:next_node(IXB) of
{kvlist, BKVs} ->
{node, BKVs} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(IXB),
@@ -205,37 +128,17 @@ scan(IXA, IXB, Out, IsLastLevel, [{_Key1,_Value1}|AT]=_AKVs, [{Key2,Value2}|IX]=
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
scan(IXA, IXB, Out3, IsLastLevel, AT, IX, step(Step, 2)).
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) ->
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IX, Out, IsLastLevel, KVs, {N, FromPID}},
fun({IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2}}) ->
receive_scan_only(IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IX, Out, IsLastLevel, KVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan_only(Keep)
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From})
end.
scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
@@ -244,11 +147,19 @@ scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
PID ! {Ref, step_done}
end,
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID});
receive
{step, From, HowMany} ->
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan_only(Keep)
end;
scan_only(IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
case hanoidb_reader:next_node(IX) of
{kvlist, KVs} ->
{node, KVs} ->
scan_only(IX, Out, IsLastLevel, KVs, Step);
end_of_data ->
case FromPID of

src/hanoidb_nursery.erl

@@ -25,46 +25,46 @@
-module(hanoidb_nursery).
-author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/4, recover/5, finish/2, lookup/2, add/4, add/5]).
-export([new/3, recover/4, finish/2, lookup/2, add/4, add/5]).
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
-include("include/hanoidb.hrl").
-include("hanoidb.hrl").
-include_lib("kernel/include/file.hrl").
-spec new(string(), integer(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
%% do incremental merge every this many inserts
%% this value *must* be less than or equal to
%% 2^TOP_LEVEL == ?BTREE_SIZE(?TOP_LEVEL)
-define(INC_MERGE_STEP, ?BTREE_SIZE(MinLevel) div 2).
-define(INC_MERGE_STEP, ?BTREE_SIZE(?TOP_LEVEL)/2).
new(Directory, MinLevel, MaxLevel, Config) ->
new(Directory, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
{ok, File} = file:open(?LOGFILENAME(Directory),
[raw, exclusive, write, delayed_write, append]),
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
min_level=MinLevel, max_level=MaxLevel, config=Config }}.
max_level=MaxLevel, config=Config }}.
recover(Directory, TopLevel, MinLevel, MaxLevel, Config)
when MinLevel =< MaxLevel, is_integer(MinLevel), is_integer(MaxLevel) ->
recover(Directory, TopLevel, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
case file:read_file_info(?LOGFILENAME(Directory)) of
{ok, _} ->
ok = do_recover(Directory, TopLevel, MinLevel, MaxLevel, Config),
new(Directory, MinLevel, MaxLevel, Config);
ok = do_recover(Directory, TopLevel, MaxLevel, Config),
new(Directory, MaxLevel, Config);
{error, enoent} ->
new(Directory, MinLevel, MaxLevel, Config)
new(Directory, MaxLevel, Config)
end.
do_recover(Directory, TopLevel, MinLevel, MaxLevel, Config) ->
do_recover(Directory, TopLevel, MaxLevel, Config) ->
%% repair the log file; storing it in nursery2
LogFileName = ?LOGFILENAME(Directory),
{ok, Nursery} = read_nursery_from_log(Directory, MinLevel, MaxLevel, Config),
{ok, Nursery} = read_nursery_from_log(Directory, MaxLevel, Config),
ok = finish(Nursery, TopLevel),
%% assert log file is gone
{error, enoent} = file:read_file_info(LogFileName),
@@ -82,7 +82,7 @@ fill_cache(Transactions, Cache)
when is_list(Transactions) ->
lists:foldl(fun fill_cache/2, Cache, Transactions).
read_nursery_from_log(Directory, MinLevel, MaxLevel, Config) ->
read_nursery_from_log(Directory, MaxLevel, Config) ->
{ok, LogBinary} = file:read_file(?LOGFILENAME(Directory)),
Cache =
case hanoidb_util:decode_crc_data(LogBinary, [], []) of
@@ -92,7 +92,7 @@ read_nursery_from_log(Directory, MinLevel, MaxLevel, Config) ->
error_logger:info_msg("ignoring undecypherable bytes in ~p~n", [?LOGFILENAME(Directory)]),
fill_cache(KVs, gb_trees:empty())
end,
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), min_level=MinLevel, max_level=MaxLevel, config=Config }}.
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
%% @doc Add a Key/Value to the nursery
%% @end
@@ -142,12 +142,12 @@ do_sync(File, Nursery) ->
case application:get_env(hanoidb, sync_strategy) of
{ok, sync} ->
file:datasync(File),
os:timestamp();
now();
{ok, {seconds, N}} ->
MicrosSinceLastSync = timer:now_diff(os:timestamp(), Nursery#nursery.last_sync),
if (MicrosSinceLastSync div 1000000) >= N ->
MicrosSinceLastSync = timer:now_diff(now(), Nursery#nursery.last_sync),
if (MicrosSinceLastSync / 1000000) >= N ->
file:datasync(File),
os:timestamp();
now();
true ->
Nursery#nursery.last_sync
end;
@@ -175,7 +175,7 @@ lookup(Key, #nursery{cache=Cache}) ->
%% @end
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
count=Count, config=Config, min_level=MinLevel }, TopLevel) ->
count=Count, config=Config }, TopLevel) ->
hanoidb_util:ensure_expiry(Config),
@@ -189,7 +189,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
N when N > 0 ->
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(MinLevel)},
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
{compress, none} | Config]),
try
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
@ -205,10 +205,10 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
%% Issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished).
if DoneMerge >= ?BTREE_SIZE(MinLevel) ->
if DoneMerge >= ?BTREE_SIZE(?TOP_LEVEL) ->
ok;
true ->
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel) - DoneMerge)
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL) - DoneMerge)
end;
% {ok, _Nursery2} = do_inc_merge(Nursery, Count, TopLevel);
@@ -247,13 +247,13 @@ add(Key, Value, Expiry, Nursery, Top) ->
end.
-spec flush(#nursery{}, pid()) -> {ok, #nursery{}}.
flush(Nursery=#nursery{ dir=Dir, min_level=MinLevel, max_level=MaxLevel, config=Config }, Top) ->
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel, config=Config }, Top) ->
ok = finish(Nursery, Top),
{error, enoent} = file:read_file_info(filename:join(Dir, "nursery.log")),
hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config).
hanoidb_nursery:new(Dir, MaxLevel, Config).
has_room(#nursery{ count=Count, min_level=MinLevel }, N) ->
(Count + N + 1) < ?BTREE_SIZE(MinLevel).
has_room(#nursery{ count=Count }, N) ->
(Count + N + 1) < ?BTREE_SIZE(?TOP_LEVEL).
ensure_space(Nursery, NeededRoom, Top) ->
case has_room(Nursery, NeededRoom) of
@@ -303,7 +303,7 @@ transact1(Spec, Nursery1=#nursery{ log_file=File, cache=Cache0, total_size=Total
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count }, length(Spec), Top).
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done, min_level=MinLevel }, N, TopLevel) ->
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
if Step+N >= ?INC_MERGE_STEP ->
hanoidb_level:begin_incremental_merge(TopLevel, Step + N),
{ok, Nursery#nursery{ step=0, merge_done=Done + Step + N }};
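
To make the `?INC_MERGE_STEP` comment near the top of this file concrete (a sketch using `?BTREE_SIZE` from `src/hanoidb.hrl`; a level of 10 is illustrative):

```erlang
(1 bsl 10) div 2.  %% => 512: begin an incremental merge roughly every
                   %% 512 inserts, safely below 2^TOP_LEVEL = 1024
```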

src/hanoidb_reader.erl

@@ -37,10 +37,10 @@
-export([serialize/1, deserialize/1]).
-record(node, {level :: non_neg_integer(),
members=[] :: list(any()) | binary() }).
members=[] :: list(any()) }).
-record(index, {file :: file:io_device(),
root= none :: #node{} | none,
root :: #node{} | none,
bloom :: term(),
name :: string(),
config=[] :: term() }).
@@ -53,7 +53,9 @@ open(Name) ->
open(Name, [random]).
-type config() :: [sequential | folding | random | {atom(), term()}].
-spec open(Name::string(), config()) -> {ok, read_file()} | {error, any()}.
open(Name, Config) ->
case proplists:get_bool(sequential, Config) of
true ->
@@ -84,7 +86,7 @@ open(Name, Config) ->
{ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8),
{ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4),
{ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize),
{ok, Bloom} = hanoidb_util:bin_to_bloom(BloomData),
Bloom = hanoidb_bloom:decode(BloomData),
%% read in the root node
Root =
@@ -114,15 +116,11 @@ deserialize({seq_read_file, Index, Position}) ->
fold(Fun, Acc0, #index{file=File}) ->
{ok, Node} = read_node(File,?FIRST_BLOCK_POS),
fold0(File,fun({K,V},Acc) -> Fun(K,V,Acc) end,Node,Acc0).
fold0(File,Fun,#node{level=0, members=BinPage},Acc0) when is_binary(BinPage) ->
Acc1 = vbisect:foldl(fun(K, V, Acc2) -> Fun({K, decode_binary_value(V)}, Acc2) end,Acc0,BinPage),
fold1(File,Fun,Acc1);
fold0(File,Fun,#node{level=0, members=List},Acc0) when is_list(List) ->
fold0(File,Fun,#node{level=0, members=List},Acc0) ->
Acc1 = lists:foldl(Fun,Acc0,List),
fold1(File,Fun,Acc1);
fold0(File,Fun,_InnerNode,Acc0) ->
@@ -136,39 +134,22 @@ fold1(File,Fun,Acc0) ->
fold0(File,Fun,Node,Acc0)
end.
-spec range_fold(fun((binary(),binary(),any()) -> any()), any(), #index{}, #key_range{}) ->
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
{limit, any(), binary()} | {done, any()}.
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
case Range#key_range.from_key =< first_key(Root) of
true ->
{ok, _} = file:position(File, ?FIRST_BLOCK_POS),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
false ->
case find_leaf_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
{ok, {Pos,_}} ->
{ok, _} = file:position(File, Pos),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
{ok, Pos} ->
{ok, _} = file:position(File, Pos),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
none ->
{done, Acc0}
end
case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
{ok, {Pos,_}} ->
{ok, _} = file:position(File, Pos),
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
{ok, Pos} ->
{ok, _} = file:position(File, Pos),
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
none ->
{done, Acc0}
end.
first_key(#node{members=Dict}) ->
{_,FirstKey} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, Dict),
FirstKey.
fold_until_stop(Fun,Acc,List) when is_list(List) ->
fold_until_stop2(Fun, {continue, Acc}, List);
fold_until_stop(Fun,Acc0,Bin) when is_binary(Bin) ->
vbisect:fold_until_stop(fun({Key,VBin},Acc1) ->
% io:format("-> DOING ~p,~p~n", [Key,Acc1]),
Fun({Key, decode_binary_value(VBin)}, Acc1)
end,
Acc0,
Bin).
fold_until_stop(Fun,Acc,List) ->
fold_until_stop2(Fun, {continue, Acc}, List).
fold_until_stop2(_Fun,{stop,Result},_) ->
{stopped, Result};
@@ -190,8 +171,7 @@ get_value({Value, _TStamp}) ->
get_value(Value) ->
Value.
range_fold_from_here(Fun, Acc0, File, Range, undefined) ->
% io:format("RANGE_FOLD_FROM_HERE(~p,~p)~n", [Acc0,File]),
do_range_fold(Fun, Acc0, File, Range, undefined) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@@ -206,19 +186,18 @@ range_fold_from_here(Fun, Acc0, File, Range, undefined) ->
false ->
{continue, Fun(Key, get_value(Value), Acc)}
end;
(_Huh, Acc) ->
% io:format("SKIPPING ~p~n", [_Huh]),
(_, Acc) ->
{continue, Acc}
end,
Acc0,
Members) of
{stopped, Result} -> Result;
{ok, Acc1} ->
range_fold_from_here(Fun, Acc1, File, Range, undefined)
do_range_fold(Fun, Acc1, File, Range, undefined)
end
end;
range_fold_from_here(Fun, Acc0, File, Range, N0) ->
do_range_fold(Fun, Acc0, File, Range, N0) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@@ -248,74 +227,52 @@ range_fold_from_here(Fun, Acc0, File, Range, N0) ->
{continue, Acc}
end,
{N0, Acc0},
Members)
of
{stopped, Result} ->
Result;
Members) of
{stopped, Result} -> Result;
{ok, {N2, Acc1}} ->
range_fold_from_here(Fun, Acc1, File, Range, N2)
do_range_fold(Fun, Acc1, File, Range, N2)
end
end.
find_leaf_node(_File,_FromKey,#node{level=0},Pos) ->
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
{ok, Pos};
find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_list(Members) ->
lookup_node(File,FromKey,#node{members=Members,level=N},_) ->
case find_start(FromKey, Members) of
{ok, ChildPos} when N==1 ->
{ok, ChildPos};
{ok, ChildPos} ->
recursive_find(File, FromKey, N, ChildPos);
case read_node(File,ChildPos) of
{ok, ChildNode} ->
lookup_node(File,FromKey,ChildNode,ChildPos);
eof ->
none
end;
not_found ->
none
end;
find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_binary(Members) ->
case vbisect:find_geq(FromKey,Members) of
{ok, _, <<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned>>} ->
% io:format("** FIND_LEAF_NODE(~p,~p) -> {~p,~p}~n", [FromKey, N, Pos,Len]),
recursive_find(File, FromKey, N, {Pos,Len});
none ->
% io:format("** FIND_LEAF_NODE(~p,~p) -> none~n", [FromKey, N]),
none
end;
find_leaf_node(_,_,none,_) ->
lookup_node(_,_,none,_) ->
none.
recursive_find(_File,_FromKey,1,ChildPos) ->
{ok, ChildPos};
recursive_find(File,FromKey,N,ChildPos) when N>1 ->
case read_node(File,ChildPos) of
{ok, ChildNode} ->
find_leaf_node(File, FromKey,ChildNode,ChildPos);
eof ->
none
end.
%% used by the merger, needs list value
first_node(#index{file=File}) ->
case read_node(File, ?FIRST_BLOCK_POS) of
{ok, #node{level=0, members=Members}} ->
{kvlist, decode_member_list(Members)};
{node, Members};
eof->
none
end.
%% used by the merger, needs list value
next_node(#index{file=File}=_Index) ->
case next_leaf_node(File) of
{ok, #node{level=0, members=Members}} ->
{kvlist, decode_member_list(Members)};
{node, Members};
% {ok, #node{level=N}} when N>0 ->
% next_node(Index);
eof ->
end_of_data
end.
decode_member_list(List) when is_list(List) ->
List;
decode_member_list(BinDict) when is_binary(BinDict) ->
vbisect:foldr( fun(Key,Value,Acc) ->
[{Key, decode_binary_value(Value) }|Acc]
end,
[],
BinDict).
close(#index{file=undefined}) ->
ok;
close(#index{file=File}) ->
@@ -323,7 +280,7 @@ close(#index{file=File}) ->
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
case ?BLOOM_CONTAINS(Bloom, Key) of
case hanoidb_bloom:member(Key, Bloom) of
true ->
case lookup_in_node(File, Node, Key) of
not_found ->
@@ -341,20 +298,11 @@ lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
end.
lookup_in_node(_File,#node{level=0,members=Members}, Key) ->
find_in_leaf(Key,Members);
lookup_in_node(File,#node{members=Members},Key) when is_binary(Members) ->
case vbisect:find_geq(Key,Members) of
{ok, _Key, <<?TAG_POSLEN32, Pos:64, Size:32>>} ->
% io:format("FOUND ~p @ ~p~n", [_Key, {Pos,Size}]),
case read_node(File,{Pos,Size}) of
{ok, Node} ->
lookup_in_node(File, Node, Key);
eof ->
not_found
end;
none ->
not_found
case lists:keyfind(Key,1,Members) of
false ->
not_found;
{_,Value} ->
{ok, Value}
end;
lookup_in_node(File,#node{members=Members},Key) ->
@@ -376,8 +324,8 @@ lookup_in_node(File,#node{members=Members},Key) ->
end),
try plain_rpc:call(PID, read)
catch
Class:Ex:Stacktrace ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,Stacktrace]),
Class:Ex ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
not_found
end;
@@ -469,29 +417,3 @@ next_leaf_node(File) ->
next_leaf_node(File)
end.
find_in_leaf(Key,Bin) when is_binary(Bin) ->
case vbisect:find(Key,Bin) of
{ok, BinValue} ->
{ok, decode_binary_value(BinValue)};
error ->
not_found
end;
find_in_leaf(Key,List) when is_list(List) ->
case lists:keyfind(Key, 1, List) of
{_, Value} ->
{ok, Value};
false ->
not_found
end.
decode_binary_value(<<?TAG_KV_DATA, Value/binary>>) ->
Value;
decode_binary_value(<<?TAG_KV_DATA2, TStamp:32, Value/binary>>) ->
{Value, TStamp};
decode_binary_value(<<?TAG_DELETED>>) ->
?TOMBSTONE;
decode_binary_value(<<?TAG_DELETED2, TStamp:32>>) ->
{?TOMBSTONE, TStamp};
decode_binary_value(<<?TAG_POSLEN32, Pos:64, Len:32>>) ->
{Pos, Len}.

src/hanoidb_util.erl

@@ -38,25 +38,20 @@
, tstamp/0
, expiry_time/1
, has_expired/1
, ensure_expiry/1
, bloom_type/1
, bloom_new/2
, bloom_to_bin/1
, bin_to_bloom/1
, bin_to_bloom/2
, bloom_insert/2
, bloom_contains/2
]).
, ensure_expiry/1 ]).
-include("src/hanoidb.hrl").
-define(ERLANG_ENCODED, 131).
-define(CRC_ENCODED, 127).
-define(BISECT_ENCODED, 126).
-define(FILE_ENCODING, hanoi2).
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).
-compile({inline, [crc_encapsulate/1, crc_encapsulate_kv_entry/2 ]}).
@@ -139,47 +134,18 @@ uncompress(<<?GZIP_COMPRESSION, Data/binary>>) ->
zlib:gunzip(Data).
encode_index_node(KVList, Method) ->
TermData =
case ?FILE_ENCODING of
bisect ->
Binary = vbisect:from_orddict(lists:map(fun binary_encode_kv/1, KVList)),
CRC = erlang:crc32(Binary),
[?BISECT_ENCODED, <<CRC:32>>, Binary];
hanoi2 ->
[ ?TAG_END |
lists:map(fun ({Key,Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
KVList) ]
end,
TermData = [ ?TAG_END |
lists:map(fun ({Key,Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
KVList) ],
{MethodName, OutData} = compress(Method, TermData),
{ok, [MethodName | OutData]}.
decode_index_node(Level, Data) ->
TermData = uncompress(Data),
case decode_kv_list(TermData) of
{ok, KVList} ->
{ok, {node, Level, KVList}};
{bisect, Binary} ->
% io:format("[page level=~p~n", [Level]),
% vbisect:foldl(fun(K,V,_) -> io:format(" ~p -> ~p,~n", [K,V]) end, 0, Binary),
% io:format("]~n",[]),
{ok, {node, Level, Binary}}
end.
binary_encode_kv({Key, {Value,infinity}}) ->
binary_encode_kv({Key,Value});
binary_encode_kv({Key, {?TOMBSTONE, TStamp}}) ->
{Key, <<?TAG_DELETED2, TStamp:32>>};
binary_encode_kv({Key, ?TOMBSTONE}) ->
{Key, <<?TAG_DELETED>>};
binary_encode_kv({Key, {Value, TStamp}}) when is_binary(Value) ->
{Key, <<?TAG_KV_DATA2, TStamp:32, Value/binary>>};
binary_encode_kv({Key, Value}) when is_binary(Value)->
{Key, <<?TAG_KV_DATA, Value/binary>>};
binary_encode_kv({Key, {Pos, Len}}) when Len < 16#ffffffff ->
{Key, <<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned>>}.
{ok, KVList} = decode_kv_list(TermData),
{ok, {node, Level, KVList}}.
-spec crc_encapsulate_kv_entry(binary(), expvalue()) -> iolist().
@@ -218,14 +184,7 @@ decode_kv_list(<<?TAG_END, Custom/binary>>) ->
decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
{ok, erlang:term_to_binary(TermData)};
decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
decode_crc_data(Custom, [], []);
decode_kv_list(<<?BISECT_ENCODED, CRC:32/unsigned, Binary/binary>>) ->
CRCTest = erlang:crc32( Binary ),
if CRC == CRCTest ->
{bisect, Binary};
true ->
{bisect, vbisect:from_orddict([])}
end.
decode_crc_data(Custom, [], []).
-spec decode_crc_data(binary(), list(), list()) -> {ok, [kventry()]} | {partial, [kventry()], iolist()}.
decode_crc_data(<<>>, [], Acc) ->
@@ -298,50 +257,12 @@ ensure_expiry(Opts) ->
undefined ->
try exit(err)
catch
exit:err:Stacktrace ->
io:format(user, "~p~n", [Stacktrace])
exit:err ->
io:format(user, "~p~n", [erlang:get_stacktrace()])
end,
exit(expiry_secs_not_set);
N when N >= 0 ->
ok
end.
bloom_type({ebloom, _}) ->
ebloom;
bloom_type({sbloom, _}) ->
sbloom.
bloom_new(Size, sbloom) ->
{ok, {sbloom, hanoidb_bloom:bloom(Size, 0.01)}};
bloom_new(Size, ebloom) ->
{ok, Bloom} = ebloom:new(Size, 0.01, Size),
{ok, {ebloom, Bloom}}.
bloom_to_bin({sbloom, Bloom}) ->
hanoidb_bloom:encode(Bloom);
bloom_to_bin({ebloom, Bloom}) ->
ebloom:serialize(Bloom).
bin_to_bloom(GZiped = <<16#1F, 16#8B, _/binary>>) ->
bin_to_bloom(GZiped, sbloom);
bin_to_bloom(TermBin = <<131, _/binary>>) ->
erlang:term_to_binary(TermBin);
bin_to_bloom(Blob) ->
bin_to_bloom(Blob, ebloom).
bin_to_bloom(Binary, sbloom) ->
{ok, {sbloom, hanoidb_bloom:decode(Binary)}};
bin_to_bloom(Binary, ebloom) ->
{ok, Bloom} = ebloom:deserialize(Binary),
{ok, {ebloom, Bloom}}.
bloom_insert({sbloom, Bloom}, Key) ->
{ok, {sbloom, hanoidb_bloom:add(Key, Bloom)}};
bloom_insert({ebloom, Bloom}, Key) ->
ok = ebloom:insert(Bloom, Key),
{ok, {ebloom, Bloom}}.
bloom_contains({sbloom, Bloom}, Key) ->
hanoidb_bloom:member(Key, Bloom);
bloom_contains({ebloom, Bloom}, Key) ->
ebloom:contains(Bloom, Key).

src/hanoidb_writer.erl

@@ -55,9 +55,9 @@
name :: string(),
bloom :: {ebloom, term()} | {sbloom, term()},
bloom :: term(),
block_size = ?NODE_SIZE :: integer(),
compress = none :: none | snappy | gzip | lz4,
compress = none :: none | snappy | gzip, % | lz4,
opts = [] :: list(any()),
value_count = 0 :: integer(),
@@ -94,7 +94,7 @@ init([Name, Options]) ->
case do_open(Name, Options, [exclusive]) of
{ok, IdxFile} ->
ok = file:write(IdxFile, ?FILE_FORMAT),
{ok, Bloom} = ?BLOOM_NEW(Size),
Bloom = hanoidb_bloom:bloom(Size),
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
{ok, #state{ name=Name,
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
@@ -170,11 +170,11 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State)
exit({bad_position, Position, WrongPosition})
end,
ok = file:close(File),
erlang:term_to_binary( { State#state{ index_file=undefined, bloom=undefined }, ?BLOOM_TO_BIN(Bloom), hanoidb_util:bloom_type(Bloom) } ).
erlang:term_to_binary( { State#state{ index_file=undefined }, hanoidb_bloom:encode(Bloom) } ).
deserialize(Binary) ->
{State, Bin, Type} = erlang:binary_to_term(Binary),
{ok, Bloom} = ?BIN_TO_BLOOM(Bin, Type),
{State, Bin} = erlang:binary_to_term(Binary),
Bloom = hanoidb_bloom:decode(Bin),
{ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
State#state{ bloom=Bloom, index_file=IdxFile }.
@@ -188,8 +188,7 @@ do_open(Name, Options, OpenOpts) ->
%% @doc flush pending nodes and write trailer
archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) ->
BloomBin = ?BLOOM_TO_BIN(Bloom),
true = is_binary(BloomBin),
BloomBin = hanoidb_bloom:encode(Bloom),
BloomSize = byte_size(BloomBin),
RootPos =
case LastNodePos of
@@ -200,12 +199,12 @@ archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastN
_ ->
LastNodePos
end,
Trailer = [ << 0:32/unsigned>> , BloomBin, << BloomSize:32/unsigned, RootPos:64/unsigned >> ],
Trailer = << 0:32/unsigned, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>,
ok = file:write(IdxFile, Trailer),
ok = file:datasync(IdxFile),
ok = file:close(IdxFile),
{ok, State#state{ index_file=undefined, index_file_pos=undefined, bloom=undefined }};
{ok, State#state{ index_file=undefined, index_file_pos=undefined }};
archive_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos })
when N > 0 ->
@@ -222,8 +221,7 @@ append_node(Level, Key, Value, State=#state{ nodes=[] }) ->
append_node(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
when Level < Level2 ->
append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State)
when Bloom /= undefined ->
append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) ->
%% The top-of-stack node is at the level we wish to insert at.
%% Assert that keys are increasing:
@@ -238,14 +236,10 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
exit({badarg, Key})
end
end,
NewSize = NodeSize + hanoidb_util:estimate_node_size_increment(List, Key, Value),
{ok,Bloom2} = case Level of
0 ->
?BLOOM_INSERT(Bloom, Key);
_ ->
{ok,Bloom}
end,
NewBloom = hanoidb_bloom:add(Key, Bloom),
{TC1, VC1} =
case Level of
@@ -264,7 +258,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
NodeMembers = [{Key, Value} | List],
State2 = State#state{ nodes=[CurrNode#node{members=NodeMembers, size=NewSize} | RestNodes],
value_count=VC1, tombstone_count=TC1, bloom=Bloom2 },
value_count=VC1, tombstone_count=TC1, bloom=NewBloom },
case NewSize >= State#state.block_size of
true ->
@@ -273,8 +267,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
{ok, State2}
end.
flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos } = State) ->
flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) ->
OrderedMembers = lists:reverse(NodeMembers),
{ok, BlockData} = hanoidb_util:encode_index_node(OrderedMembers, Compress),

src/vbisect.erl

@@ -1,280 +0,0 @@
%% ----------------------------------------------------------------------------
%%
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
%%
%% Copyright 2014 (c) Trifork A/S. All Rights Reserved.
%% http://trifork.com/ info@trifork.com
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% ----------------------------------------------------------------------------
-module(vbisect).
-export([from_orddict/1,
from_gb_tree/1,
to_gb_tree/1,
first_key/1,
find/2, find_geq/2,
foldl/3, foldr/3, fold_until_stop/3,
to_orddict/1,
merge/3]).
-define(MAGIC, "vbis").
-type key() :: binary().
-type value() :: binary().
-type bindict() :: binary().
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-spec from_gb_tree(gb_trees:tree()) -> bindict().
from_gb_tree({Count,Node}) when Count =< 16#ffffffff ->
{_BinSize,IOList} = encode_gb_node(Node),
erlang:iolist_to_binary([ <<?MAGIC, Count:32/unsigned >> | IOList ]).
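%% Each node below is encoded as <<KeySize:16, Key/binary, BinSizeSmaller:32,
%% SmallerSubtree:BinSizeSmaller/binary, ValueSize:32, Value/binary,
%% BiggerSubtree/binary>>, so a lookup can skip over an entire subtree
%% using its length prefix without decoding it.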
encode_gb_node({Key, Value, Smaller, Bigger}) when is_binary(Key), is_binary(Value) ->
{BinSizeSmaller, IOSmaller} = encode_gb_node(Smaller),
{BinSizeBigger, IOBigger} = encode_gb_node(Bigger),
KeySize = byte_size(Key),
ValueSize = byte_size(Value),
{ 2 + KeySize
+ 4 + ValueSize
+ 4 + BinSizeSmaller
+ BinSizeBigger,
[ << KeySize:16, Key/binary,
BinSizeSmaller:32 >>, IOSmaller,
<< ValueSize:32, Value/binary >> | IOBigger ] };
encode_gb_node(nil) ->
{ 0, [] }.
to_gb_tree(<<?MAGIC, Count:32, Nodes/binary >>) ->
{ Count, to_gb_node(Nodes) }.
to_gb_node( <<>> ) ->
nil;
to_gb_node( << KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary >> ) ->
{Key, Value,
to_gb_node(Smaller),
to_gb_node(Bigger)}.
-spec find(Key::key(), Dict::bindict()) ->
{ ok, value() } | error.
find(Key, <<?MAGIC, _:32, Binary/binary>>) ->
find_node(byte_size(Key), Key, Binary).
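%% In the clauses below (and in find_geq_node/4), Skip = 6 + HereKeySize
%% skips the 2-byte key-size field, the key itself and the 4-byte
%% smaller-subtree-size field; the 10 in the other branch additionally
%% covers the 4-byte value-size field (2 + 4 + 4).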
find_node(KeySize, Key, <<HereKeySize:16, HereKey:HereKeySize/binary,
BinSizeSmaller:32, _:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
_/binary>> = Bin) ->
if
Key < HereKey ->
Skip = 6 + HereKeySize,
<< _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin,
find_node(KeySize, Key, Smaller);
HereKey < Key ->
Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize,
<< _:Skip/binary, Bigger/binary>> = Bin,
find_node(KeySize, Key, Bigger);
true ->
{ok, Value}
end;
find_node(_, _, <<>>) ->
error.
to_orddict(BinDict) ->
foldr(fun(Key,Value,Acc) ->
[{Key,Value}|Acc]
end,
[],
BinDict).
merge(Fun, BinDict1, BinDict2) ->
OD1 = to_orddict(BinDict1),
OD2 = to_orddict(BinDict2),
OD3 = orddict:merge(Fun, OD1, OD2),
from_orddict(OD3).
-spec first_key( bindict() ) -> binary() | none.
first_key(BinDict) ->
{_, Key} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, BinDict),
Key.
%% @doc Find the largest {K,V} pair whose key K is smaller than or equal
%% to the given key. This is useful for an inner node, where the key is
%% the smallest key in the child node.
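%% For example, with keys <<2>>, <<4>>, <<6>> and <<122>> (as in
%% geq_test/0 below), find_geq(<<5>>, B) yields {ok, <<4>>, <<4>>} and
%% find_geq(<<1>>, B) yields none.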
-spec find_geq(Key::binary(), Binary::binary()) ->
none | {ok, Key::key(), Value::value()}.
find_geq(Key, <<?MAGIC, _:32, Binary/binary>>) ->
find_geq_node(byte_size(Key), Key, Binary, none).
find_geq_node(_, _, <<>>, Else) ->
Else;
find_geq_node(KeySize, Key, <<HereKeySize:16, HereKey:HereKeySize/binary,
BinSizeSmaller:32, _:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
_/binary>> = Bin, Else) ->
if
Key < HereKey ->
Skip = 6 + HereKeySize,
<< _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin,
find_geq_node(KeySize, Key, Smaller, Else);
HereKey < Key ->
Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize,
<< _:Skip/binary, Bigger/binary>> = Bin,
find_geq_node(KeySize, Key, Bigger, {ok, HereKey, Value});
true ->
{ok, HereKey, Value}
end.
-spec foldl(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) ->
term().
foldl(Fun, Acc, <<?MAGIC, _:32, Binary/binary>>) ->
foldl_node(Fun, Acc, Binary).
foldl_node(_Fun, Acc, <<>>) ->
Acc;
foldl_node(Fun, Acc, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
Acc1 = foldl_node(Fun, Acc, Smaller),
Acc2 = Fun(Key, Value, Acc1),
foldl_node(Fun, Acc2, Bigger).
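%% @doc In-order fold that can terminate early: Fun({Key,Value}, Acc) must
%% return {continue, Acc1} to keep going or {stop, Result} to abort, in
%% which case the overall result is {stopped, Result} rather than {ok, Acc}.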
-spec fold_until_stop(function(), term(), bindict()) -> {stopped, term()} | {ok, term()}.
fold_until_stop(Fun, Acc, <<?MAGIC, _:32, Bin/binary>>) ->
fold_until_stop2(Fun, {continue, Acc}, Bin).
fold_until_stop2(_Fun,{stop,Result},_) ->
{stopped, Result};
fold_until_stop2(_Fun,{continue, Acc},<<>>) ->
{ok, Acc};
fold_until_stop2(Fun,{continue, Acc}, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
case fold_until_stop2(Fun, {continue, Acc}, Smaller) of
{stopped, Result} ->
{stopped, Result};
{ok, Acc1} ->
ContinueOrStopAcc = Fun({Key,Value}, Acc1),
fold_until_stop2(Fun, ContinueOrStopAcc, Bigger)
end.
-spec foldr(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) ->
term().
foldr(Fun, Acc, <<?MAGIC, _:32, Binary/binary>>) ->
foldr_node(Fun, Acc, Binary).
foldr_node(_Fun, Acc, <<>>) ->
Acc;
foldr_node(Fun, Acc, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
Acc1 = foldr_node(Fun, Acc, Bigger),
Acc2 = Fun(Key, Value, Acc1),
foldr_node(Fun, Acc2, Smaller).
from_orddict(OrdDict) ->
from_gb_tree(gb_trees:from_orddict(OrdDict)).
-ifdef(TEST).
speed_test_() ->
{timeout, 600,
fun() ->
Start = 100000000000000,
N = 100000,
Keys = lists:seq(Start, Start+N),
KeyValuePairs = lists:map(fun (I) -> {<<I:64/integer>>, <<255:8/integer>>} end,
Keys),
%% The read keys will mostly be unique if N is bigger than 10000
ReadKeys = [<<(lists:nth(random:uniform(N), Keys)):64/integer>> || _ <- lists:seq(1, 1000)],
B = from_orddict(KeyValuePairs),
time_reads(B, N, ReadKeys)
end}.
geq_test() ->
B = from_orddict([{<<2>>,<<2>>},{<<4>>,<<4>>},{<<6>>,<<6>>},{<<122>>,<<122>>}]),
none = find_geq(<<1>>, B),
{ok, <<2>>, <<2>>} = find_geq(<<2>>, B),
{ok, <<2>>, <<2>>} = find_geq(<<3>>, B),
{ok, <<4>>, <<4>>} = find_geq(<<5>>, B),
{ok, <<6>>, <<6>>} = find_geq(<<100>>, B),
{ok, <<122>>, <<122>>} = find_geq(<<150>>, B),
true.
time_reads(B, Size, ReadKeys) ->
Parent = self(),
spawn(
fun() ->
Runs = 20,
Timings =
lists:map(
fun (_) ->
StartTime = now(),
find_many(B, ReadKeys),
timer:now_diff(now(), StartTime)
end, lists:seq(1, Runs)),
Rps = 1000000 / ((lists:sum(Timings) / length(Timings)) / 1000),
error_logger:info_msg("Average over ~p runs, ~p keys in dict~n"
"Average fetch ~p keys: ~p us, max: ~p us~n"
"Average fetch 1 key: ~p us~n"
"Theoretical sequential RPS: ~w~n",
[Runs, Size, length(ReadKeys),
lists:sum(Timings) / length(Timings),
lists:max(Timings),
(lists:sum(Timings) / length(Timings)) / length(ReadKeys),
trunc(Rps)]),
Parent ! done
end),
receive done -> ok after 1000 -> ok end.
-spec find_many(bindict(), [key()]) -> non_neg_integer().
find_many(B, Keys) ->
lists:foldl(fun (K, N) ->
case find(K, B) of
{ok, _} -> N+1;
error -> N
end
end,
0, Keys).
-endif.

View file

@ -25,6 +25,8 @@
%% @doc Drive a set of LSM BTrees
-module(hanoidb_drv).
-ifdef(QC_PROPER).
-behaviour(gen_server).
%% API
@ -141,3 +143,4 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,6 +24,8 @@
-module(hanoidb_merger_tests).
-ifdef(QC_PROPER).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
@ -61,3 +63,4 @@ merge_test() ->
ok.
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,6 +24,8 @@
-module(hanoidb_tests).
-ifdef(QC_PROPER).
-include("include/hanoidb.hrl").
-include("src/hanoidb.hrl").
@ -47,15 +49,9 @@
next_state/3, postcondition/3,
precondition/2]).
-ifdef(pre18).
-define(OTP_DICT, dict()).
-else.
-define(OTP_DICT, dict:dict()).
-endif.
-record(tree, { elements = dict:new() :: ?OTP_DICT }).
-record(state, { open = dict:new() :: ?OTP_DICT,
closed = dict:new() :: ?OTP_DICT}).
-record(tree, { elements = dict:new() :: dict() }).
-record(state, { open = dict:new() :: dict(),
closed = dict:new() :: dict()}).
-define(SERVER, hanoidb_drv).
full_test_() ->
@ -67,23 +63,15 @@ full_test_() ->
?_test(test_tree_simple_5())
]}.
longer_tree_test_() ->
longer_test_() ->
{setup,
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 300, ?_test(test_tree())}
]}.
longer_qc_test_() ->
{setup,
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 120, ?_test(test_qc())}
]}.
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 300, ?_test(test_tree())},
{timeout, 120, ?_test(test_qc())}
]}.
-ifdef(TRIQ).
test_qc() ->
@ -440,3 +428,4 @@ dict_range_query(Dict, Range) ->
[{K, V} || {K, V} <- dict:to_list(Dict),
?KEY_IN_RANGE(K, Range)].
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,6 +24,8 @@
-module(hanoidb_writer_tests).
-ifdef(QC_PROPER).
-ifdef(TEST).
-ifdef(TRIQ).
@ -62,9 +64,9 @@ simple_test() ->
simple1_test() ->
file:delete("testdata"),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 102},{expiry_secs, 0}]),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 1024},{expiry_secs, 0}]),
Max = 102,
Max = 1024,
Seq = lists:seq(0, Max),
{Time1,_} = timer:tc(
@ -78,16 +80,15 @@ simple1_test() ->
end,
[]),
error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
% error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
{ok, IN} = hanoidb_reader:open("testdata", [{expiry_secs,0}]),
Middle = Max div 2,
io:format("LOOKING UP ~p~n", [<<Middle:128>>]),
{ok, <<"valuevalue/", Middle:128>>} = hanoidb_reader:lookup(IN, <<Middle:128>>),
{Time2,Count} = timer:tc(
fun() -> hanoidb_reader:fold(fun(_Key, <<"valuevalue/", N:128>>, N) ->
fun() -> hanoidb_reader:fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
N+1
end,
0,
@ -95,13 +96,12 @@ simple1_test() ->
end,
[]),
io:format("time to scan: ~p/sec~n", [1000000/(Time2 div Max)]),
% error_logger:info_msg("time to scan: ~p/sec~n", [1000000/(Time2/Max)]),
Max = Count-1,
{Time3,{done,Count2}} = timer:tc(
fun() -> hanoidb_reader:range_fold(fun(_Key, <<"valuevalue/", N:128>>, N) ->
% io:format("[~p]~n", N),
fun() -> hanoidb_reader:range_fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
N+1
end,
0,
@ -110,13 +110,12 @@ simple1_test() ->
end,
[]),
% error_logger:info_msg("time to range_fold: ~p/sec~n", [1000000/(Time3/Max)]),
%error_logger:info_msg("time to range_fold: ~p/sec~n", [1000000/(Time3 div Max)]),
io:format("count2=~p~n", [Count2]),
% error_logger:info_msg("count2=~p~n", [Count2]),
Max = Count2-1,
ok = hanoidb_reader:close(IN).
-endif. %% -ifdef(QC_PROPER).

View file

@ -1,83 +0,0 @@
#!/bin/bash
## ----------------------------------------------------------------------------
##
## hanoi: LSM-trees (Log-Structured Merge Trees) Indexed Storage
##
## Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
## http://trifork.com/ info@trifork.com
##
## Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
## http://basho.com/ info@basho.com
##
## This file is provided to you under the Apache License, Version 2.0 (the
## "License"); you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
##
## ----------------------------------------------------------------------------
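# Both functions below draw one column per level i based on which of the
# A/B/C/X-$i.data files exist; periodic() prints a line every second, while
# dynamic() only prints when the picture changes.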
function periodic() {
t=0
while sleep 1 ; do
let "t=t+1"
printf "%5d [" "$t"
for ((i=0; i<35; i++)) ; do
if ! [ -f "A-$i.data" ] ; then
echo -n " "
elif ! [ -f "B-$i.data" ] ; then
echo -n "-"
elif ! [ -f "C-$i.data" ] ; then
echo -n "#"
elif ! [ -f "X-$i.data" ] ; then
echo -n "="
else
echo -n "*"
fi
done
echo
done
}
function dynamic() {
local old s t start now
t=0
start=`date +%s`
while true ; do
s=""
for ((i=0; i<35; i++)) ; do
if ! [ -f "A-$i.data" ] ; then
s="$s "
elif ! [ -f "B-$i.data" ] ; then
s="$s-"
elif ! [ -f "C-$i.data" ] ; then
s="$s="
elif ! [ -f "X-$i.data" ] ; then
s="$s%"
else
s="$s*"
fi
done
if [[ "$s" != "$old" ]] ; then
let "t=t+1"
now=`date +%s`
let "now=now-start"
printf "%5d %6d [%s\n" "$t" "$now" "$s"
old="$s"
else
# Sleep a little bit:
perl -e 'use Time::HiRes; Time::HiRes::usleep(100000)'
fi
done
}
dynamic

View file

@ -48,13 +48,13 @@ function periodic() {
}
merge_diff() {
SA=`stat -c %s A-${ID}.data 2> /dev/null`
SB=`stat -c %s B-${ID}.data 2> /dev/null`
SX=`stat -c %s X-${ID}.data 2> /dev/null`
SA=`ls -l A-${ID}.data 2> /dev/null | awk '{print $5}'`
SB=`ls -l B-${ID}.data 2> /dev/null | awk '{print $5}'`
SX=`ls -l X-${ID}.data 2> /dev/null | awk '{print $5}'`
if [ \( -n "$SA" \) -a \( -n "$SB" \) -a \( -n "$SX" \) ]; then
export RES=`expr ${SX:-0} / \( $SA + $SB \)`
export RES=`expr ${SX}0 / \( $SA + $SB \)`
else
export RES=""
export RES="?"
fi
}
@ -64,7 +64,7 @@ function dynamic() {
start=`date +%s`
while true ; do
s=""
for ((i=10; i<22; i++)) ; do
for ((i=8; i<22; i++)) ; do
if [ -f "C-$i.data" ] ; then
s="${s}C"
else

View file

@ -1,66 +0,0 @@
#!/usr/bin/env bash
merge_diff() {
SA=`stat -c %s A-${ID}.data 2> /dev/null`
SB=`stat -c %s B-${ID}.data 2> /dev/null`
SX=`stat -c %s X-${ID}.data 2> /dev/null`
if [ \( -n "$SA" \) -a \( -n "$SB" \) -a \( -n "$SX" \) ]; then
export RES="$(expr ${SX:-0} / \( $SA + $SB \))"
else
export RES=0
fi
if [[ $RES -eq 0 ]]; then
export RES="⋈"
fi
}
function dynamic() {
local old s t start now
t=0
start=`date +%s`
while true ; do
s=""
for ((i=10; i<22; i++)) ; do
if [ -f "C-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "B-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "A-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "X-$i.data" ] ; then
export ID="$i"
merge_diff
s="${s}$RES"
elif [ -f "M-$i.data" ] ; then
s="${s}M"
else
s="$s "
fi
s="$s|"
done
if [[ "$s" != "$old" ]] ; then
let "t=t+1"
now=`date +%s`
let "now=now-start"
free=`df -m . 2> /dev/null | tail -1 | awk '{print $4}'`
used=`du -m 2> /dev/null | awk '{print $1}' `
printf "%5d %6d [%s\n" "$t" "$now" "$s ${used}MB (${free}MB free)"
old="$s"
else
# Sleep a little bit:
sleep 1
fi
done
}
dynamic

View file

@ -0,0 +1,122 @@
%% ----------------------------------------------------------------------------
%%
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
%%
%% Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
%% http://trifork.com/ info@trifork.com
%%
%% Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
%% http://basho.com/ info@basho.com
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% ----------------------------------------------------------------------------
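%% @doc basho_bench driver for HanoiDB. Implements the get, put, delete and
%% fold_100 operations (the latter folds over a key range with a limit of
%% 100 results). Reads the basho_bench config keys hanoidb_dir,
%% hanoidb_flags and hanoidb_sync_interval.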
-module(basho_bench_driver_hanoidb).
-record(state, { tree :: hanoidb:hanoidb(),
filename :: string(),
flags :: list(),
sync_interval :: integer() | infinity,
last_sync :: erlang:timestamp() }).
-export([new/1,
run/4]).
-include("hanoidb.hrl").
-include_lib("basho_bench/include/basho_bench.hrl").
-record(key_range, { from_key = <<>> :: binary(),
from_inclusive = true :: boolean(),
to_key :: binary() | undefined,
to_inclusive = false :: boolean(),
limit :: pos_integer() | undefined }).
%% ====================================================================
%% API
%% ====================================================================
new(_Id) ->
%% Make sure hanoidb is available
case code:which(hanoidb) of
non_existing ->
?FAIL_MSG("~s requires hanoidb to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
%% Get the target directory
Dir = basho_bench_config:get(hanoidb_dir, "."),
Filename = filename:join(Dir, "test.hanoidb"),
Config = basho_bench_config:get(hanoidb_flags, []),
%% Look for sync interval config
SyncInterval =
case basho_bench_config:get(hanoidb_sync_interval, infinity) of
Value when is_integer(Value) ->
Value;
infinity ->
infinity
end,
%% Open the hanoidb store with the configured flags
case hanoidb:open(Filename, Config) of
{error, Reason} ->
?FAIL_MSG("Failed to open hanoidb in ~s: ~p\n", [Filename, Reason]);
{ok, FBTree} ->
{ok, #state { tree = FBTree,
filename = Filename,
sync_interval = SyncInterval,
last_sync = os:timestamp() }}
end.
run(get, KeyGen, _ValueGen, State) ->
case hanoidb:lookup(State#state.tree, KeyGen()) of
{ok, _Value} ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(put, KeyGen, ValueGen, State) ->
case hanoidb:put(State#state.tree, KeyGen(), ValueGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(delete, KeyGen, _ValueGen, State) ->
case hanoidb:delete(State#state.tree, KeyGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(fold_100, KeyGen, _ValueGen, State) ->
[From,To] = lists:usort([KeyGen(), KeyGen()]),
case hanoidb:fold_range(State#state.tree,
fun(_Key,_Value,Count) ->
Count+1
end,
0,
#key_range{ from_key=From,
to_key=To,
limit=100 }) of
Count when Count >= 0, Count =< 100 ->
{ok,State};
Count ->
{error, {bad_fold_count, Count}}
end.