Compare commits

...

39 commits

Author SHA1 Message Date
bc750ed98e triangles 2024-07-19 06:34:40 -04:00
1959f973b2 added another visualization 2024-07-18 05:25:22 -04:00
f09e514d30 more fixes 2024-07-18 05:07:20 -04:00
994b28a6a4 fixes 2024-07-18 04:25:32 -04:00
80b5b85e81 personal work 2024-07-17 05:34:43 -04:00
b69015a72d Nix flake, shell 2024-07-17 05:25:23 -04:00
Sean Cribbs
0d651700ac Update to work on OTP 26
* erlang:stacktrace() has been deprecated a long time
* Update deps to OTP 26-compatible versions
* Add rebar3 lockfile
2024-07-17 05:08:34 -04:00
Kresten Krab Thorup
68333fa51a Merge pull request #37 from a13x/master
Fixing the building of hanoidb for R18
2016-03-26 02:03:12 +01:00
Aleksandar Radulovic
32b40f25f9 Fix building of the lib
Fix sext and edown dependency
Fix array() type defs

Change-Id: Ibc2986b690fb81ef9d757fa7b8ba20a994bc8fa2
2016-03-23 15:25:57 +01:00
Kresten Krab Thorup
4e82d5f81a Add license for vbisect 2014-11-28 16:25:48 +01:00
Kresten Krab Thorup
875459dcd2 Improve tests/dialyzing. 2014-11-28 16:22:20 +01:00
Kresten Krab Thorup
198da5ef5f Use new on-disk page format
This updates moves from kvlist to on-disk bisect
format (https://github.com/krestenkrab/vbisect)
which means that tree pages are not deserialised
when read form disk.  This improves performance
of GETs by ~20% and improves top-level latencies
significantly.

CRC32 checks are now on the page level.
2014-11-28 16:21:59 +01:00
Kresten Krab Thorup
f4feca27e5 Use ebloom by default
This commit changes the default bloom filter to
be basho’s bloom, which is significantly more
stable w.r.t. performance.  The code can still
read the old bloom filters; new files are written
with the new filters.

The default is controlled in src/hanoidb.hrl
using the USE_EBLOOM macro.
2014-11-28 16:15:20 +01:00
Kresten Krab Thorup
a1bbadfb34 Update travis to include 17 2014-11-26 14:00:22 +01:00
Kresten Krab Thorup
810d183770 Fix crash in recovery code 2014-11-26 13:59:36 +01:00
Kresten Krab Thorup
1715c29364 Remove some lock contention when fsync’ing 2014-11-26 13:57:47 +01:00
Kresten Krab Thorup
0fa8ab4a02 Fix up the ensure_started code 2014-11-26 13:21:53 +01:00
Kresten Krab Thorup
752f9f5b62 Fix min/max level for recovery
Previous commit f0d24894c5
introduced a bug in recovery code.
2014-11-26 12:51:01 +01:00
Kresten Krab Thorup
71c4243701 Don’t open exclusive when re-opening a merge file 2014-11-26 12:44:50 +01:00
Kresten Krab Thorup
3b6b0ce197 Support sys messages in hanoidb_merger processes 2014-11-26 11:52:53 +01:00
Kresten Krab Thorup
f0d24894c5 Make the top_level parameter configurable. 2014-11-21 00:41:40 +01:00
Kresten Krab Thorup
4ba712512d Add travis build status button 2014-11-20 23:11:51 +01:00
Kresten Krab Thorup
70936cd51f Make tests run again
We had a bad regression with #31 due to the
tests not being run automatically.
2014-11-20 23:08:14 +01:00
Kresten Krab Thorup
74c9bf6154 Swap arguments. Fixes issue #31 2014-11-20 22:42:14 +01:00
Kresten Krab Thorup
ea88b32d4e Merge pull request #30 from brigadier/patch-2
add plain_fsm app in the 'applications' section of .app.src
2014-11-20 12:49:53 +01:00
Evgeny M.
7595560572 add plain_fsm app in the 'applications' section of .app.src
plain_fsm must be started prior to hanoidb
2014-11-20 14:01:13 +03:00
Kresten Krab Thorup
bac8a8620f Leave the tests out as per @essen’s comments
resolves issue #29
2014-11-19 00:07:33 +01:00
Kresten Krab Thorup
d0620f241f Make default “all” target really do all.
This fixes issue #29
2014-11-18 22:04:28 +01:00
Kresten Krab Thorup
2b95bfcf49 Merge branch 'master' of github.com:krestenkrab/hanoidb 2014-11-17 11:03:23 +01:00
Kresten Krab Thorup
5acd8cbc7a Add travis build file 2014-11-17 10:57:25 +01:00
Kresten Krab Thorup
3b776a7bf1 Merge pull request #28 from brigadier/patch-1
Update Makefile
2014-11-14 19:34:22 +01:00
Evgeny M.
59c6d47bf7 Update Makefile
update makefile to make it compatible with erlang.mk
2014-11-14 20:47:38 +03:00
Kresten Krab Thorup
697683240d Make ebloom / scalable bloom a compile-time option
bloom (from basho) uses a NIF implementation which
may be faster and use less memory; test before use.

The “old” bloom, a.k.a. “scalable bloom” 
is pure erlang, which also has nice properties. 

Note!  Switching to ebloom changes the file format
so the default is still to use ‘scalable bloom’.
2014-10-21 15:03:53 +01:00
Kresten Krab Thorup
813863c9c3 Merge pull request #27 from eryx67/master
fix memory leak
2014-08-11 12:36:58 +02:00
eryx67
b97de217ed fix memory leak 2014-08-11 16:17:17 +06:00
Kresten Krab Thorup
e58d2577a0 Update dependency for snappy 1.0 -> 1.1 2014-04-03 00:00:33 +02:00
Kresten Krab Thorup
6bde50dd73 Trying to build the build representation fails.
More specifically

   encode(decode(encode(...)))

fails for dense (small) bloom filter sizes.
2014-04-02 23:57:23 +02:00
Kresten Krab Thorup
ce3a7049f2 Fix plain bug 2014-04-02 22:39:43 +02:00
Kresten Krab Thorup
e37623a84c Fix argument order for plain_fsm:handle_system_msg
plain_fsm documentation was wrong https://github.com/uwiger/plain_fsm/pull/4, so now it should be able to send sys messages
to hanoi.
2014-02-14 08:25:26 -05:00
33 changed files with 1241 additions and 381 deletions

5
.envrc Normal file
View file

@ -0,0 +1,5 @@
if ! has nix_direnv_version || ! nix_direnv_version 3.0.4; then
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.4/direnvrc" "sha256-DzlYZ33mWF/Gs8DDeyjr8mnVmQGx7ASYqA5WlxwvBG4="
fi
watch_file devShell.nix shell.nix flake.nix
use flake || use nix

3
.gitignore vendored
View file

@ -3,3 +3,6 @@ deps
*~
.eunit
.project
_build
.direnv

8
.travis.yml Normal file
View file

@ -0,0 +1,8 @@
language: erlang
otp_release:
- R16B03
- R15B03
- 17.0
- 18.0

View file

@ -1,12 +1,12 @@
REBAR= rebar
REBAR= rebar3
DIALYZER= dialyzer
.PHONY: plt analyze all deps compile get-deps clean
all: compile
all: get-deps compile
deps: get-deps
deps: get-deps compile
get-deps:
@$(REBAR) get-deps
@ -31,13 +31,18 @@ clean-test-btrees:
plt: compile
$(DIALYZER) --build_plt --output_plt .hanoi.plt \
-pa deps/snappy/ebin \
-pa deps/snappy/ebin \
-pa deps/lz4/ebin \
-pa deps/ebloom/ebin \
-pa deps/plain_fsm/ebin \
deps/plain_fsm/ebin \
--apps kernel stdlib
--apps erts kernel stdlib ebloom lz4 snappy
analyze: compile
$(DIALYZER) --plt .hanoi.plt \
-pa deps/snappy/ebin \
-pa deps/lz4/ebin \
-pa deps/ebloom/ebin \
-pa deps/plain_fsm/ebin \
ebin

View file

@ -1,5 +1,7 @@
# HanoiDB Indexed Key/Value Storage
[![Build Status](https://travis-ci.org/krestenkrab/hanoidb.svg?branch=master)](https://travis-ci.org/krestenkrab/hanoidb)
HanoiDB implements an indexed, key/value storage engine. The primary index is
a log-structured merge tree (LSM-BTree) implemented using "doubling sizes"
persistent ordered sets of key/value pairs, similar is some regards to
@ -80,7 +82,13 @@ Put these values in your `app.config` in the `hanoidb` section
%% Both have same log2(N) worst case, but `fast' is
%% sometimes faster; yielding latency fluctuations.
%%
{merge_strategy, fast | predictable}
{merge_strategy, fast | predictable},
%% "Level0" files has 2^N KVs in it, defaulting to 1024.
%% If the database is to contain very small KVs, this is
%% likely too small, and will result in many unnecessary
%% file operations. (Subsequent levels double in size).
{top_level, 10} % 1024 Key/Values
]},
```

1
TODO
View file

@ -1,5 +1,4 @@
* Phase 1: Minimum viable product (in order of priority)
* lager; check for uses of lager:error/2
* configurable TOP_LEVEL size
* test new snappy compression support
* status and statistics

61
flake.lock Normal file
View file

@ -0,0 +1,61 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1701282334,
"narHash": "sha256-MxCVrXY6v4QmfTwIysjjaX0XUhqBbxTWWB4HXtDYsdk=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "057f9aecfb71c4437d2b27d3323df7f93c010b7e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "23.11",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"utils": "utils"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

35
flake.nix Normal file
View file

@ -0,0 +1,35 @@
{
description = "A Helm Chart for OpenLDAP.";
inputs = {
# nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
nixpkgs.url = "github:NixOS/nixpkgs/23.11";
utils.url = "github:numtide/flake-utils";
};
outputs =
{ self, nixpkgs, ... }@inputs:
inputs.utils.lib.eachSystem
[
"x86_64-linux"
"i686-linux"
"aarch64-linux"
"x86_64-darwin"
]
(
system:
let
pkgs = import nixpkgs {
inherit system;
overlays = [ ];
config.allowUnfree = true;
};
in
{
flake-utils.inputs.systems.follows = "system";
formatter = with pkgs; [ nixfmt ];
devShells.default = import ./shell.nix { inherit pkgs; };
DOCKER_BUILDKIT = 1;
}
);
}

View file

@ -4,7 +4,7 @@
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [%{d,'DEBUG',true},
{parse_transform, lager_transform},
%{d,'USE_EBLOOM',true},
fail_on_warning,
warn_unused_vars,
warn_export_all,
@ -20,18 +20,20 @@
warn_untyped_record,
% warn_missing_spec,
% strict_validation,
{platform_define, "^R|17", pre18},
debug_info]}.
{xref_checks, [undefined_function_calls]}.
{deps, [ {sext, ".*", {git, "git://github.com/esl/sext", {branch, "master"}}}
, {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
, {snappy, "1.0.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
, {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}}
, {triq, ".*", {git, "git://github.com/krestenkrab/triq", {branch, "master"}}}
, {lz4, ".*", {git, "git://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/esl/edown.git", {branch, "master"}}}
{deps, [ {sext, ".*", {git, "https://github.com/uwiger/sext.git", {branch, "master"}}}
%% , {snappy, "1.*", {git, "https://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.*", {git, "https://github.com/gburd/plain_fsm.git", {branch, "master"}}}
%% , {basho_bench, ".*", {git, "https://github.com/basho/basho_bench.git", {branch, "master"}}}
%% , {ebloom, ".*", {git, "https://github.com/basho/ebloom.git", {branch, "develop"}}}
%% , {bloomerl, ".*", {git, "https://github.com/gburd/bloomerl.git", {branch, "master"}}}
, {lz4, ".*", {git, "https://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
, {triq, ".*", {git, "https://github.com/gburd/triq.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/uwiger/edown.git", {branch, "master"}}}
% , {asciiedoc, "0.1.*", {git, "git://github.com/norton/asciiedoc.git", {branch, "master"}}}
% , {triq, ".*", {git, "git://github.com/krestenkrab/triq.git", {branch, "master"}}}
% , {proper, ".*", {git, "git://github.com/manopapad/proper.git", {branch, "master"}}}

16
rebar.lock Normal file
View file

@ -0,0 +1,16 @@
[{<<"lz4">>,
{git,"https://github.com/krestenkrab/erlang-lz4.git",
{ref,"5fd90ca1e2345bdc359ee43d958da87fafb4fd78"}},
0},
{<<"plain_fsm">>,
{git,"https://github.com/gburd/plain_fsm.git",
{ref,"6421158d742956836dfa39fca857422afcf56419"}},
0},
{<<"sext">>,
{git,"https://github.com/uwiger/sext.git",
{ref,"c22486add9cc374dc8138b1f547c0999a1922a65"}},
0},
{<<"triq">>,
{git,"https://github.com/gburd/triq.git",
{ref,"5d4b98e8323eec70aff474a578a1e5ebe9495e70"}},
0}].

62
shell.nix Normal file
View file

@ -0,0 +1,62 @@
{
pkgs ? import <nixpkgs> { },
}:
with pkgs;
mkShell rec {
name = "hanoidb";
packages = with pkgs; [
bashInteractive
gigalixir
inotify-tools
libnotify
];
buildInputs = [
# basics
curl
git
openssh
ripgrep
shellcheck
erlang
erlang-ls
rebar3
erlfmt
# BEAM support
#beam.interpreters.erlangR26
#rebar3
#beam.packages.erlangR26.elixir_1_15
#nodejs-18_x
# elixir-typst support
#pkgs.iconv
# rust support
#cargo
];
shellHook =
let
icon = "f121";
in
''
export PS1="$(echo -e '\u${icon}') {\[$(tput sgr0)\]\[\033[38;5;228m\]\w\[$(tput sgr0)\]\[\033[38;5;15m\]} (${name}) \\$ \[$(tput sgr0)\]"
alias gitc="git -c user.email=greg@burd.me commit --gpg-sign=22931AF7895E82DF ."
# allows mix to work on the local directory
mkdir -p .nix-mix
mkdir -p .nix-hex
export MIX_HOME=$PWD/.nix-mix
export HEX_HOME=$PWD/.nix-hex
export PATH=$MIX_HOME/bin:$PATH
export PATH=$HEX_HOME/bin:$PATH
export LANG=en_US.UTF-8
export ERL_AFLAGS="-kernel shell_history enabled"
'';
}
# NOTES:
# * https://github.com/fbettag/nixos-erlang-flake/blob/main/flake.nix
# * https://discourse.nixos.org/t/installing-different-versions-of-elixir-and-erlang/35765/5
# * https://github.com/fbettag/nixos-erlang-flake/blob/main/flake.nix

View file

@ -1,122 +0,0 @@
%% ----------------------------------------------------------------------------
%%
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
%%
%% Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
%% http://trifork.com/ info@trifork.com
%%
%% Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
%% http://basho.com/ info@basho.com
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% ----------------------------------------------------------------------------
-module(basho_bench_driver_hanoidb).
-record(state, { tree :: hanoidb:hanoidb(),
filename :: string(),
flags :: list(),
sync_interval :: integer() | infinity,
last_sync :: erlang:timestamp() }).
-export([new/1,
run/4]).
-include("hanoidb.hrl").
-include_lib("basho_bench/include/basho_bench.hrl").
-record(key_range, { from_key = <<>> :: binary(),
from_inclusive = true :: boolean(),
to_key :: binary() | undefined,
to_inclusive = false :: boolean(),
limit :: pos_integer() | undefined }).
%% ====================================================================
%% API
%% ====================================================================
new(_Id) ->
%% Make sure bitcask is available
case code:which(hanoidb) of
non_existing ->
?FAIL_MSG("~s requires hanoidb to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
%% Get the target directory
Dir = basho_bench_config:get(hanoidb_dir, "."),
Filename = filename:join(Dir, "test.hanoidb"),
Config = basho_bench_config:get(hanoidb_flags, []),
%% Look for sync interval config
SyncInterval =
case basho_bench_config:get(hanoidb_sync_interval, infinity) of
Value when is_integer(Value) ->
Value;
infinity ->
infinity
end,
%% Get any bitcask flags
case hanoidb:open(Filename, Config) of
{error, Reason} ->
?FAIL_MSG("Failed to open hanoidb in ~s: ~p\n", [Filename, Reason]);
{ok, FBTree} ->
{ok, #state { tree = FBTree,
filename = Filename,
sync_interval = SyncInterval,
last_sync = os:timestamp() }}
end.
run(get, KeyGen, _ValueGen, State) ->
case hanoidb:lookup(State#state.tree, KeyGen()) of
{ok, _Value} ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(put, KeyGen, ValueGen, State) ->
case hanoidb:put(State#state.tree, KeyGen(), ValueGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(delete, KeyGen, _ValueGen, State) ->
case hanoidb:delete(State#state.tree, KeyGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(fold_100, KeyGen, _ValueGen, State) ->
[From,To] = lists:usort([KeyGen(), KeyGen()]),
case hanoidb:fold_range(State#state.tree,
fun(_Key,_Value,Count) ->
Count+1
end,
0,
#key_range{ from_key=From,
to_key=To,
limit=100 }) of
Count when Count >= 0; Count =< 100 ->
{ok,State};
Count ->
{error, {bad_fold_count, Count}}
end.

View file

@ -5,7 +5,7 @@
% author: http://erlang.2086793.n4.nabble.com/gb-trees-fold-td2228614.html
-spec fold(fun((term(), term(), term()) -> term()), term(), gb_tree()) -> term().
-spec fold(fun((term(), term(), term()) -> term()), term(), gb_trees:tree()) -> term().
fold(F, A, {_, T})
when is_function(F, 3) ->
fold_1(F, A, T).

View file

@ -24,12 +24,13 @@
{application, hanoidb,
[
{description, ""},
{description, "LSM-tree tiered storage engine with amortized merging"},
{vsn, "1.3.0"},
{registered, []},
{applications, [
kernel,
stdlib
stdlib,
plain_fsm
]},
{mod, {hanoidb_app, []}},
{env, []}

View file

@ -70,6 +70,7 @@
| {sync_strategy, none | sync | {seconds, pos_integer()}}
| {expiry_secs, non_neg_integer()}
| {spawn_opt, list()}
| {top_level, pos_integer()}
.
%% @doc
@ -209,9 +210,9 @@ receive_fold_range(MRef,PID,Fun,Acc0, Limit) ->
try
{ok, Fun(K,V,Acc0)}
catch
Class:Exception ->
Class:Exception:Stacktrace ->
% TODO ?log("Exception in hanoidb fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
{'EXIT', Class, Exception, erlang:get_stacktrace()}
{'EXIT', Class, Exception, Stacktrace}
end
of
{ok, Acc1} ->
@ -278,24 +279,26 @@ init([Dir, Opts0]) ->
end,
hanoidb_util:ensure_expiry(Opts),
{Nursery, MaxLevel, TopLevel} =
{Top, Nur, Max} =
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TL, ML} = open_levels(Dir, Opts),
{ok, N0} = hanoidb_nursery:recover(Dir, TL, ML, Opts),
{N0, ML, TL};
{ok, TopLevel, MinLevel, MaxLevel} = open_levels(Dir, Opts),
{ok, Nursery} = hanoidb_nursery:recover(Dir, TopLevel, MinLevel, MaxLevel, Opts),
{TopLevel, Nursery, MaxLevel};
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
{ok, TL} = hanoidb_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
ML = ?TOP_LEVEL,
{ok, N0} = hanoidb_nursery:new(Dir, ML, Opts),
{N0, ML, TL}
end,
{ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
MinLevel = get_opt(top_level, Opts0, ?TOP_LEVEL),
{ok, TopLevel} = hanoidb_level:open(Dir, MinLevel, undefined, Opts, self()),
MaxLevel = MinLevel,
{ok, Nursery} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Opts),
{TopLevel, Nursery, MaxLevel}
end,
{ok, #state{ top=Top, dir=Dir, nursery=Nur, opt=Opts, max_level=Max }}.
open_levels(Dir, Options) ->
{ok, Files} = file:list_dir(Dir),
TopLevel0 = get_opt(top_level, Options, ?TOP_LEVEL),
%% parse file names and find max level
{MinLevel, MaxLevel} =
@ -308,7 +311,7 @@ open_levels(Dir, Options) ->
{MinLevel, MaxLevel}
end
end,
{?TOP_LEVEL, ?TOP_LEVEL},
{TopLevel0, TopLevel0},
Files),
%% remove old nursery data file
@ -323,17 +326,17 @@ open_levels(Dir, Options) ->
{Level, MergeWork}
end,
{undefined, 0},
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(?TOP_LEVEL),
lists:seq(MaxLevel, MinLevel, -1)),
WorkPerIter = (MaxLevel - MinLevel + 1) * ?BTREE_SIZE(MinLevel),
% error_logger:info_msg("do_merge ... {~p,~p,~p}~n", [TopLevel, WorkPerIter, MaxMerge]),
do_merge(TopLevel, WorkPerIter, MaxMerge),
{ok, TopLevel, MaxLevel}.
do_merge(TopLevel, WorkPerIter, MaxMerge, MinLevel),
{ok, TopLevel, MinLevel, MaxLevel}.
do_merge(TopLevel, _Inc, N) when N =< 0 ->
do_merge(TopLevel, _Inc, N, _MinLevel) when N =< 0 ->
ok = hanoidb_level:await_incremental_merge(TopLevel);
do_merge(TopLevel, Inc, N) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
do_merge(TopLevel, Inc, N-Inc).
do_merge(TopLevel, Inc, N, MinLevel) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel)),
do_merge(TopLevel, Inc, N-Inc, MinLevel).
parse_level(FileName) ->
@ -413,7 +416,8 @@ handle_call(close, _From, State=#state{ nursery=undefined }) ->
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
try
ok = hanoidb_nursery:finish(Nursery, Top),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MaxLevel, Config),
MinLevel = hanoidb_level:level(Top),
{ok, Nursery2} = hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config),
ok = hanoidb_level:close(Top),
{stop, normal, ok, State#state{ nursery=Nursery2 }}
catch
@ -423,9 +427,10 @@ handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_l
end;
handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
TopLevelNumber = hanoidb_level:level(Top),
ok = hanoidb_nursery:destroy(Nursery),
ok = hanoidb_level:destroy(Top),
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=TopLevelNumber }}.
-spec do_put(key(), value(), expiry(), #state{}) -> {ok, #state{}}.
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) when Nursery =/= undefined ->
@ -443,7 +448,12 @@ do_transact(TransactionSpec, State=#state{ nursery=Nursery, top=Top }) ->
{ok, State#state{ nursery=Nursery2 }}.
start_app() ->
case application:start(?MODULE) of
ok = ensure_started(syntax_tools),
ok = ensure_started(plain_fsm),
ok = ensure_started(?MODULE).
ensure_started(Application) ->
case application:start(Application) of
ok ->
ok;
{error, {already_started, _}} ->

View file

@ -23,7 +23,7 @@
%% ----------------------------------------------------------------------------
%% smallest levels are 256 entries
%% smallest levels are 1024 entries
-define(TOP_LEVEL, 8).
-define(BTREE_SIZE(Level), (1 bsl (Level))).
-define(FILE_FORMAT, <<"HAN2">>).
@ -48,12 +48,20 @@
-define(KEY_IN_RANGE(Key,Range),
(?KEY_IN_FROM_RANGE(Key,Range) andalso ?KEY_IN_TO_RANGE(Key,Range))).
-ifdef(pre18).
-define(TIMESTAMP, now()).
-else.
-define(TIMESTAMP, erlang:timestamp()).
-endif.
-record(nursery, { log_file :: file:fd(),
dir :: string(),
cache :: gb_tree(),
cache :: gb_trees:tree(binary(), binary()),
total_size=0 :: integer(),
count=0 :: integer(),
last_sync=now() :: erlang:timestamp(),
last_sync=?TIMESTAMP :: erlang:timestamp(),
min_level :: integer(),
max_level :: integer(),
config=[] :: [{atom(), term()}],
step=0 :: integer(),
@ -69,3 +77,25 @@
| value()
| filepos().
-ifdef(USE_EBLOOM).
-define(HANOI_BLOOM_TYPE, ebloom).
-else.
-define(HANOI_BLOOM_TYPE, sbloom).
-endif.
-define(BLOOM_NEW(Size), hanoidb_util:bloom_new(Size, ?HANOI_BLOOM_TYPE)).
-define(BLOOM_TO_BIN(Bloom), hanoidb_util:bloom_to_bin(Bloom)).
-define(BIN_TO_BLOOM(Bin, Fmt), hanoidb_util:bin_to_bloom(Bin, Fmt)).
-define(BLOOM_INSERT(Bloom, Key), hanoidb_util:bloom_insert(Bloom, Key)).
-define(BLOOM_CONTAINS(Bloom, Key), hanoidb_util:bloom_contains(Bloom, Key)).
%% tags used in the on-disk representation
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).

View file

@ -1,4 +1,4 @@
%% The contents of this file are subject to the Erlang Public License, Version
% The contents of this file are subject to the Erlang Public License, Version
%% 1.1, (the "License"); you may not use this file except in compliance with
%% the License. You should have received a copy of the Erlang Public License
%% along with this software. If not, it can be retrieved via the world wide web
@ -41,7 +41,11 @@
-define(W, 27).
-ifdef(pre18).
-type bitmask() :: array() | any().
-else.
-type bitmask() :: arrays:array() | any().
-endif.
-record(bloom, {
e :: float(), % error probability
@ -204,6 +208,7 @@ bitmask_build(BM) ->
case element(1,BM) of
array -> BM;
sparse_bitmap -> BM;
dense_bitmap -> BM;
dense_bitmap_ets -> hanoidb_dense_bitmap:build(BM)
end.
@ -215,16 +220,25 @@ bitmask_get(I, BM) ->
dense_bitmap -> hanoidb_dense_bitmap:member(I, BM)
end.
-ifdef(pre18).
-spec as_array(bitmask()) -> array().
-else.
-spec as_array(bitmask()) -> arrays:array().
-endif.
as_array(BM) ->
case array:is_array(BM) of
true -> BM
end.
%%%========== Bitarray representation - suitable for sparse arrays ==========
bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
%% bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
-ifdef(pre18).
-spec bitarray_set( non_neg_integer(), array() ) -> array().
-else.
-spec bitarray_set( non_neg_integer(), arrays:array() ) -> arrays:array().
-endif.
bitarray_set(I, A1) ->
A = as_array(A1),
AI = I div ?W,
@ -234,7 +248,11 @@ bitarray_set(I, A1) ->
true -> array:set(AI, V1, A)
end.
-ifdef(pre18).
-spec bitarray_get( non_neg_integer(), array() ) -> boolean().
-else.
-spec bitarray_get( non_neg_integer(), arrays:array() ) -> boolean().
-endif.
bitarray_get(I, A) ->
AI = I div ?W,
V = array:get(AI, A),

View file

@ -79,10 +79,10 @@ start(SendTo) ->
initialize(#state{sendto=SendTo, sendto_ref=MRef}, []),
?log("fold_worker done ~p~n", [self()])
catch
Class:Ex ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
exit({bad, Class, Ex, erlang:get_stacktrace()})
Class:Ex:Stacktrace ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, Stacktrace]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, Stacktrace]),
exit({bad, Class, Ex, Stacktrace})
end
end,
PID = plain_fsm:spawn(?MODULE, F),
@ -102,7 +102,7 @@ initialize(State, PrefixFolders) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> initialize(S1, PrefixFolders) end);
Req, From, State, fun(S1) -> initialize(S1, PrefixFolders) end);
{'DOWN', MRef, _, _, _} when MRef =:= State#state.sendto_ref ->
ok;
@ -164,7 +164,7 @@ fill_from_inbox(State, Values, Queues, [PID|_]=PIDs, SavePIDs) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end);
Req, From, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end);
{'DOWN', MRef, _, _, _} when MRef =:= State#state.sendto_ref ->
ok;
@ -231,5 +231,3 @@ data_vsn() ->
code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}.

View file

@ -46,7 +46,7 @@
-export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3,
begin_incremental_merge/2, await_incremental_merge/1, set_max_level/2,
unmerged_count/1, destroy/1]).
unmerged_count/1, destroy/1, level/1]).
-include_lib("kernel/include/file.hrl").
@ -90,6 +90,9 @@ open(Dir,Level,Next,Opts,Owner) when Level>0 ->
SpawnOpt),
{ok, PID}.
level(Ref) ->
plain_rpc:call(Ref, level).
lookup(Ref, Key) ->
plain_rpc:call(Ref, {lookup, Key}).
@ -156,9 +159,9 @@ initialize(State) ->
_Result = initialize2(State),
?log(" ** terminated ~p", [_Result])
catch
Class:Ex when not (Class == exit andalso Ex == normal) ->
?log("crashing ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
Class:Ex:Stacktrace when not (Class == exit andalso Ex == normal) ->
?log("crashing ~p:~p ~p~n", [Class,Ex,Stacktrace]),
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,Stacktrace])
end.
initialize2(State) ->
@ -356,6 +359,11 @@ main_loop(State = #state{ next=Next }) ->
->
do_step(StepFrom, DoneWork, StepSize, State);
%% simply replies the level number
?CALL(From, level) ->
plain_rpc:send_reply(From, State#state.level),
main_loop(State);
{MRef, step_done} when MRef == State#state.step_merge_ref ->
demonitor(MRef, [flush]),
@ -403,6 +411,7 @@ main_loop(State = #state{ next=Next }) ->
%% and we have finished the incremental merge at this level
State2 = reply_step_ok(State),
erlang:demonitor(MRef, [flush]),
main_loop(State2#state{ step_next_ref=undefined });
?CALL(From, close) ->
@ -632,7 +641,7 @@ main_loop(State = #state{ next=Next }) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
Req, From, State, fun(S1) -> main_loop(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State);
@ -775,33 +784,20 @@ begin_merge(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
XFileName = filename("X",State),
Owner = self(),
?log("starting merge~n", []),
file:delete(XFileName),
MergePID = proc_lib:spawn_link(fun() ->
try
?log("merge begun~n", []),
{ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts ),
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch
C:E ->
error_logger:error_msg("merge failed ~p:~p ~p~n",
[C,E,erlang:get_stacktrace()]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end),
MergePID = hanoidb_merger:start(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts),
{ok, MergePID}.
close_and_delete_a_and_b(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
@ -835,7 +831,8 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
ok
catch
Class:Ex ->
io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
try throw(42) catch _:_:Stk -> io:format(user, "BAD: ~p:~p ~p~n", [Class,Ex,Stk]) end
end
end ),
{ok, PID}.

35
src/hanoidb_load.erl Normal file
View file

@ -0,0 +1,35 @@
-module(hanoidb_load).
-export([run/1]).
run(Dir) ->
case hanoidb:open(filename:join(Dir, "test.hanoidb"), []) of
{error, Reason} ->
{error, Reason};
{ok, Tree} -> fill_db_timed(Tree, 15 * 60 * 1000)
end.
%% fill_db(Tree) -> fill_db(Tree, 5000).
%% fill_db(Tree, 0) -> hanoidb:close(Tree);
%% fill_db(Tree, N) ->
%% Letter = N rem 26 + $a,
%% Length = rand:uniform(100),
%% Key = << <<Letter/utf8>> || _ <- lists:seq(0, Length) >>,
%% ok = hanoidb:put(Tree, Key, Key),
%% fill_db(Tree, N - 1).
fill_db_timed(Tree, Timeout) ->
erlang:send_after(Timeout, self(), stop),
fill_db_loop(Tree, 0).
fill_db_loop(Tree, N) ->
receive
stop ->
ok
after 0 ->
Key = crypto:strong_rand_bytes(2000),
Letter = N rem 26 + $a,
Length = rand:uniform(100),
Value = << <<Letter/utf8>> || _ <- lists:seq(0, Length) >>,
ok = hanoidb:put(Tree, Key, Value),
fill_db_loop(Tree, N+1)
end.

View file

@ -28,9 +28,10 @@
%% @doc Merging two Indexes
-export([merge/6]).
-export([start/6, merge/6]).
-include("hanoidb.hrl").
-include("include/plain_rpc.hrl").
%% A merger which is inactive for this long will sleep which means that it will
%% close open files, and compress the current bloom filter.
@ -40,6 +41,27 @@
%% merges, so we default to running the entire merge in one process.
-define(LOCAL_WRITER, true).
-spec start(string(), string(), string(), integer(), boolean(), list()) -> pid().
start(A,B,X, Size, IsLastLevel, Options) ->
Owner = self(),
plain_fsm:spawn_link(?MODULE, fun() ->
try
{ok, OutCount} = hanoidb_merger:merge(A, B, X,
Size,
IsLastLevel,
Options),
Owner ! ?CAST(self(),{merge_done, OutCount, X})
catch
C:E:Stacktrace ->
%% this semi-bogus code makes sure we always get a stack trace if merging fails
error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n",
[self(), C,E,Stacktrace, X]),
erlang:raise(C,E,Stacktrace)
end
end).
-spec merge(string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, IXA} = hanoidb_reader:open(A, [sequential|Options]),
@ -47,19 +69,19 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, Out} = hanoidb_writer:init([C, [{size, Size} | Options]]),
AKVs =
case hanoidb_reader:first_node(IXA) of
{node, AKV} -> AKV;
{kvlist, AKV} -> AKV;
none -> []
end,
BKVs =
case hanoidb_reader:first_node(IXB) of
{node, BKV} ->BKV;
{kvlist, BKV} ->BKV;
none -> []
end,
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}).
terminate(Out) ->
{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out),
{ok, Count, Out1} = hanoidb_writer:handle_call(count, self(), Out),
{stop, normal, ok, _Out2} = hanoidb_writer:handle_call(close, self(), Out1),
{ok, Count}.
step(S) ->
@ -72,13 +94,77 @@ hibernate_scan(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(Keep),
scan(hanoidb_reader:deserialize(IXA),
hanoidb_reader:deserialize(IXB),
hanoidb_writer:deserialize(Out),
IsLastLevel, AKVs, BKVs, {N+HowMany, From})
IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
end.
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(Keep),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan_only/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
end.
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) ->
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}},
fun({IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2}}) ->
receive_scan(IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan(Keep)
end.
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of
none ->
@ -87,20 +173,11 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/=
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan(Keep)
end;
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID});
scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
case hanoidb_reader:next_node(IXA) of
{node, AKVs} ->
{kvlist, AKVs} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(IXA),
@ -109,7 +186,7 @@ scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
case hanoidb_reader:next_node(IXB) of
{node, BKVs} ->
{kvlist, BKVs} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
end_of_data ->
hanoidb_reader:close(IXB),
@ -128,17 +205,37 @@ scan(IXA, IXB, Out, IsLastLevel, [{_Key1,_Value1}|AT]=_AKVs, [{Key2,Value2}|IX]=
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
scan(IXA, IXB, Out3, IsLastLevel, AT, IX, step(Step, 2)).
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) ->
receive
{step, From, HowMany} ->
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From})
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IX, Out, IsLastLevel, KVs, {N, FromPID}},
fun({IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2}}) ->
receive_scan_only(IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IX, Out, IsLastLevel, KVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan_only(Keep)
end.
scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
@ -147,19 +244,11 @@ scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan_only(Keep)
end;
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID});
scan_only(IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
case hanoidb_reader:next_node(IX) of
{node, KVs} ->
{kvlist, KVs} ->
scan_only(IX, Out, IsLastLevel, KVs, Step);
end_of_data ->
case FromPID of

View file

@ -25,46 +25,46 @@
-module(hanoidb_nursery).
-author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/3, recover/4, finish/2, lookup/2, add/4, add/5]).
-export([new/4, recover/5, finish/2, lookup/2, add/4, add/5]).
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
-include("include/hanoidb.hrl").
-include("hanoidb.hrl").
-include_lib("kernel/include/file.hrl").
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-spec new(string(), integer(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
%% do incremental merge every this many inserts
%% this value *must* be less than or equal to
%% 2^TOP_LEVEL == ?BTREE_SIZE(?TOP_LEVEL)
-define(INC_MERGE_STEP, ?BTREE_SIZE(?TOP_LEVEL)/2).
-define(INC_MERGE_STEP, ?BTREE_SIZE(MinLevel) div 2).
new(Directory, MaxLevel, Config) ->
new(Directory, MinLevel, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config),
{ok, File} = file:open(?LOGFILENAME(Directory),
[raw, exclusive, write, delayed_write, append]),
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
max_level=MaxLevel, config=Config }}.
min_level=MinLevel, max_level=MaxLevel, config=Config }}.
recover(Directory, TopLevel, MaxLevel, Config) ->
recover(Directory, TopLevel, MinLevel, MaxLevel, Config)
when MinLevel =< MaxLevel, is_integer(MinLevel), is_integer(MaxLevel) ->
hanoidb_util:ensure_expiry(Config),
case file:read_file_info(?LOGFILENAME(Directory)) of
{ok, _} ->
ok = do_recover(Directory, TopLevel, MaxLevel, Config),
new(Directory, MaxLevel, Config);
ok = do_recover(Directory, TopLevel, MinLevel, MaxLevel, Config),
new(Directory, MinLevel, MaxLevel, Config);
{error, enoent} ->
new(Directory, MaxLevel, Config)
new(Directory, MinLevel, MaxLevel, Config)
end.
do_recover(Directory, TopLevel, MaxLevel, Config) ->
do_recover(Directory, TopLevel, MinLevel, MaxLevel, Config) ->
%% repair the log file; storing it in nursery2
LogFileName = ?LOGFILENAME(Directory),
{ok, Nursery} = read_nursery_from_log(Directory, MaxLevel, Config),
{ok, Nursery} = read_nursery_from_log(Directory, MinLevel, MaxLevel, Config),
ok = finish(Nursery, TopLevel),
%% assert log file is gone
{error, enoent} = file:read_file_info(LogFileName),
@ -82,7 +82,7 @@ fill_cache(Transactions, Cache)
when is_list(Transactions) ->
lists:foldl(fun fill_cache/2, Cache, Transactions).
read_nursery_from_log(Directory, MaxLevel, Config) ->
read_nursery_from_log(Directory, MinLevel, MaxLevel, Config) ->
{ok, LogBinary} = file:read_file(?LOGFILENAME(Directory)),
Cache =
case hanoidb_util:decode_crc_data(LogBinary, [], []) of
@ -92,7 +92,7 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
error_logger:info_msg("ignoring undecypherable bytes in ~p~n", [?LOGFILENAME(Directory)]),
fill_cache(KVs, gb_trees:empty())
end,
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), min_level=MinLevel, max_level=MaxLevel, config=Config }}.
%% @doc Add a Key/Value to the nursery
%% @end
@ -142,12 +142,12 @@ do_sync(File, Nursery) ->
case application:get_env(hanoidb, sync_strategy) of
{ok, sync} ->
file:datasync(File),
now();
os:timestamp();
{ok, {seconds, N}} ->
MicrosSinceLastSync = timer:now_diff(now(), Nursery#nursery.last_sync),
if (MicrosSinceLastSync / 1000000) >= N ->
MicrosSinceLastSync = timer:now_diff(os:timestamp(), Nursery#nursery.last_sync),
if (MicrosSinceLastSync div 1000000) >= N ->
file:datasync(File),
now();
os:timestamp();
true ->
Nursery#nursery.last_sync
end;
@ -175,7 +175,7 @@ lookup(Key, #nursery{cache=Cache}) ->
%% @end
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
count=Count, config=Config }, TopLevel) ->
count=Count, config=Config, min_level=MinLevel }, TopLevel) ->
hanoidb_util:ensure_expiry(Config),
@ -189,7 +189,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
N when N > 0 ->
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(MinLevel)},
{compress, none} | Config]),
try
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
@ -205,10 +205,10 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
%% Issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished).
if DoneMerge >= ?BTREE_SIZE(?TOP_LEVEL) ->
if DoneMerge >= ?BTREE_SIZE(MinLevel) ->
ok;
true ->
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL) - DoneMerge)
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(MinLevel) - DoneMerge)
end;
% {ok, _Nursery2} = do_inc_merge(Nursery, Count, TopLevel);
@ -247,13 +247,13 @@ add(Key, Value, Expiry, Nursery, Top) ->
end.
-spec flush(#nursery{}, pid()) -> {ok, #nursery{}}.
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel, config=Config }, Top) ->
flush(Nursery=#nursery{ dir=Dir, min_level=MinLevel, max_level=MaxLevel, config=Config }, Top) ->
ok = finish(Nursery, Top),
{error, enoent} = file:read_file_info(filename:join(Dir, "nursery.log")),
hanoidb_nursery:new(Dir, MaxLevel, Config).
hanoidb_nursery:new(Dir, MinLevel, MaxLevel, Config).
has_room(#nursery{ count=Count }, N) ->
(Count + N + 1) < ?BTREE_SIZE(?TOP_LEVEL).
has_room(#nursery{ count=Count, min_level=MinLevel }, N) ->
(Count + N + 1) < ?BTREE_SIZE(MinLevel).
ensure_space(Nursery, NeededRoom, Top) ->
case has_room(Nursery, NeededRoom) of
@ -303,7 +303,7 @@ transact1(Spec, Nursery1=#nursery{ log_file=File, cache=Cache0, total_size=Total
do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count }, length(Spec), Top).
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done, min_level=MinLevel }, N, TopLevel) ->
if Step+N >= ?INC_MERGE_STEP ->
hanoidb_level:begin_incremental_merge(TopLevel, Step + N),
{ok, Nursery#nursery{ step=0, merge_done=Done + Step + N }};

View file

@ -37,24 +37,23 @@
-export([serialize/1, deserialize/1]).
-record(node, {level :: non_neg_integer(),
members=[] :: list(any()) }).
members=[] :: list(any()) | binary() }).
-record(index, {file :: file:io_device(),
root :: #node{} | none,
root= none :: #node{} | none,
bloom :: term(),
name :: string(),
config=[] :: term() }).
-type read_file() :: #index{}.
-export_type([read_file/0]).
-spec open(Name::string()) -> {ok, read_file()} | {error, any()}.
open(Name) ->
open(Name, [random]).
-type config() :: [sequential | folding | random | {atom(), term()}].
-spec open(Name::string(), config()) -> {ok, read_file()} | {error, any()}.
open(Name, Config) ->
case proplists:get_bool(sequential, Config) of
true ->
@ -85,7 +84,7 @@ open(Name, Config) ->
{ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8),
{ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4),
{ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize),
Bloom = hanoidb_bloom:decode(BloomData),
{ok, Bloom} = hanoidb_util:bin_to_bloom(BloomData),
%% read in the root node
Root =
@ -115,11 +114,15 @@ deserialize({seq_read_file, Index, Position}) ->
fold(Fun, Acc0, #index{file=File}) ->
{ok, Node} = read_node(File,?FIRST_BLOCK_POS),
fold0(File,fun({K,V},Acc) -> Fun(K,V,Acc) end,Node,Acc0).
fold0(File,Fun,#node{level=0, members=List},Acc0) ->
fold0(File,Fun,#node{level=0, members=BinPage},Acc0) when is_binary(BinPage) ->
Acc1 = vbisect:foldl(fun(K, V, Acc2) -> Fun({K, decode_binary_value(V)}, Acc2) end,Acc0,BinPage),
fold1(File,Fun,Acc1);
fold0(File,Fun,#node{level=0, members=List},Acc0) when is_list(List) ->
Acc1 = lists:foldl(Fun,Acc0,List),
fold1(File,Fun,Acc1);
fold0(File,Fun,_InnerNode,Acc0) ->
@ -133,22 +136,39 @@ fold1(File,Fun,Acc0) ->
fold0(File,Fun,Node,Acc0)
end.
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
-spec range_fold(fun((binary(),binary(),any()) -> any()), any(), #index{}, #key_range{}) ->
{limit, any(), binary()} | {done, any()}.
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
{ok, {Pos,_}} ->
{ok, _} = file:position(File, Pos),
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
{ok, Pos} ->
{ok, _} = file:position(File, Pos),
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
none ->
{done, Acc0}
case Range#key_range.from_key =< first_key(Root) of
true ->
{ok, _} = file:position(File, ?FIRST_BLOCK_POS),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
false ->
case find_leaf_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
{ok, {Pos,_}} ->
{ok, _} = file:position(File, Pos),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
{ok, Pos} ->
{ok, _} = file:position(File, Pos),
range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit);
none ->
{done, Acc0}
end
end.
fold_until_stop(Fun,Acc,List) ->
fold_until_stop2(Fun, {continue, Acc}, List).
first_key(#node{members=Dict}) ->
{_,FirstKey} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, Dict),
FirstKey.
fold_until_stop(Fun,Acc,List) when is_list(List) ->
fold_until_stop2(Fun, {continue, Acc}, List);
fold_until_stop(Fun,Acc0,Bin) when is_binary(Bin) ->
vbisect:fold_until_stop(fun({Key,VBin},Acc1) ->
% io:format("-> DOING ~p,~p~n", [Key,Acc1]),
Fun({Key, decode_binary_value(VBin)}, Acc1)
end,
Acc0,
Bin).
fold_until_stop2(_Fun,{stop,Result},_) ->
{stopped, Result};
@ -170,7 +190,8 @@ get_value({Value, _TStamp}) ->
get_value(Value) ->
Value.
do_range_fold(Fun, Acc0, File, Range, undefined) ->
range_fold_from_here(Fun, Acc0, File, Range, undefined) ->
% io:format("RANGE_FOLD_FROM_HERE(~p,~p)~n", [Acc0,File]),
case next_leaf_node(File) of
eof ->
{done, Acc0};
@ -185,18 +206,19 @@ do_range_fold(Fun, Acc0, File, Range, undefined) ->
false ->
{continue, Fun(Key, get_value(Value), Acc)}
end;
(_, Acc) ->
(_Huh, Acc) ->
% io:format("SKIPPING ~p~n", [_Huh]),
{continue, Acc}
end,
Acc0,
Members) of
{stopped, Result} -> Result;
{ok, Acc1} ->
do_range_fold(Fun, Acc1, File, Range, undefined)
range_fold_from_here(Fun, Acc1, File, Range, undefined)
end
end;
do_range_fold(Fun, Acc0, File, Range, N0) ->
range_fold_from_here(Fun, Acc0, File, Range, N0) ->
case next_leaf_node(File) of
eof ->
{done, Acc0};
@ -209,7 +231,7 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
({Key,?TOMBSTONE}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
{continue, {N1, Fun(Key, ?TOMBSTONE, Acc)}};
({Key,{?TOMBSTONE,TStamp}}, {N1,Acc}) when ?KEY_IN_FROM_RANGE(Key,Range) ->
case hanoidb_utils:has_expired(TStamp) of
case hanoidb_util:has_expired(TStamp) of
true ->
{continue, {N1,Acc}};
false ->
@ -226,52 +248,74 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
{continue, Acc}
end,
{N0, Acc0},
Members) of
{stopped, Result} -> Result;
Members)
of
{stopped, Result} ->
Result;
{ok, {N2, Acc1}} ->
do_range_fold(Fun, Acc1, File, Range, N2)
range_fold_from_here(Fun, Acc1, File, Range, N2)
end
end.
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
find_leaf_node(_File,_FromKey,#node{level=0},Pos) ->
{ok, Pos};
lookup_node(File,FromKey,#node{members=Members,level=N},_) ->
find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_list(Members) ->
case find_start(FromKey, Members) of
{ok, ChildPos} when N==1 ->
{ok, ChildPos};
{ok, ChildPos} ->
case read_node(File,ChildPos) of
{ok, ChildNode} ->
lookup_node(File,FromKey,ChildNode,ChildPos);
eof ->
none
end;
recursive_find(File, FromKey, N, ChildPos);
not_found ->
none
end;
lookup_node(_,_,none,_) ->
find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_binary(Members) ->
case vbisect:find_geq(FromKey,Members) of
{ok, _, <<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned>>} ->
% io:format("** FIND_LEAF_NODE(~p,~p) -> {~p,~p}~n", [FromKey, N, Pos,Len]),
recursive_find(File, FromKey, N, {Pos,Len});
none ->
% io:format("** FIND_LEAF_NODE(~p,~p) -> none~n", [FromKey, N]),
none
end;
find_leaf_node(_,_,none,_) ->
none.
recursive_find(_File,_FromKey,1,ChildPos) ->
{ok, ChildPos};
recursive_find(File,FromKey,N,ChildPos) when N>1 ->
case read_node(File,ChildPos) of
{ok, ChildNode} ->
find_leaf_node(File, FromKey,ChildNode,ChildPos);
eof ->
none
end.
%% used by the merger, needs list value
first_node(#index{file=File}) ->
case read_node(File, ?FIRST_BLOCK_POS) of
{ok, #node{level=0, members=Members}} ->
{node, Members};
{kvlist, decode_member_list(Members)};
eof->
none
end.
%% used by the merger, needs list value
next_node(#index{file=File}=_Index) ->
case next_leaf_node(File) of
{ok, #node{level=0, members=Members}} ->
{node, Members};
% {ok, #node{level=N}} when N>0 ->
% next_node(Index);
{kvlist, decode_member_list(Members)};
eof ->
end_of_data
end.
decode_member_list(List) when is_list(List) ->
List;
decode_member_list(BinDict) when is_binary(BinDict) ->
vbisect:foldr( fun(Key,Value,Acc) ->
[{Key, decode_binary_value(Value) }|Acc]
end,
[],
BinDict).
close(#index{file=undefined}) ->
ok;
close(#index{file=File}) ->
@ -279,7 +323,7 @@ close(#index{file=File}) ->
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
case hanoidb_bloom:member(Key, Bloom) of
case ?BLOOM_CONTAINS(Bloom, Key) of
true ->
case lookup_in_node(File, Node, Key) of
not_found ->
@ -297,11 +341,20 @@ lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
end.
lookup_in_node(_File,#node{level=0,members=Members}, Key) ->
case lists:keyfind(Key,1,Members) of
false ->
not_found;
{_,Value} ->
{ok, Value}
find_in_leaf(Key,Members);
lookup_in_node(File,#node{members=Members},Key) when is_binary(Members) ->
case vbisect:find_geq(Key,Members) of
{ok, _Key, <<?TAG_POSLEN32, Pos:64, Size:32>>} ->
% io:format("FOUND ~p @ ~p~n", [_Key, {Pos,Size}]),
case read_node(File,{Pos,Size}) of
{ok, Node} ->
lookup_in_node(File, Node, Key);
eof ->
not_found
end;
none ->
not_found
end;
lookup_in_node(File,#node{members=Members},Key) ->
@ -323,8 +376,8 @@ lookup_in_node(File,#node{members=Members},Key) ->
end),
try plain_rpc:call(PID, read)
catch
Class:Ex ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]),
Class:Ex:Stacktrace ->
error_logger:error_msg("crashX: ~p:~p ~p~n", [Class,Ex,Stacktrace]),
not_found
end;
@ -416,3 +469,29 @@ next_leaf_node(File) ->
next_leaf_node(File)
end.
find_in_leaf(Key,Bin) when is_binary(Bin) ->
case vbisect:find(Key,Bin) of
{ok, BinValue} ->
{ok, decode_binary_value(BinValue)};
error ->
not_found
end;
find_in_leaf(Key,List) when is_list(List) ->
case lists:keyfind(Key, 1, List) of
{_, Value} ->
{ok, Value};
false ->
not_found
end.
decode_binary_value(<<?TAG_KV_DATA, Value/binary>>) ->
Value;
decode_binary_value(<<?TAG_KV_DATA2, TStamp:32, Value/binary>>) ->
{Value, TStamp};
decode_binary_value(<<?TAG_DELETED>>) ->
?TOMBSTONE;
decode_binary_value(<<?TAG_DELETED2, TStamp:32>>) ->
{?TOMBSTONE, TStamp};
decode_binary_value(<<?TAG_POSLEN32, Pos:64, Len:32>>) ->
{Pos, Len}.

View file

@ -38,20 +38,25 @@
, tstamp/0
, expiry_time/1
, has_expired/1
, ensure_expiry/1 ]).
, ensure_expiry/1
, bloom_type/1
, bloom_new/2
, bloom_to_bin/1
, bin_to_bloom/1
, bin_to_bloom/2
, bloom_insert/2
, bloom_contains/2
]).
-include("src/hanoidb.hrl").
-define(ERLANG_ENCODED, 131).
-define(CRC_ENCODED, 127).
-define(BISECT_ENCODED, 126).
-define(TAG_KV_DATA, 16#80).
-define(TAG_DELETED, 16#81).
-define(TAG_POSLEN32, 16#82).
-define(TAG_TRANSACT, 16#83).
-define(TAG_KV_DATA2, 16#84).
-define(TAG_DELETED2, 16#85).
-define(TAG_END, 16#FF).
-define(FILE_ENCODING, hanoi2).
-compile({inline, [crc_encapsulate/1, crc_encapsulate_kv_entry/2 ]}).
@ -134,18 +139,47 @@ uncompress(<<?GZIP_COMPRESSION, Data/binary>>) ->
zlib:gunzip(Data).
encode_index_node(KVList, Method) ->
TermData = [ ?TAG_END |
lists:map(fun ({Key,Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
KVList) ],
TermData =
case ?FILE_ENCODING of
bisect ->
Binary = vbisect:from_orddict(lists:map(fun binary_encode_kv/1, KVList)),
CRC = erlang:crc32(Binary),
[?BISECT_ENCODED, <<CRC:32>>, Binary];
hanoi2 ->
[ ?TAG_END |
lists:map(fun ({Key,Value}) ->
crc_encapsulate_kv_entry(Key, Value)
end,
KVList) ]
end,
{MethodName, OutData} = compress(Method, TermData),
{ok, [MethodName | OutData]}.
decode_index_node(Level, Data) ->
TermData = uncompress(Data),
{ok, KVList} = decode_kv_list(TermData),
{ok, {node, Level, KVList}}.
case decode_kv_list(TermData) of
{ok, KVList} ->
{ok, {node, Level, KVList}};
{bisect, Binary} ->
% io:format("[page level=~p~n", [Level]),
% vbisect:foldl(fun(K,V,_) -> io:format(" ~p -> ~p,~n", [K,V]) end, 0, Binary),
% io:format("]~n",[]),
{ok, {node, Level, Binary}}
end.
binary_encode_kv({Key, {Value,infinity}}) ->
binary_encode_kv({Key,Value});
binary_encode_kv({Key, {?TOMBSTONE, TStamp}}) ->
{Key, <<?TAG_DELETED2, TStamp:32>>};
binary_encode_kv({Key, ?TOMBSTONE}) ->
{Key, <<?TAG_DELETED>>};
binary_encode_kv({Key, {Value, TStamp}}) when is_binary(Value) ->
{Key, <<?TAG_KV_DATA2, TStamp:32, Value/binary>>};
binary_encode_kv({Key, Value}) when is_binary(Value)->
{Key, <<?TAG_KV_DATA, Value/binary>>};
binary_encode_kv({Key, {Pos, Len}}) when Len < 16#ffffffff ->
{Key, <<?TAG_POSLEN32, Pos:64/unsigned, Len:32/unsigned>>}.
-spec crc_encapsulate_kv_entry(binary(), expvalue()) -> iolist().
@ -184,7 +218,14 @@ decode_kv_list(<<?TAG_END, Custom/binary>>) ->
decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
{ok, erlang:term_to_binary(TermData)};
decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
decode_crc_data(Custom, [], []).
decode_crc_data(Custom, [], []);
decode_kv_list(<<?BISECT_ENCODED, CRC:32/unsigned, Binary/binary>>) ->
CRCTest = erlang:crc32( Binary ),
if CRC == CRCTest ->
{bisect, Binary};
true ->
{bisect, vbisect:from_orddict([])}
end.
-spec decode_crc_data(binary(), list(), list()) -> {ok, [kventry()]} | {partial, [kventry()], iolist()}.
decode_crc_data(<<>>, [], Acc) ->
@ -257,12 +298,50 @@ ensure_expiry(Opts) ->
undefined ->
try exit(err)
catch
exit:err ->
io:format(user, "~p~n", [erlang:get_stacktrace()])
exit:err:Stacktrace ->
io:format(user, "~p~n", [Stacktrace])
end,
exit(expiry_secs_not_set);
N when N >= 0 ->
ok
end.
bloom_type({ebloom, _}) ->
ebloom;
bloom_type({sbloom, _}) ->
sbloom.
bloom_new(Size, sbloom) ->
{ok, {sbloom, hanoidb_bloom:bloom(Size, 0.01)}};
bloom_new(Size, ebloom) ->
{ok, Bloom} = ebloom:new(Size, 0.01, Size),
{ok, {ebloom, Bloom}}.
bloom_to_bin({sbloom, Bloom}) ->
hanoidb_bloom:encode(Bloom);
bloom_to_bin({ebloom, Bloom}) ->
ebloom:serialize(Bloom).
bin_to_bloom(GZiped = <<16#1F, 16#8B, _/binary>>) ->
bin_to_bloom(GZiped, sbloom);
bin_to_bloom(TermBin = <<131, _/binary>>) ->
erlang:term_to_binary(TermBin);
bin_to_bloom(Blob) ->
bin_to_bloom(Blob, ebloom).
bin_to_bloom(Binary, sbloom) ->
{ok, {sbloom, hanoidb_bloom:decode(Binary)}};
bin_to_bloom(Binary, ebloom) ->
{ok, Bloom} = ebloom:deserialize(Binary),
{ok, {ebloom, Bloom}}.
bloom_insert({sbloom, Bloom}, Key) ->
{ok, {sbloom, hanoidb_bloom:add(Key, Bloom)}};
bloom_insert({ebloom, Bloom}, Key) ->
ok = ebloom:insert(Bloom, Key),
{ok, {ebloom, Bloom}}.
bloom_contains({sbloom, Bloom}, Key) ->
hanoidb_bloom:member(Key, Bloom);
bloom_contains({ebloom, Bloom}, Key) ->
ebloom:contains(Bloom, Key).

View file

@ -55,9 +55,9 @@
name :: string(),
bloom :: term(),
bloom :: {ebloom, term()} | {sbloom, term()},
block_size = ?NODE_SIZE :: integer(),
compress = none :: none | snappy | gzip, % | lz4,
compress = none :: none | snappy | gzip | lz4,
opts = [] :: list(any()),
value_count = 0 :: integer(),
@ -94,7 +94,7 @@ init([Name, Options]) ->
case do_open(Name, Options, [exclusive]) of
{ok, IdxFile} ->
ok = file:write(IdxFile, ?FILE_FORMAT),
Bloom = hanoidb_bloom:bloom(Size),
{ok, Bloom} = ?BLOOM_NEW(Size),
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
{ok, #state{ name=Name,
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
@ -170,11 +170,11 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State)
exit({bad_position, Position, WrongPosition})
end,
ok = file:close(File),
erlang:term_to_binary( { State#state{ index_file=undefined }, hanoidb_bloom:encode(Bloom) } ).
erlang:term_to_binary( { State#state{ index_file=undefined, bloom=undefined }, ?BLOOM_TO_BIN(Bloom), hanoidb_util:bloom_type(Bloom) } ).
deserialize(Binary) ->
{State, Bin} = erlang:binary_to_term(Binary),
Bloom = hanoidb_bloom:decode(Bin),
{State, Bin, Type} = erlang:binary_to_term(Binary),
{ok, Bloom} = ?BIN_TO_BLOOM(Bin, Type),
{ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
State#state{ bloom=Bloom, index_file=IdxFile }.
@ -188,7 +188,8 @@ do_open(Name, Options, OpenOpts) ->
%% @doc flush pending nodes and write trailer
archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) ->
BloomBin = hanoidb_bloom:encode(Bloom),
BloomBin = ?BLOOM_TO_BIN(Bloom),
true = is_binary(BloomBin),
BloomSize = byte_size(BloomBin),
RootPos =
case LastNodePos of
@ -199,12 +200,12 @@ archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastN
_ ->
LastNodePos
end,
Trailer = << 0:32/unsigned, BloomBin/binary, BloomSize:32/unsigned, RootPos:64/unsigned >>,
Trailer = [ << 0:32/unsigned>> , BloomBin, << BloomSize:32/unsigned, RootPos:64/unsigned >> ],
ok = file:write(IdxFile, Trailer),
ok = file:datasync(IdxFile),
ok = file:close(IdxFile),
{ok, State#state{ index_file=undefined, index_file_pos=undefined }};
{ok, State#state{ index_file=undefined, index_file_pos=undefined, bloom=undefined }};
archive_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos })
when N > 0 ->
@ -221,7 +222,8 @@ append_node(Level, Key, Value, State=#state{ nodes=[] }) ->
append_node(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
when Level < Level2 ->
append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) ->
append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State)
when Bloom /= undefined ->
%% The top-of-stack node is at the level we wish to insert at.
%% Assert that keys are increasing:
@ -236,10 +238,14 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
exit({badarg, Key})
end
end,
NewSize = NodeSize + hanoidb_util:estimate_node_size_increment(List, Key, Value),
NewBloom = hanoidb_bloom:add(Key, Bloom),
{ok,Bloom2} = case Level of
0 ->
?BLOOM_INSERT(Bloom, Key);
_ ->
{ok,Bloom}
end,
{TC1, VC1} =
case Level of
@ -258,7 +264,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
NodeMembers = [{Key, Value} | List],
State2 = State#state{ nodes=[CurrNode#node{members=NodeMembers, size=NewSize} | RestNodes],
value_count=VC1, tombstone_count=TC1, bloom=NewBloom },
value_count=VC1, tombstone_count=TC1, bloom=Bloom2 },
case NewSize >= State#state.block_size of
true ->
@ -267,7 +273,8 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
{ok, State2}
end.
flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) ->
flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos } = State) ->
OrderedMembers = lists:reverse(NodeMembers),
{ok, BlockData} = hanoidb_util:encode_index_node(OrderedMembers, Compress),

280
src/vbisect.erl Normal file
View file

@ -0,0 +1,280 @@
%% ----------------------------------------------------------------------------
%%
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
%%
%% Copyright 2014 (c) Trifork A/S. All Rights Reserved.
%% http://trifork.com/ info@trifork.com
%%
%% This file is provided to you under the Apache License, Version 2.0 (the
%% "License"); you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
%% License for the specific language governing permissions and limitations
%% under the License.
%%
%% ----------------------------------------------------------------------------
-module(vbisect).
-export([from_orddict/1,
from_gb_tree/1,
to_gb_tree/1,
first_key/1,
find/2, find_geq/2,
foldl/3, foldr/3, fold_until_stop/3,
to_orddict/1,
merge/3]).
-define(MAGIC, "vbis").
-type key() :: binary().
-type value() :: binary().
-type bindict() :: binary().
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-spec from_gb_tree(gb_trees:tree()) -> bindict().
from_gb_tree({Count,Node}) when Count =< 16#ffffffff ->
{_BinSize,IOList} = encode_gb_node(Node),
erlang:iolist_to_binary([ <<?MAGIC, Count:32/unsigned >> | IOList ]).
encode_gb_node({Key, Value, Smaller, Bigger}) when is_binary(Key), is_binary(Value) ->
{BinSizeSmaller, IOSmaller} = encode_gb_node(Smaller),
{BinSizeBigger, IOBigger} = encode_gb_node(Bigger),
KeySize = byte_size(Key),
ValueSize = byte_size(Value),
{ 2 + KeySize
+ 4 + ValueSize
+ 4 + BinSizeSmaller
+ BinSizeBigger,
[ << KeySize:16, Key/binary,
BinSizeSmaller:32 >>, IOSmaller,
<< ValueSize:32, Value/binary >> | IOBigger ] };
encode_gb_node(nil) ->
{ 0, [] }.
to_gb_tree(<<?MAGIC, Count:32, Nodes/binary >>) ->
{ Count, to_gb_node(Nodes) }.
to_gb_node( <<>> ) ->
nil;
to_gb_node( << KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary >> ) ->
{Key, Value,
to_gb_node(Smaller),
to_gb_node(Bigger)}.
-spec find(Key::key(), Dict::bindict()) ->
{ ok, value() } | error.
find(Key, <<?MAGIC, _:32, Binary/binary>>) ->
find_node(byte_size(Key), Key, Binary).
find_node(KeySize, Key, <<HereKeySize:16, HereKey:HereKeySize/binary,
BinSizeSmaller:32, _:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
_/binary>> = Bin) ->
if
Key < HereKey ->
Skip = 6 + HereKeySize,
<< _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin,
find_node(KeySize, Key, Smaller);
HereKey < Key ->
Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize,
<< _:Skip/binary, Bigger/binary>> = Bin,
find_node(KeySize, Key, Bigger);
true ->
{ok, Value}
end;
find_node(_, _, <<>>) ->
error.
to_orddict(BinDict) ->
foldr(fun(Key,Value,Acc) ->
[{Key,Value}|Acc]
end,
[],
BinDict).
merge(Fun, BinDict1, BinDict2) ->
OD1 = to_orddict(BinDict1),
OD2 = to_orddict(BinDict2),
OD3 = orddict:merge(Fun, OD1, OD2),
from_orddict(OD3).
-spec first_key( bindict() ) -> binary() | none.
first_key(BinDict) ->
{_, Key} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, BinDict),
Key.
%% @doc Find largest {K,V} where K is smaller than or equal to key.
%% This is good for an inner node where key is the smallest key
%% in the child node.
-spec find_geq(Key::binary(), Binary::binary()) ->
none | {ok, Key::key(), Value::value()}.
find_geq(Key, <<?MAGIC, _:32, Binary/binary>>) ->
find_geq_node(byte_size(Key), Key, Binary, none).
find_geq_node(_, _, <<>>, Else) ->
Else;
find_geq_node(KeySize, Key, <<HereKeySize:16, HereKey:HereKeySize/binary,
BinSizeSmaller:32, _:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
_/binary>> = Bin, Else) ->
if
Key < HereKey ->
Skip = 6 + HereKeySize,
<< _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin,
find_geq_node(KeySize, Key, Smaller, Else);
HereKey < Key ->
Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize,
<< _:Skip/binary, Bigger/binary>> = Bin,
find_geq_node(KeySize, Key, Bigger, {ok, HereKey, Value});
true ->
{ok, HereKey, Value}
end.
-spec foldl(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) ->
term().
foldl(Fun, Acc, <<?MAGIC, _:32, Binary/binary>>) ->
foldl_node(Fun, Acc, Binary).
foldl_node(_Fun, Acc, <<>>) ->
Acc;
foldl_node(Fun, Acc, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
Acc1 = foldl_node(Fun, Acc, Smaller),
Acc2 = Fun(Key, Value, Acc1),
foldl_node(Fun, Acc2, Bigger).
-spec fold_until_stop(function(), term(), bindict()) -> {stopped, term()} | {ok, term()}.
fold_until_stop(Fun, Acc, <<?MAGIC, _:32, Bin/binary>>) ->
fold_until_stop2(Fun, {continue, Acc}, Bin).
fold_until_stop2(_Fun,{stop,Result},_) ->
{stopped, Result};
fold_until_stop2(_Fun,{continue, Acc},<<>>) ->
{ok, Acc};
fold_until_stop2(Fun,{continue, Acc}, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
case fold_until_stop2(Fun, {continue, Acc}, Smaller) of
{stopped, Result} ->
{stopped, Result};
{ok, Acc1} ->
ContinueOrStopAcc = Fun({Key,Value}, Acc1),
fold_until_stop2(Fun, ContinueOrStopAcc, Bigger)
end.
-spec foldr(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) ->
term().
foldr(Fun, Acc, <<?MAGIC, _:32, Binary/binary>>) ->
foldr_node(Fun, Acc, Binary).
foldr_node(_Fun, Acc, <<>>) ->
Acc;
foldr_node(Fun, Acc, <<KeySize:16, Key:KeySize/binary,
BinSizeSmaller:32, Smaller:BinSizeSmaller/binary,
ValueSize:32, Value:ValueSize/binary,
Bigger/binary>>) ->
Acc1 = foldr_node(Fun, Acc, Bigger),
Acc2 = Fun(Key, Value, Acc1),
foldr_node(Fun, Acc2, Smaller).
from_orddict(OrdDict) ->
from_gb_tree(gb_trees:from_orddict(OrdDict)).
-ifdef(TEST).
speed_test_() ->
{timeout, 600,
fun() ->
Start = 100000000000000,
N = 100000,
Keys = lists:seq(Start, Start+N),
KeyValuePairs = lists:map(fun (I) -> {<<I:64/integer>>, <<255:8/integer>>} end,
Keys),
%% Will mostly be unique, if N is bigger than 10000
ReadKeys = [<<(lists:nth(random:uniform(N), Keys)):64/integer>> || _ <- lists:seq(1, 1000)],
B = from_orddict(KeyValuePairs),
time_reads(B, N, ReadKeys)
end}.
geq_test() ->
B = from_orddict([{<<2>>,<<2>>},{<<4>>,<<4>>},{<<6>>,<<6>>},{<<122>>,<<122>>}]),
none = find_geq(<<1>>, B),
{ok, <<2>>, <<2>>} = find_geq(<<2>>, B),
{ok, <<2>>, <<2>>} = find_geq(<<3>>, B),
{ok, <<4>>, <<4>>} = find_geq(<<5>>, B),
{ok, <<6>>, <<6>>} = find_geq(<<100>>, B),
{ok, <<122>>, <<122>>} = find_geq(<<150>>, B),
true.
time_reads(B, Size, ReadKeys) ->
Parent = self(),
spawn(
fun() ->
Runs = 20,
Timings =
lists:map(
fun (_) ->
StartTime = now(),
find_many(B, ReadKeys),
timer:now_diff(now(), StartTime)
end, lists:seq(1, Runs)),
Rps = 1000000 / ((lists:sum(Timings) / length(Timings)) / 1000),
error_logger:info_msg("Average over ~p runs, ~p keys in dict~n"
"Average fetch ~p keys: ~p us, max: ~p us~n"
"Average fetch 1 key: ~p us~n"
"Theoretical sequential RPS: ~w~n",
[Runs, Size, length(ReadKeys),
lists:sum(Timings) / length(Timings),
lists:max(Timings),
(lists:sum(Timings) / length(Timings)) / length(ReadKeys),
trunc(Rps)]),
Parent ! done
end),
receive done -> ok after 1000 -> ok end.
-spec find_many(bindict(), [key()]) -> non_neg_integer().
find_many(B, Keys) ->
lists:foldl(fun (K, N) ->
case find(K, B) of
{ok, _} -> N+1;
error -> N
end
end,
0, Keys).
-endif.

View file

@ -25,8 +25,6 @@
%% @doc Drive a set of LSM BTrees
-module(hanoidb_drv).
-ifdef(QC_PROPER).
-behaviour(gen_server).
%% API
@ -143,4 +141,3 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,8 +24,6 @@
-module(hanoidb_merger_tests).
-ifdef(QC_PROPER).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
@ -63,4 +61,3 @@ merge_test() ->
ok.
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,8 +24,6 @@
-module(hanoidb_tests).
-ifdef(QC_PROPER).
-include("include/hanoidb.hrl").
-include("src/hanoidb.hrl").
@ -49,9 +47,15 @@
next_state/3, postcondition/3,
precondition/2]).
-record(tree, { elements = dict:new() :: dict() }).
-record(state, { open = dict:new() :: dict(),
closed = dict:new() :: dict()}).
-ifdef(pre18).
-define(OTP_DICT, dict()).
-else.
-define(OTP_DICT, dict:dict()).
-endif.
-record(tree, { elements = dict:new() :: ?OTP_DICT }).
-record(state, { open = dict:new() :: ?OTP_DICT,
closed = dict:new() :: ?OTP_DICT}).
-define(SERVER, hanoidb_drv).
full_test_() ->
@ -63,15 +67,23 @@ full_test_() ->
?_test(test_tree_simple_5())
]}.
longer_test_() ->
longer_tree_test_() ->
{setup,
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 300, ?_test(test_tree())},
{timeout, 120, ?_test(test_qc())}
]}.
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 300, ?_test(test_tree())}
]}.
longer_qc_test_() ->
{setup,
spawn,
fun () -> ok end,
fun (_) -> ok end,
[
{timeout, 120, ?_test(test_qc())}
]}.
-ifdef(TRIQ).
test_qc() ->
@ -428,4 +440,3 @@ dict_range_query(Dict, Range) ->
[{K, V} || {K, V} <- dict:to_list(Dict),
?KEY_IN_RANGE(K, Range)].
-endif. %% -ifdef(QC_PROPER).

View file

@ -24,8 +24,6 @@
-module(hanoidb_writer_tests).
-ifdef(QC_PROPER).
-ifdef(TEST).
-ifdef(TEST).
-ifdef(TRIQ).
@ -64,9 +62,9 @@ simple_test() ->
simple1_test() ->
file:delete("testdata"),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 1024},{expiry_secs, 0}]),
{ok, BT} = hanoidb_writer:open("testdata", [{block_size, 102},{expiry_secs, 0}]),
Max = 1024,
Max = 102,
Seq = lists:seq(0, Max),
{Time1,_} = timer:tc(
@ -80,15 +78,16 @@ simple1_test() ->
end,
[]),
% error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
{ok, IN} = hanoidb_reader:open("testdata", [{expiry_secs,0}]),
Middle = Max div 2,
io:format("LOOKING UP ~p~n", [<<Middle:128>>]),
{ok, <<"valuevalue/", Middle:128>>} = hanoidb_reader:lookup(IN, <<Middle:128>>),
{Time2,Count} = timer:tc(
fun() -> hanoidb_reader:fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
fun() -> hanoidb_reader:fold(fun(_Key, <<"valuevalue/", N:128>>, N) ->
N+1
end,
0,
@ -96,12 +95,13 @@ simple1_test() ->
end,
[]),
% error_logger:info_msg("time to scan: ~p/sec~n", [1000000/(Time2/Max)]),
io:format("time to scan: ~p/sec~n", [1000000/(Time2 div Max)]),
Max = Count-1,
{Time3,{done,Count2}} = timer:tc(
fun() -> hanoidb_reader:range_fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
fun() -> hanoidb_reader:range_fold(fun(_Key, <<"valuevalue/", N:128>>, N) ->
% io:format("[~p]~n", N),
N+1
end,
0,
@ -110,12 +110,13 @@ simple1_test() ->
end,
[]),
% error_logger:info_msg("time to range_fold: ~p/sec~n", [1000000/(Time3/Max)]),
% error_logger:info_msg("count2=~p~n", [Count2]),
%error_logger:info_msg("time to range_fold: ~p/sec~n", [1000000/(Time3 div Max)]),
io:format("count2=~p~n", [Count2]),
Max = Count2-1,
ok = hanoidb_reader:close(IN).
-endif. %% -ifdef(QC_PROPER).

83
tools/levelpresence.sh Normal file
View file

@ -0,0 +1,83 @@
#!/bin/bash
## ----------------------------------------------------------------------------
##
## hanoi: LSM-trees (Log-Structured Merge Trees) Indexed Storage
##
## Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
## http://trifork.com/ info@trifork.com
##
## Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
## http://basho.com/ info@basho.com
##
## This file is provided to you under the Apache License, Version 2.0 (the
## "License"); you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
## WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
## License for the specific language governing permissions and limitations
## under the License.
##
## ----------------------------------------------------------------------------
function periodic() {
t=0
while sleep 1 ; do
let "t=t+1"
printf "%5d [" "$t"
for ((i=0; i<35; i++)) ; do
if ! [ -f "A-$i.data" ] ; then
echo -n " "
elif ! [ -f "B-$i.data" ] ; then
echo -n "-"
elif ! [ -f "C-$i.data" ] ; then
echo -n "#"
elif ! [ -f "X-$i.data" ] ; then
echo -n "="
else
echo -n "*"
fi
done
echo
done
}
function dynamic() {
local old s t start now
t=0
start=`date +%s`
while true ; do
s=""
for ((i=0; i<35; i++)) ; do
if ! [ -f "A-$i.data" ] ; then
s="$s "
elif ! [ -f "B-$i.data" ] ; then
s="$s-"
elif ! [ -f "C-$i.data" ] ; then
s="$s="
elif ! [ -f "X-$i.data" ] ; then
s="$s%"
else
s="$s*"
fi
done
if [[ "$s" != "$old" ]] ; then
let "t=t+1"
now=`date +%s`
let "now=now-start"
printf "%5d %6d [%s\n" "$t" "$now" "$s"
old="$s"
else
# Sleep a little bit:
perl -e 'use Time::HiRes; Time::HiRes::usleep(100000)'
fi
done
}
dynamic

View file

@ -48,13 +48,13 @@ function periodic() {
}
merge_diff() {
SA=`ls -l A-${ID}.data 2> /dev/null | awk '{print $5}'`
SB=`ls -l B-${ID}.data 2> /dev/null | awk '{print $5}'`
SX=`ls -l X-${ID}.data 2> /dev/null | awk '{print $5}'`
SA=`stat -c %s A-${ID}.data 2> /dev/null`
SB=`stat -c %s B-${ID}.data 2> /dev/null`
SX=`stat -c %s X-${ID}.data 2> /dev/null`
if [ \( -n "$SA" \) -a \( -n "$SB" \) -a \( -n "$SX" \) ]; then
export RES=`expr ${SX}0 / \( $SA + $SB \)`
export RES=`expr ${SX:-0} / \( $SA + $SB \)`
else
export RES="?"
export RES=""
fi
}
@ -64,7 +64,7 @@ function dynamic() {
start=`date +%s`
while true ; do
s=""
for ((i=8; i<22; i++)) ; do
for ((i=10; i<22; i++)) ; do
if [ -f "C-$i.data" ] ; then
s="${s}C"
else

66
tools/viz.sh Executable file
View file

@ -0,0 +1,66 @@
#!/usr/bin/env bash
merge_diff() {
SA=`stat -c %s A-${ID}.data 2> /dev/null`
SB=`stat -c %s B-${ID}.data 2> /dev/null`
SX=`stat -c %s X-${ID}.data 2> /dev/null`
if [ \( -n "$SA" \) -a \( -n "$SB" \) -a \( -n "$SX" \) ]; then
export RES="$(expr ${SX:-0} / \( $SA + $SB \))"
else
export RES=0
fi
if [[ $RES -eq 0 ]]; then
export RES="⋈"
fi
}
function dynamic() {
local old s t start now
t=0
start=`date +%s`
while true ; do
s=""
for ((i=10; i<22; i++)) ; do
if [ -f "C-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "B-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "A-$i.data" ] ; then
s="${s}"
else
s="$s "
fi
if [ -f "X-$i.data" ] ; then
export ID="$i"
merge_diff
s="${s}$RES"
elif [ -f "M-$i.data" ] ; then
s="${s}M"
else
s="$s "
fi
s="$s|"
done
if [[ "$s" != "$old" ]] ; then
let "t=t+1"
now=`date +%s`
let "now=now-start"
free=`df -m . 2> /dev/null | tail -1 | awk '{print $4}'`
used=`du -m 2> /dev/null | awk '{print $1}' `
printf "%5d %6d [%s\n" "$t" "$now" "$s ${used}MB (${free}MB free)"
old="$s"
else
# Sleep a little bit:
sleep 1
fi
done
}
dynamic