Remove Riak-specific integration code. Deliver it later as a separate repo.
This commit is contained in:
parent
fb01a7dd29
commit
bfdb63094f
5 changed files with 0 additions and 1289 deletions
|
@ -1,60 +0,0 @@
|
|||
#!/bin/sh
|
||||
|
||||
# This script adds hanoidb to a riak github repo. Run it in the riak repo
|
||||
# directory.
|
||||
#
|
||||
# First it adds hanoidb, then runs "make all devrel" and then enables the
|
||||
# hanoidb storage backend in the resulting dev nodes.
|
||||
#
|
||||
# This script is intended to be temporary. Once hanoidb is made into a proper
|
||||
# riak citizen, this script will no longer be needed.
|
||||
|
||||
set -e
|
||||
|
||||
wd=`pwd`
|
||||
if [ `basename $wd` != riak ]; then
|
||||
echo "This doesn't appear to be a riak repo directory. Exiting."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ -d dev ]; then
|
||||
echo
|
||||
echo 'NOTE: THIS SCRIPT WILL DELETE YOUR EXISTING DEV DIRECTORY!'
|
||||
while [ 1 ]; do
|
||||
printf '\n%s' ' Do you wish to proceed? [no] '
|
||||
read answer
|
||||
answer=${answer:-n}
|
||||
case $answer in
|
||||
[Nn]*) exit 0 ;;
|
||||
[Yy]*) break ;;
|
||||
*) echo 'Please answer y or n.' ;;
|
||||
esac
|
||||
done
|
||||
fi
|
||||
|
||||
./rebar get-deps
|
||||
|
||||
file=./deps/riak_kv/src/riak_kv.app.src
|
||||
if ! grep -q hanoidb $file ; then
|
||||
echo
|
||||
echo "Modifying $file, saving the original as ${file}.orig ..."
|
||||
perl -i.orig -pe '/\bos_mon,/ && print qq( hanoidb,\n)' $file
|
||||
fi
|
||||
|
||||
file=./deps/riak_kv/rebar.config
|
||||
if ! grep -q hanoidb $file ; then
|
||||
echo
|
||||
echo "Modifying $file, saving the original as ${file}.orig ..."
|
||||
perl -i.orig -pe '/\bsext\b/ && print qq( {hanoidb, ".*", {git, "git\@github.com:basho/hanoidb.git", "master"}},\n)' $file
|
||||
fi
|
||||
|
||||
./rebar get-deps
|
||||
|
||||
rm -rf dev
|
||||
make all devrel
|
||||
|
||||
echo
|
||||
echo 'Modifying all dev/dev*/etc/app.config files, saving originals with .orig suffix...'
|
||||
perl -i.orig -ne 'if (/\bstorage_backend,/) { s/(storage_backend, )[^\}]+/\1riak_kv_hanoidb_backend/; print } elsif (/\{eleveldb,/) { $eleveldb++; print } elsif ($eleveldb && /^\s+\]\},/) { $eleveldb = 0; print; print qq(\n {hanoidb, [\n {data_root, "./data/hanoidb"}\n ]},\n\n) } else { print }' dev/dev*/etc/app.config
|
||||
|
||||
exit 0
|
|
@ -1,31 +0,0 @@
|
|||
|
||||
{mode, max}.
|
||||
|
||||
{duration, 20}.
|
||||
|
||||
{concurrent, 1}.
|
||||
|
||||
{driver, basho_bench_driver_hanoidb}.
|
||||
|
||||
{key_generator, {int_to_bin,{uniform_int, 5000000}}}.
|
||||
|
||||
{value_generator, {fixed_bin, 10000}}.
|
||||
|
||||
{operations, [{get, 1}, {put, 1}]}.
|
||||
|
||||
%% the second element in the list below (e.g., "../../public/bitcask") must point to
|
||||
%% the relevant directory of a hanoi installation
|
||||
{code_paths, ["deps/stats",
|
||||
"../hanoidb/ebin",
|
||||
"../hanoidb/deps/plain_fsm/ebin",
|
||||
"../hanoidb/deps/ebloom/ebin"
|
||||
]}.
|
||||
|
||||
% {hanoidb_dir, "/tmp/hanoidb.bench"}.
|
||||
|
||||
{hanoidb_flags, [{expiry_secs, 0},
|
||||
{sync_strategy, none}, %% |sync|{seconds, N}
|
||||
{compress, none}, %% |gzip
|
||||
{page_size, 8192}, %
|
||||
{merge_strategy, fast} %% |predictable
|
||||
]}.
|
|
@ -1,280 +0,0 @@
|
|||
%% ----------------------------------------------------------------------------
|
||||
%%
|
||||
%% riak_kv_backend: Riak backend behaviour
|
||||
%%
|
||||
%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License, Version 2.0 (the
|
||||
%% "License"); you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
%% License for the specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
||||
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
|
||||
%%%
|
||||
%%% This is a temporary copy of riak_kv_backend, just here to keep hanoidb
|
||||
%%% development private for now. When riak_kv_hanoidb_backend is moved to
|
||||
%%% riak_kv, delete this file.
|
||||
%%%
|
||||
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
|
||||
|
||||
|
||||
-module(hanoidb_temp_riak_kv_backend).
|
||||
|
||||
-export([behaviour_info/1]).
|
||||
-export([callback_after/3]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-compile(export_all).
|
||||
-export([standard_test/2]).
|
||||
-endif.
|
||||
|
||||
-type fold_buckets_fun() :: fun((binary(), any()) -> any() | no_return()).
|
||||
-type fold_keys_fun() :: fun((binary(), binary(), any()) -> any() |
|
||||
no_return()).
|
||||
-type fold_objects_fun() :: fun((binary(), binary(), term(), any()) ->
|
||||
any() |
|
||||
no_return()).
|
||||
-export_type([fold_buckets_fun/0,
|
||||
fold_keys_fun/0,
|
||||
fold_objects_fun/0]).
|
||||
|
||||
-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
|
||||
behaviour_info(callbacks) ->
|
||||
[
|
||||
{api_version,0},
|
||||
{capabilities, 1}, % (State)
|
||||
{capabilities, 2}, % (Bucket, State)
|
||||
{start,2}, % (Partition, Config)
|
||||
{stop,1}, % (State)
|
||||
{get,3}, % (Bucket, Key, State)
|
||||
{put,5}, % (Bucket, Key, IndexSpecs, Val, State)
|
||||
{delete,4}, % (Bucket, Key, IndexSpecs, State)
|
||||
{drop,1}, % (State)
|
||||
{fold_buckets,4}, % (FoldBucketsFun, Acc, Opts, State),
|
||||
% FoldBucketsFun(Bucket, Acc)
|
||||
{fold_keys,4}, % (FoldKeysFun, Acc, Opts, State),
|
||||
% FoldKeysFun(Bucket, Key, Acc)
|
||||
{fold_objects,4}, % (FoldObjectsFun, Acc, Opts, State),
|
||||
% FoldObjectsFun(Bucket, Key, Object, Acc)
|
||||
{is_empty,1}, % (State)
|
||||
{status,1}, % (State)
|
||||
{callback,3}]; % (Ref, Msg, State) ->
|
||||
behaviour_info(_Other) ->
|
||||
undefined.
|
||||
|
||||
%% Queue a callback for the backend after Time ms.
|
||||
-spec callback_after(integer(), reference(), term()) -> reference().
|
||||
callback_after(Time, Ref, Msg) when is_integer(Time), is_reference(Ref) ->
|
||||
riak_core_vnode:send_command_after(Time, {backend_callback, Ref, Msg}).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
standard_test(BackendMod, Config) ->
|
||||
{spawn,
|
||||
[
|
||||
{setup,
|
||||
fun() -> ?MODULE:setup({BackendMod, Config}) end,
|
||||
fun ?MODULE:cleanup/1,
|
||||
fun(X) ->
|
||||
[?MODULE:basic_store_and_fetch(X),
|
||||
?MODULE:fold_buckets(X),
|
||||
?MODULE:fold_keys(X),
|
||||
?MODULE:delete_object(X),
|
||||
?MODULE:fold_objects(X),
|
||||
?MODULE:empty_check(X)
|
||||
]
|
||||
end
|
||||
}]}.
|
||||
|
||||
basic_store_and_fetch({Backend, State}) ->
|
||||
{"basic store and fetch test",
|
||||
fun() ->
|
||||
[
|
||||
?_assertMatch({ok, _},
|
||||
Backend:put(<<"b1">>, <<"k1">>, [], <<"v1">>, State)),
|
||||
?_assertMatch({ok, _},
|
||||
Backend:put(<<"b2">>, <<"k2">>, [], <<"v2">>, State)),
|
||||
?_assertMatch({ok,<<"v2">>, _},
|
||||
Backend:get(<<"b2">>, <<"k2">>, State)),
|
||||
?_assertMatch({error, not_found, _},
|
||||
Backend:get(<<"b1">>, <<"k3">>, State))
|
||||
]
|
||||
end
|
||||
}.
|
||||
|
||||
fold_buckets({Backend, State}) ->
|
||||
{"bucket folding test",
|
||||
fun() ->
|
||||
FoldBucketsFun =
|
||||
fun(Bucket, Acc) ->
|
||||
[Bucket | Acc]
|
||||
end,
|
||||
|
||||
?_assertEqual([<<"b1">>, <<"b2">>],
|
||||
begin
|
||||
{ok, Buckets1} =
|
||||
Backend:fold_buckets(FoldBucketsFun,
|
||||
[],
|
||||
[],
|
||||
State),
|
||||
lists:sort(Buckets1)
|
||||
end)
|
||||
end
|
||||
}.
|
||||
|
||||
fold_keys({Backend, State}) ->
|
||||
{"key folding test",
|
||||
fun() ->
|
||||
FoldKeysFun =
|
||||
fun(Bucket, Key, Acc) ->
|
||||
[{Bucket, Key} | Acc]
|
||||
end,
|
||||
FoldKeysFun1 =
|
||||
fun(_Bucket, Key, Acc) ->
|
||||
[Key | Acc]
|
||||
end,
|
||||
FoldKeysFun2 =
|
||||
fun(Bucket, Key, Acc) ->
|
||||
case Bucket =:= <<"b1">> of
|
||||
true ->
|
||||
[Key | Acc];
|
||||
false ->
|
||||
Acc
|
||||
end
|
||||
end,
|
||||
FoldKeysFun3 =
|
||||
fun(Bucket, Key, Acc) ->
|
||||
case Bucket =:= <<"b1">> of
|
||||
true ->
|
||||
Acc;
|
||||
false ->
|
||||
[Key | Acc]
|
||||
end
|
||||
end,
|
||||
[
|
||||
?_assertEqual([{<<"b1">>, <<"k1">>}, {<<"b2">>, <<"k2">>}],
|
||||
begin
|
||||
{ok, Keys1} =
|
||||
Backend:fold_keys(FoldKeysFun,
|
||||
[],
|
||||
[],
|
||||
State),
|
||||
lists:sort(Keys1)
|
||||
end),
|
||||
?_assertEqual({ok, [<<"k1">>]},
|
||||
Backend:fold_keys(FoldKeysFun1,
|
||||
[],
|
||||
[{bucket, <<"b1">>}],
|
||||
State)),
|
||||
?_assertEqual([<<"k2">>],
|
||||
Backend:fold_keys(FoldKeysFun1,
|
||||
[],
|
||||
[{bucket, <<"b2">>}],
|
||||
State)),
|
||||
?_assertEqual({ok, [<<"k1">>]},
|
||||
Backend:fold_keys(FoldKeysFun2, [], [], State)),
|
||||
?_assertEqual({ok, [<<"k1">>]},
|
||||
Backend:fold_keys(FoldKeysFun2,
|
||||
[],
|
||||
[{bucket, <<"b1">>}],
|
||||
State)),
|
||||
?_assertEqual({ok, [<<"k2">>]},
|
||||
Backend:fold_keys(FoldKeysFun3, [], [], State)),
|
||||
?_assertEqual({ok, []},
|
||||
Backend:fold_keys(FoldKeysFun3,
|
||||
[],
|
||||
[{bucket, <<"b1">>}],
|
||||
State))
|
||||
]
|
||||
end
|
||||
}.
|
||||
|
||||
delete_object({Backend, State}) ->
|
||||
{"object deletion test",
|
||||
fun() ->
|
||||
[
|
||||
?_assertMatch({ok, _}, Backend:delete(<<"b2">>, <<"k2">>, State)),
|
||||
?_assertMatch({error, not_found, _},
|
||||
Backend:get(<<"b2">>, <<"k2">>, State))
|
||||
]
|
||||
end
|
||||
}.
|
||||
|
||||
fold_objects({Backend, State}) ->
|
||||
{"object folding test",
|
||||
fun() ->
|
||||
FoldKeysFun =
|
||||
fun(Bucket, Key, Acc) ->
|
||||
[{Bucket, Key} | Acc]
|
||||
end,
|
||||
FoldObjectsFun =
|
||||
fun(Bucket, Key, Value, Acc) ->
|
||||
[{{Bucket, Key}, Value} | Acc]
|
||||
end,
|
||||
[
|
||||
?_assertEqual([{<<"b1">>, <<"k1">>}],
|
||||
begin
|
||||
{ok, Keys} =
|
||||
Backend:fold_keys(FoldKeysFun,
|
||||
[],
|
||||
[],
|
||||
State),
|
||||
lists:sort(Keys)
|
||||
end),
|
||||
|
||||
?_assertEqual([{{<<"b1">>,<<"k1">>}, <<"v1">>}],
|
||||
begin
|
||||
{ok, Objects1} =
|
||||
Backend:fold_objects(FoldObjectsFun,
|
||||
[],
|
||||
[],
|
||||
State),
|
||||
lists:sort(Objects1)
|
||||
end),
|
||||
?_assertMatch({ok, _},
|
||||
Backend:put(<<"b3">>, <<"k3">>, [], <<"v3">>, State)),
|
||||
?_assertEqual([{{<<"b1">>,<<"k1">>},<<"v1">>},
|
||||
{{<<"b3">>,<<"k3">>},<<"v3">>}],
|
||||
begin
|
||||
{ok, Objects} =
|
||||
Backend:fold_objects(FoldObjectsFun,
|
||||
[],
|
||||
[],
|
||||
State),
|
||||
lists:sort(Objects)
|
||||
end)
|
||||
]
|
||||
end
|
||||
}.
|
||||
|
||||
empty_check({Backend, State}) ->
|
||||
{"is_empty test",
|
||||
fun() ->
|
||||
[
|
||||
?_assertEqual(false, Backend:is_empty(State)),
|
||||
?_assertMatch({ok, _}, Backend:delete(<<"b1">>,<<"k1">>, State)),
|
||||
?_assertMatch({ok, _}, Backend:delete(<<"b3">>,<<"k3">>, State)),
|
||||
?_assertEqual(true, Backend:is_empty(State))
|
||||
]
|
||||
end
|
||||
}.
|
||||
|
||||
setup({BackendMod, Config}) ->
|
||||
%% Start the backend
|
||||
{ok, S} = BackendMod:start(42, Config),
|
||||
{BackendMod, S}.
|
||||
|
||||
cleanup({BackendMod, S}) ->
|
||||
ok = BackendMod:stop(S).
|
||||
|
||||
-endif. % TEST
|
|
@ -1,546 +0,0 @@
|
|||
%% ----------------------------------------------------------------------------
|
||||
%%
|
||||
%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
|
||||
%%
|
||||
%% Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
|
||||
%% http://basho.com/ info@basho.com
|
||||
%%
|
||||
%% This file is provided to you under the Apache License, Version 2.0 (the
|
||||
%% "License"); you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
%% License for the specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% ----------------------------------------------------------------------------
|
||||
|
||||
-module(riak_kv_hanoidb_backend).
|
||||
-behavior(hanoidb_temp_riak_kv_backend).
|
||||
-author('Steve Vinoski <steve@basho.com>').
|
||||
-author('Greg Burd <greg@basho.com>').
|
||||
|
||||
%% KV Backend API
|
||||
-export([api_version/0,
|
||||
capabilities/1,
|
||||
capabilities/2,
|
||||
start/2,
|
||||
stop/1,
|
||||
get/3,
|
||||
put/5,
|
||||
delete/4,
|
||||
drop/1,
|
||||
fold_buckets/4,
|
||||
fold_keys/4,
|
||||
fold_objects/4,
|
||||
is_empty/1,
|
||||
status/1,
|
||||
callback/3]).
|
||||
|
||||
|
||||
-define(log(Fmt,Args),ok).
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-export([to_index_key/4,from_index_key/1,
|
||||
to_object_key/2,from_object_key/1,
|
||||
to_key_range/1]).
|
||||
-endif.
|
||||
|
||||
-include("include/hanoidb.hrl").
|
||||
|
||||
-define(API_VERSION, 1).
|
||||
%% TODO: for when this backend supports 2i
|
||||
-define(CAPABILITIES, [async_fold, indexes]).
|
||||
%-define(CAPABILITIES, [async_fold]).
|
||||
|
||||
-record(state, {tree,
|
||||
partition :: integer(),
|
||||
config :: config() }).
|
||||
|
||||
-type state() :: #state{}.
|
||||
-type config_option() :: {data_root, string()} | hanoidb:config_option().
|
||||
-type config() :: [config_option()].
|
||||
|
||||
%% ===================================================================
|
||||
%% Public API
|
||||
%% ===================================================================
|
||||
|
||||
%% @doc Return the major version of the
|
||||
%% current API.
|
||||
-spec api_version() -> {ok, integer()}.
|
||||
api_version() ->
|
||||
{ok, ?API_VERSION}.
|
||||
|
||||
%% @doc Return the capabilities of the backend.
|
||||
-spec capabilities(state()) -> {ok, [atom()]}.
|
||||
capabilities(_) ->
|
||||
{ok, ?CAPABILITIES}.
|
||||
|
||||
%% @doc Return the capabilities of the backend.
|
||||
-spec capabilities(riak_object:bucket(), state()) -> {ok, [atom()]}.
|
||||
capabilities(_, _) ->
|
||||
{ok, ?CAPABILITIES}.
|
||||
|
||||
%% @doc Start the hanoidb backend
|
||||
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
|
||||
start(Partition, Config) ->
|
||||
%% Get the data root directory
|
||||
case app_helper:get_prop_or_env(data_root, Config, hanoidb) of
|
||||
undefined ->
|
||||
lager:error("Failed to create hanoidb dir: data_root is not set"),
|
||||
{error, data_root_unset};
|
||||
DataRoot ->
|
||||
AppStart = case application:start(hanoidb) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, {already_started, _}} ->
|
||||
ok;
|
||||
{error, StartReason} ->
|
||||
lager:error("Failed to init the hanoidb backend: ~p", [StartReason]),
|
||||
{error, StartReason}
|
||||
end,
|
||||
case AppStart of
|
||||
ok ->
|
||||
case get_data_dir(DataRoot, integer_to_list(Partition)) of
|
||||
{ok, DataDir} ->
|
||||
case hanoidb:open(DataDir, Config) of
|
||||
{ok, Tree} ->
|
||||
{ok, #state{tree=Tree, partition=Partition, config=Config }};
|
||||
{error, OpenReason}=OpenError ->
|
||||
lager:error("Failed to open hanoidb: ~p\n", [OpenReason]),
|
||||
OpenError
|
||||
end;
|
||||
{error, Reason} ->
|
||||
lager:error("Failed to start hanoidb backend: ~p\n", [Reason]),
|
||||
{error, Reason}
|
||||
end;
|
||||
Error ->
|
||||
Error
|
||||
end
|
||||
end.
|
||||
|
||||
%% @doc Stop the hanoidb backend
|
||||
-spec stop(state()) -> ok.
|
||||
stop(#state{tree=Tree}) ->
|
||||
ok = hanoidb:close(Tree).
|
||||
|
||||
%% @doc Retrieve an object from the hanoidb backend
|
||||
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
|
||||
{ok, any(), state()} |
|
||||
{ok, not_found, state()} |
|
||||
{error, term(), state()}.
|
||||
get(Bucket, Key, #state{tree=Tree}=State) ->
|
||||
BKey = to_object_key(Bucket, Key),
|
||||
case hanoidb:get(Tree, BKey) of
|
||||
{ok, Value} ->
|
||||
{ok, Value, State};
|
||||
not_found ->
|
||||
{error, not_found, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason, State}
|
||||
end.
|
||||
|
||||
%% @doc Insert an object into the hanoidb backend.
|
||||
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
|
||||
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
|
||||
{ok, state()} |
|
||||
{error, term(), state()}.
|
||||
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{tree=Tree}=State) ->
|
||||
%% Create the KV update...
|
||||
StorageKey = to_object_key(Bucket, PrimaryKey),
|
||||
Updates1 = [{put, StorageKey, Val}],
|
||||
|
||||
%% Convert IndexSpecs to index updates...
|
||||
F = fun({add, Field, Value}) ->
|
||||
{put, to_index_key(Bucket, PrimaryKey, Field, Value), <<>>};
|
||||
({remove, Field, Value}) ->
|
||||
{delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
|
||||
end,
|
||||
Updates2 = [F(X) || X <- IndexSpecs],
|
||||
|
||||
ok = hanoidb:transact(Tree, Updates1 ++ Updates2),
|
||||
{ok, State}.
|
||||
|
||||
%% @doc Delete an object from the hanoidb backend
|
||||
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
|
||||
{ok, state()} |
|
||||
{error, term(), state()}.
|
||||
delete(Bucket, PrimaryKey, IndexSpecs, #state{tree=Tree}=State) ->
|
||||
|
||||
%% Create the KV delete...
|
||||
StorageKey = to_object_key(Bucket, PrimaryKey),
|
||||
Updates1 = [{delete, StorageKey}],
|
||||
|
||||
%% Convert IndexSpecs to index deletes...
|
||||
F = fun({remove, Field, Value}) ->
|
||||
{delete, to_index_key(Bucket, PrimaryKey, Field, Value)}
|
||||
end,
|
||||
Updates2 = [F(X) || X <- IndexSpecs],
|
||||
|
||||
case hanoidb:transact(Tree, Updates1 ++ Updates2) of
|
||||
ok ->
|
||||
{ok, State};
|
||||
{error, Reason} ->
|
||||
{error, Reason, State}
|
||||
end.
|
||||
|
||||
%% @doc Fold over all the buckets
|
||||
-spec fold_buckets(riak_kv_backend:fold_buckets_fun(),
|
||||
any(),
|
||||
[],
|
||||
state()) -> {ok, any()} | {async, fun()}.
|
||||
fold_buckets(FoldBucketsFun, Acc, Opts, #state{tree=Tree}) ->
|
||||
BucketFolder =
|
||||
fun() ->
|
||||
fold_list_buckets(undefined, Tree, FoldBucketsFun, Acc)
|
||||
end,
|
||||
case proplists:get_bool(async_fold, Opts) of
|
||||
true ->
|
||||
{async, BucketFolder};
|
||||
false ->
|
||||
{ok, BucketFolder()}
|
||||
end.
|
||||
|
||||
|
||||
fold_list_buckets(PrevBucket, Tree, FoldBucketsFun, Acc) ->
|
||||
?log("fold_list_buckets prev=~p~n", [PrevBucket]),
|
||||
case PrevBucket of
|
||||
undefined ->
|
||||
RangeStart = to_object_key(<<>>, '_');
|
||||
_ ->
|
||||
RangeStart = to_object_key(<<PrevBucket/binary, 0>>, '_')
|
||||
end,
|
||||
|
||||
Range = #key_range{ from_key=RangeStart, from_inclusive=true,
|
||||
to_key=undefined, to_inclusive=undefined,
|
||||
limit=1 },
|
||||
|
||||
%% grab next bucket, it's a limit=1 range query :-)
|
||||
case hanoidb:fold_range(Tree,
|
||||
fun(BucketKey,_Value,none) ->
|
||||
?log( "IN_FOLDER ~p~n", [BucketKey]),
|
||||
case from_object_key(BucketKey) of
|
||||
{Bucket, _Key} ->
|
||||
[Bucket];
|
||||
_ ->
|
||||
none
|
||||
end
|
||||
end,
|
||||
none,
|
||||
Range)
|
||||
of
|
||||
none ->
|
||||
?log( "NO_MORE_BUCKETS~n", []),
|
||||
Acc;
|
||||
[Bucket] ->
|
||||
?log( "NEXT_BUCKET ~p~n", [Bucket]),
|
||||
fold_list_buckets(Bucket, Tree, FoldBucketsFun, FoldBucketsFun(Bucket, Acc))
|
||||
end.
|
||||
|
||||
|
||||
%% @doc Fold over all the keys for one or all buckets.
|
||||
-spec fold_keys(riak_kv_backend:fold_keys_fun(),
|
||||
any(),
|
||||
[{atom(), term()}],
|
||||
state()) -> {ok, term()} | {async, fun()}.
|
||||
fold_keys(FoldKeysFun, Acc, Opts, #state{tree=Tree}) ->
|
||||
%% Figure out how we should limit the fold: by bucket, by
|
||||
%% secondary index, or neither (fold across everything.)
|
||||
Bucket = lists:keyfind(bucket, 1, Opts),
|
||||
Index = lists:keyfind(index, 1, Opts),
|
||||
|
||||
%% Multiple limiters may exist. Take the most specific limiter.
|
||||
Limiter =
|
||||
if Index /= false -> Index;
|
||||
Bucket /= false -> Bucket;
|
||||
true -> undefined
|
||||
end,
|
||||
|
||||
%% Set up the fold...
|
||||
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
|
||||
Range = to_key_range(Limiter),
|
||||
case proplists:get_bool(async_fold, Opts) of
|
||||
true ->
|
||||
{async, fun() -> hanoidb:fold_range(Tree, FoldFun, Acc, Range) end};
|
||||
false ->
|
||||
{ok, hanoidb:fold_range(Tree, FoldFun, Acc, Range)}
|
||||
end.
|
||||
|
||||
%% @doc Fold over all the objects for one or all buckets.
|
||||
-spec fold_objects(riak_kv_backend:fold_objects_fun(),
|
||||
any(),
|
||||
[{atom(), term()}],
|
||||
state()) -> {ok, any()} | {async, fun()}.
|
||||
fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) ->
|
||||
Bucket = proplists:get_value(bucket, Opts),
|
||||
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
|
||||
ObjectFolder =
|
||||
fun() ->
|
||||
% io:format(user, "starting fold_objects in ~p~n", [self()]),
|
||||
Result = hanoidb:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket)),
|
||||
% io:format(user, "ended fold_objects in ~p => ~P~n", [self(),Result,20]),
|
||||
Result
|
||||
end,
|
||||
case proplists:get_bool(async_fold, Opts) of
|
||||
true ->
|
||||
{async, ObjectFolder};
|
||||
false ->
|
||||
{ok, ObjectFolder()}
|
||||
end.
|
||||
|
||||
%% @doc Delete all objects from this hanoidb backend
|
||||
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
||||
drop(#state{ tree=Tree, partition=Partition, config=Config }=State) ->
|
||||
case hanoidb:destroy(Tree) of
|
||||
ok ->
|
||||
start(Partition, Config);
|
||||
{error, Term} ->
|
||||
{error, Term, State}
|
||||
end.
|
||||
|
||||
%% @doc Returns true if this hanoidb backend contains any
|
||||
%% non-tombstone values; otherwise returns false.
|
||||
-spec is_empty(state()) -> boolean().
|
||||
is_empty(#state{tree=Tree}) ->
|
||||
FoldFun = fun(K, _V, Acc) -> [K|Acc] end,
|
||||
try
|
||||
Range = to_key_range(undefined),
|
||||
[] =:= hanoidb:fold_range(Tree, FoldFun, [], Range#key_range{ limit=1 })
|
||||
catch
|
||||
_:ok ->
|
||||
false
|
||||
end.
|
||||
|
||||
%% @doc Get the status information for this hanoidb backend
|
||||
-spec status(state()) -> [{atom(), term()}].
|
||||
status(#state{}) ->
|
||||
%% TODO: not yet implemented
|
||||
[].
|
||||
|
||||
%% @doc Register an asynchronous callback
|
||||
-spec callback(reference(), any(), state()) -> {ok, state()}.
|
||||
callback(_Ref, _Msg, State) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
%% ===================================================================
|
||||
%% Internal functions
|
||||
%% ===================================================================
|
||||
|
||||
%% @private
|
||||
%% Create the directory for this partition's LSM-BTree files
|
||||
get_data_dir(DataRoot, Partition) ->
|
||||
PartitionDir = filename:join([DataRoot, Partition]),
|
||||
case filelib:ensure_dir(filename:join([filename:absname(DataRoot), Partition, "x"])) of
|
||||
ok ->
|
||||
{ok, PartitionDir};
|
||||
{error, Reason} ->
|
||||
lager:error("Failed to create hanoidb dir ~s: ~p", [PartitionDir, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% Return a function to fold over keys on this backend
|
||||
fold_keys_fun(FoldKeysFun, undefined) ->
|
||||
%% Fold across everything...
|
||||
fun(K, _V, Acc) ->
|
||||
case from_object_key(K) of
|
||||
{Bucket, Key} ->
|
||||
FoldKeysFun(Bucket, Key, Acc)
|
||||
end
|
||||
end;
|
||||
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) ->
|
||||
%% Fold across a specific bucket...
|
||||
fun(K, _V, Acc) ->
|
||||
case from_object_key(K) of
|
||||
{Bucket, Key} when Bucket == FilterBucket ->
|
||||
FoldKeysFun(Bucket, Key, Acc)
|
||||
end
|
||||
end;
|
||||
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, <<"$bucket">>, _}}) ->
|
||||
%% 2I exact match query on special $bucket field...
|
||||
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
|
||||
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, FilterField, FilterTerm}}) ->
|
||||
%% Rewrite 2I exact match query as a range...
|
||||
NewQuery = {range, FilterField, FilterTerm, FilterTerm},
|
||||
fold_keys_fun(FoldKeysFun, {index, FilterBucket, NewQuery});
|
||||
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, <<"$key">>, StartKey, EndKey}}) ->
|
||||
%% 2I range query on special $key field...
|
||||
fun(StorageKey, Acc) ->
|
||||
case from_object_key(StorageKey) of
|
||||
{Bucket, Key} when FilterBucket == Bucket,
|
||||
StartKey =< Key,
|
||||
EndKey >= Key ->
|
||||
FoldKeysFun(Bucket, Key, Acc)
|
||||
end
|
||||
end;
|
||||
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, FilterField, StartTerm, EndTerm}}) ->
|
||||
%% 2I range query...
|
||||
fun(StorageKey, Acc) ->
|
||||
case from_index_key(StorageKey) of
|
||||
{Bucket, Key, Field, Term} when FilterBucket == Bucket,
|
||||
FilterField == Field,
|
||||
StartTerm =< Term,
|
||||
EndTerm >= Term ->
|
||||
FoldKeysFun(Bucket, Key, Acc)
|
||||
end
|
||||
end;
|
||||
fold_keys_fun(_FoldKeysFun, Other) ->
|
||||
throw({unknown_limiter, Other}).
|
||||
|
||||
%% @private
|
||||
%% Return a function to fold over the objects on this backend
|
||||
fold_objects_fun(FoldObjectsFun, FilterBucket) ->
|
||||
fun(StorageKey, Value, Acc) ->
|
||||
?log( "OFOLD: ~p, filter=~p~n", [sext:decode(StorageKey), FilterBucket]),
|
||||
case from_object_key(StorageKey) of
|
||||
{Bucket, Key} when FilterBucket == undefined;
|
||||
Bucket == FilterBucket ->
|
||||
FoldObjectsFun(Bucket, Key, Value, Acc)
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
%% This is guaranteed larger than any object key
|
||||
-define(MAX_OBJECT_KEY, <<16,0,0,0,4>>).
|
||||
|
||||
%% This is guaranteed larger than any index key
|
||||
-define(MAX_INDEX_KEY, <<16,0,0,0,6>>).
|
||||
|
||||
to_key_range(undefined) ->
|
||||
#key_range{ from_key = to_object_key(<<>>, <<>>),
|
||||
from_inclusive = true,
|
||||
to_key = ?MAX_OBJECT_KEY,
|
||||
to_inclusive = false
|
||||
};
|
||||
to_key_range({bucket, Bucket}) ->
|
||||
#key_range{ from_key = to_object_key(Bucket, <<>>),
|
||||
from_inclusive = true,
|
||||
to_key = to_object_key(<<Bucket/binary, 0>>, <<>>),
|
||||
to_inclusive = false };
|
||||
to_key_range({index, Bucket, {eq, <<"$bucket">>, _Term}}) ->
|
||||
to_key_range(Bucket);
|
||||
to_key_range({index, Bucket, {eq, Field, Term}}) ->
|
||||
to_key_range({index, Bucket, {range, Field, Term, Term}});
|
||||
to_key_range({index, Bucket, {range, <<"$key">>, StartTerm, EndTerm}}) ->
|
||||
#key_range{ from_key = to_object_key(Bucket, StartTerm),
|
||||
from_inclusive = true,
|
||||
to_key = to_object_key(Bucket, EndTerm),
|
||||
to_inclusive = true };
|
||||
to_key_range({index, Bucket, {range, Field, StartTerm, EndTerm}}) ->
|
||||
#key_range{ from_key = to_index_key(Bucket, <<>>, Field, StartTerm),
|
||||
from_inclusive = true,
|
||||
to_key = to_index_key(Bucket, <<16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff,
|
||||
16#ff,16#ff,16#ff,16#ff >>, Field, EndTerm),
|
||||
to_inclusive = false };
|
||||
to_key_range(Other) ->
|
||||
erlang:throw({unknown_limiter, Other}).
|
||||
|
||||
|
||||
|
||||
|
||||
to_object_key(Bucket, Key) ->
|
||||
sext:encode({o, Bucket, Key}).
|
||||
|
||||
from_object_key(LKey) ->
|
||||
case sext:decode(LKey) of
|
||||
{o, Bucket, Key} ->
|
||||
{Bucket, Key};
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
to_index_key(Bucket, Key, Field, Term) ->
|
||||
sext:encode({i, Bucket, Field, Term, Key}).
|
||||
|
||||
from_index_key(LKey) ->
|
||||
case sext:decode(LKey) of
|
||||
{i, Bucket, Field, Term, Key} ->
|
||||
{Bucket, Key, Field, Term};
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
%% ===================================================================
|
||||
%% EUnit tests
|
||||
%% ===================================================================
|
||||
-ifdef(TEST).
|
||||
|
||||
-include("src/hanoidb.hrl").
|
||||
|
||||
key_range_test() ->
|
||||
Range = to_key_range({bucket, <<"a">>}),
|
||||
|
||||
?assertEqual(true, ?KEY_IN_RANGE( to_object_key(<<"a">>, <<>>) , Range)),
|
||||
?assertEqual(true, ?KEY_IN_RANGE( to_object_key(<<"a">>, <<16#ff,16#ff,16#ff,16#ff>>), Range )),
|
||||
?assertEqual(false, ?KEY_IN_RANGE( to_object_key(<<>>, <<>>), Range )),
|
||||
?assertEqual(false, ?KEY_IN_RANGE( to_object_key(<<"a",0>>, <<>>), Range )).
|
||||
|
||||
index_range_test() ->
|
||||
Range = to_key_range({index, <<"idx">>, {range, <<"f">>, <<6>>, <<7,3>>}}),
|
||||
|
||||
?assertEqual(false, ?KEY_IN_RANGE( to_index_key(<<"idx">>, <<"key1">>, <<"f">>, <<5>>) , Range)),
|
||||
?assertEqual(true, ?KEY_IN_RANGE( to_index_key(<<"idx">>, <<"key1">>, <<"f">>, <<6>>) , Range)),
|
||||
?assertEqual(true, ?KEY_IN_RANGE( to_index_key(<<"idx">>, <<"key1">>, <<"f">>, <<7>>) , Range)),
|
||||
?assertEqual(false, ?KEY_IN_RANGE( to_index_key(<<"idx">>, <<"key1">>, <<"f">>, <<7,4>>) , Range)),
|
||||
?assertEqual(false, ?KEY_IN_RANGE( to_index_key(<<"idx">>, <<"key1">>, <<"f">>, <<9>>) , Range)).
|
||||
|
||||
|
||||
simple_test_() ->
|
||||
?assertCmd("rm -rf test/hanoidb-backend"),
|
||||
application:set_env(hanoidb, data_root, "test/hanoidbd-backend"),
|
||||
hanoidb_temp_riak_kv_backend:standard_test(?MODULE, []).
|
||||
|
||||
custom_config_test_() ->
|
||||
?assertCmd("rm -rf test/hanoidb-backend"),
|
||||
application:set_env(hanoidb, data_root, ""),
|
||||
hanoidb_temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/hanoidb-backend"}]).
|
||||
|
||||
-ifdef(PROPER).
|
||||
|
||||
eqc_test_() ->
|
||||
{spawn,
|
||||
[{inorder,
|
||||
[{setup,
|
||||
fun setup/0,
|
||||
fun cleanup/1,
|
||||
[
|
||||
{timeout, 60,
|
||||
[?_assertEqual(true,
|
||||
backend_eqc:test(?MODULE, false,
|
||||
[{data_root,
|
||||
"test/hanoidbdb-backend"},
|
||||
{async_fold, false}]))]},
|
||||
{timeout, 60,
|
||||
[?_assertEqual(true,
|
||||
backend_eqc:test(?MODULE, false,
|
||||
[{data_root,
|
||||
"test/hanoidbdb-backend"}]))]}
|
||||
]}]}]}.
|
||||
|
||||
setup() ->
|
||||
application:load(sasl),
|
||||
application:set_env(sasl, sasl_error_logger, {file, "riak_kv_hanoidbdb_backend_eqc_sasl.log"}),
|
||||
error_logger:tty(false),
|
||||
error_logger:logfile({open, "riak_kv_hanoidbdb_backend_eqc.log"}),
|
||||
|
||||
ok.
|
||||
|
||||
cleanup(_) ->
|
||||
?_assertCmd("rm -rf test/hanoidbdb-backend").
|
||||
|
||||
-endif. % EQC
|
||||
|
||||
|
||||
-endif.
|
|
@ -1,372 +0,0 @@
|
|||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% backend_eqc: Quickcheck testing for the backend api.
|
||||
%%
|
||||
%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
|
||||
%%
|
||||
%% This file is provided to you under the Apache License,
|
||||
%% Version 2.0 (the "License"); you may not use this file
|
||||
%% except in compliance with the License. You may obtain
|
||||
%% a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing,
|
||||
%% software distributed under the License is distributed on an
|
||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
%% KIND, either express or implied. See the License for the
|
||||
%% specific language governing permissions and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-module(backend_eqc).
|
||||
|
||||
-define(log(Fmt,Args),ok).
|
||||
|
||||
-ifdef(TRIQ).
|
||||
|
||||
-include_lib("triq/include/triq.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
%% Public API
|
||||
-compile(export_all).
|
||||
-export([test/1,
|
||||
test/2,
|
||||
test/3,
|
||||
test/4,
|
||||
test/5]).
|
||||
|
||||
%% eqc_fsm callbacks
|
||||
-export([initial_state/0,
|
||||
initial_state_data/0,
|
||||
next_state_data/5,
|
||||
precondition/4,
|
||||
postcondition/5]).
|
||||
|
||||
%% eqc property
|
||||
-export([prop_backend/4]).
|
||||
|
||||
%% States
|
||||
-export([stopped/1,
|
||||
running/1]).
|
||||
|
||||
%% Helpers
|
||||
-export([drop/2,
|
||||
init_backend/3]).
|
||||
|
||||
-define(TEST_ITERATIONS, 50).
|
||||
|
||||
-record(qcst, {backend, % Backend module under test
|
||||
volatile, % Indicates if backend is volatile
|
||||
c, % Backend config
|
||||
s, % Module state returned by Backend:start
|
||||
olds=sets:new(), % Old states after a stop
|
||||
d=[]}).% Orddict of values stored
|
||||
|
||||
%% ====================================================================
|
||||
%% Public API
|
||||
%% ====================================================================
|
||||
|
||||
test(Backend) ->
|
||||
test(Backend, false).
|
||||
|
||||
test(Backend, Volatile) ->
|
||||
test(Backend, Volatile, []).
|
||||
|
||||
test(Backend, Volatile, Config) ->
|
||||
test(Backend, Volatile, Config, fun(_BeState,_Olds) -> ok end).
|
||||
|
||||
test(Backend, Volatile, Config, Cleanup) ->
|
||||
test(Backend, Volatile, Config, Cleanup, ?TEST_ITERATIONS).
|
||||
|
||||
test(Backend, Volatile, Config, Cleanup, NumTests) ->
|
||||
triq:check(%triq:numtests(NumTests,
|
||||
prop_backend(Backend, Volatile, Config, Cleanup)
|
||||
% )
|
||||
).
|
||||
|
||||
%% ====================================================================
|
||||
%% eqc property
|
||||
%% ====================================================================
|
||||
|
||||
prop_backend(Backend, Volatile, Config, Cleanup) ->
|
||||
?FORALL(Cmds,
|
||||
triq_fsm:commands(?MODULE,
|
||||
{stopped,
|
||||
initial_state_data(Backend, Volatile, Config)}),
|
||||
begin
|
||||
{H,{_F,S},Res} = triq_fsm:run_commands(?MODULE, Cmds),
|
||||
Cleanup(S#qcst.s, sets:to_list(S#qcst.olds)),
|
||||
% aggregate(zip(triq_fsm:state_names(H), command_names(Cmds)),
|
||||
?WHENFAIL(
|
||||
begin
|
||||
?debugFmt("Cmds: ~p~n",
|
||||
[triq_statem:zip(triq_fsm:state_names(H),
|
||||
triq_statem:command_names(Cmds))]),
|
||||
?debugFmt("Result: ~p~n", [Res]),
|
||||
?debugFmt("History: ~p~n", [H]),
|
||||
?debugFmt("BE Config: ~p~nBE State: ~p~nD: ~p~n",
|
||||
[S#qcst.c,
|
||||
S#qcst.s,
|
||||
orddict:to_list(S#qcst.d)])
|
||||
end,
|
||||
begin
|
||||
?log( "!!!!!!!!!!!!!!!!!!! Res: ~p~n", [Res]),
|
||||
case Res of
|
||||
ok -> true;
|
||||
true -> true;
|
||||
_ -> false
|
||||
end
|
||||
end
|
||||
)
|
||||
% )
|
||||
end
|
||||
).
|
||||
|
||||
%%====================================================================
|
||||
%% Generators
|
||||
%%====================================================================
|
||||
|
||||
bucket() ->
|
||||
elements([<<"b1">>,<<"b2">>,<<"b3">>,<<"b4">>]).
|
||||
|
||||
key() ->
|
||||
elements([<<"k1">>,<<"k2">>,<<"k3">>,<<"k4">>]).
|
||||
|
||||
val() ->
|
||||
%% The creation of the riak object and the call
|
||||
%% to term_to_binary are to facilitate testing
|
||||
%% of riak_kv_index_backend. It does not matter
|
||||
%% that the bucket and key in the object may
|
||||
%% differ since at this point in the processing
|
||||
%% pipeline the information has already been
|
||||
%% extracted.
|
||||
term_to_binary(riak_object:new(<<"b1">>, <<"k1">>, binary())).
|
||||
|
||||
g_opts() ->
|
||||
frequency([{5, [async_fold]}, {2, []}]).
|
||||
|
||||
%%====================================================================
|
||||
%% Helpers
|
||||
%%====================================================================
|
||||
|
||||
fold_buckets_fun() ->
|
||||
fun(Bucket, Acc) ->
|
||||
riak_kv_fold_buffer:add(Bucket, Acc)
|
||||
end.
|
||||
|
||||
fold_keys_fun() ->
|
||||
fun(Bucket, Key, Acc) ->
|
||||
riak_kv_fold_buffer:add({Bucket, Key}, Acc)
|
||||
end.
|
||||
|
||||
fold_objects_fun() ->
|
||||
fun(Bucket, Key, Value, Acc) ->
|
||||
riak_kv_fold_buffer:add({{Bucket, Key}, Value}, Acc)
|
||||
end.
|
||||
|
||||
get_partition() ->
|
||||
{MegaSecs, Secs, MicroSecs} = erlang:now(),
|
||||
Partition = integer_to_list(MegaSecs) ++
|
||||
integer_to_list(Secs) ++
|
||||
integer_to_list(MicroSecs),
|
||||
case erlang:get(Partition) of
|
||||
undefined ->
|
||||
erlang:put(Partition, ok),
|
||||
list_to_integer(Partition);
|
||||
_ ->
|
||||
get_partition()
|
||||
end.
|
||||
|
||||
%% @TODO Volatile is unused now so remove it. Will require
|
||||
%% updating each backend module as well.
|
||||
init_backend(Backend, _Volatile, Config) ->
|
||||
Partition = get_partition(),
|
||||
%% Start an async worker pool
|
||||
{ok, PoolPid} =
|
||||
riak_core_vnode_worker_pool:start_link(riak_kv_worker,
|
||||
2,
|
||||
Partition,
|
||||
[],
|
||||
worker_props),
|
||||
%% Shutdown any previous running worker pool
|
||||
case erlang:get(worker_pool) of
|
||||
undefined ->
|
||||
ok;
|
||||
OldPoolPid ->
|
||||
riak_core_vnode_worker_pool:stop(OldPoolPid, normal)
|
||||
end,
|
||||
%% Store the info about the worker pool
|
||||
erlang:put(worker_pool, PoolPid),
|
||||
%% Start the backend
|
||||
{ok, S} = Backend:start(Partition, Config),
|
||||
S.
|
||||
|
||||
drop(Backend, State) ->
|
||||
case Backend:drop(State) of
|
||||
{ok, NewState} ->
|
||||
NewState;
|
||||
{error, _, NewState} ->
|
||||
NewState
|
||||
end.
|
||||
|
||||
get_fold_buffer() ->
|
||||
riak_kv_fold_buffer:new(100,
|
||||
get_fold_buffer_fun({raw, foldid, self()})).
|
||||
|
||||
get_fold_buffer_fun(From) ->
|
||||
fun(Results) ->
|
||||
riak_core_vnode:reply(From,
|
||||
Results)
|
||||
end.
|
||||
|
||||
%% @private
|
||||
finish_fun(Sender) ->
|
||||
fun(Buffer) ->
|
||||
finish_fold(Buffer, Sender)
|
||||
end.
|
||||
|
||||
%% @private
|
||||
finish_fold(Buffer, Sender) ->
|
||||
?log( "finish_fold: ~p,~p~n", [Buffer, Sender]),
|
||||
riak_kv_fold_buffer:flush(Buffer),
|
||||
riak_core_vnode:reply(Sender, done).
|
||||
|
||||
receive_fold_results(Acc) ->
|
||||
receive
|
||||
{_, done} ->
|
||||
Acc;
|
||||
{error, Error} ->
|
||||
?debugFmt("Error occurred: ~p~n", [Error]),
|
||||
[];
|
||||
{_, Results} ->
|
||||
receive_fold_results(Acc++Results)
|
||||
after 1000 ->
|
||||
receive MSG -> exit({bad, MSG}) end
|
||||
end.
|
||||
|
||||
%%====================================================================
|
||||
%% eqc_fsm callbacks
|
||||
%%====================================================================
|
||||
|
||||
initial_state() ->
|
||||
{stopped, true}.
|
||||
|
||||
initial_state_data() ->
|
||||
#qcst{d = orddict:new()}.
|
||||
|
||||
initial_state_data(Backend, Volatile, Config) ->
|
||||
#qcst{backend=Backend,
|
||||
c=Config,
|
||||
d=orddict:new(),
|
||||
volatile=Volatile}.
|
||||
|
||||
next_state_data(running, stopped, S, _R,
|
||||
{call, _M, stop, _}) ->
|
||||
S#qcst{d=orddict:new(),
|
||||
olds = sets:add_element(S#qcst.s, S#qcst.olds)};
|
||||
next_state_data(stopped, running, S, R, {call, _M, init_backend, _}) ->
|
||||
S#qcst{s=R};
|
||||
next_state_data(_From, _To, S, _R, {call, _M, put, [Bucket, Key, [], Val, _]}) ->
|
||||
S#qcst{d = orddict:store({Bucket, Key}, Val, S#qcst.d)};
|
||||
next_state_data(_From, _To, S, _R, {call, _M, delete, [Bucket, Key, [], _]}) ->
|
||||
S#qcst{d = orddict:erase({Bucket, Key}, S#qcst.d)};
|
||||
next_state_data(_From, _To, S, R, {call, ?MODULE, drop, _}) ->
|
||||
S#qcst{d=orddict:new(), s=R};
|
||||
next_state_data(_From, _To, S, _R, _C) ->
|
||||
S.
|
||||
|
||||
stopped(#qcst{backend=Backend,
|
||||
c=Config,
|
||||
volatile=Volatile}) ->
|
||||
[{running,
|
||||
{call, ?MODULE, init_backend, [Backend, Volatile, Config]}}].
|
||||
|
||||
running(#qcst{backend=Backend,
|
||||
s=State}) ->
|
||||
[
|
||||
{history, {call, Backend, put, [bucket(), key(), [], val(), State]}},
|
||||
{history, {call, Backend, get, [bucket(), key(), State]}},
|
||||
{history, {call, Backend, delete, [bucket(), key(), [], State]}},
|
||||
{history, {call, Backend, fold_buckets, [fold_buckets_fun(), get_fold_buffer(), g_opts(), State]}},
|
||||
{history, {call, Backend, fold_keys, [fold_keys_fun(), get_fold_buffer(), g_opts(), State]}},
|
||||
{history, {call, Backend, fold_objects, [fold_objects_fun(), get_fold_buffer(), g_opts(), State]}},
|
||||
{history, {call, Backend, is_empty, [State]}},
|
||||
{history, {call, ?MODULE, drop, [Backend, State]}},
|
||||
{stopped, {call, Backend, stop, [State]}}
|
||||
].
|
||||
|
||||
precondition(_From,_To,_S,_C) ->
|
||||
true.
|
||||
|
||||
postcondition(_From, _To, S, _C={call, _M, get, [Bucket, Key, _BeState]}, R) ->
|
||||
case R of
|
||||
{error, not_found, _} ->
|
||||
not orddict:is_key({Bucket, Key}, S#qcst.d);
|
||||
{ok, Val, _} ->
|
||||
Res = orddict:find({Bucket, Key}, S#qcst.d),
|
||||
{ok, Val} =:= Res
|
||||
end;
|
||||
postcondition(_From, _To, _S,
|
||||
{call, _M, put, [_Bucket, _Key, _IndexEntries, _Val, _BeState]}, {R, _RState}) ->
|
||||
R =:= ok orelse R =:= already_exists;
|
||||
postcondition(_From, _To, _S,
|
||||
{call, _M, delete,[_Bucket, _Key, _IndexEntries, _BeState]}, {R, _RState}) ->
|
||||
R =:= ok;
|
||||
postcondition(_From, _To, S,
|
||||
{call, _M, fold_buckets, [_FoldFun, _Acc, _Opts, _BeState]}, FoldRes) ->
|
||||
ExpectedEntries = orddict:to_list(S#qcst.d),
|
||||
Buckets = [Bucket || {{Bucket, _}, _} <- ExpectedEntries],
|
||||
From = {raw, foldid, self()},
|
||||
case FoldRes of
|
||||
{async, Work} ->
|
||||
Pool = erlang:get(worker_pool),
|
||||
?log( "pool: ~p~n", [Pool]),
|
||||
FinishFun = finish_fun(From),
|
||||
riak_core_vnode_worker_pool:handle_work(Pool, {fold, Work, FinishFun}, From);
|
||||
{ok, Buffer} ->
|
||||
?log( "got: ~p~n", [Buffer]),
|
||||
finish_fold(Buffer, From)
|
||||
end,
|
||||
R = receive_fold_results([]),
|
||||
lists:usort(Buckets) =:= lists:sort(R);
|
||||
postcondition(_From, _To, S,
|
||||
{call, _M, fold_keys, [_FoldFun, _Acc, _Opts, _BeState]}, FoldRes) ->
|
||||
ExpectedEntries = orddict:to_list(S#qcst.d),
|
||||
Keys = [{Bucket, Key} || {{Bucket, Key}, _} <- ExpectedEntries],
|
||||
From = {raw, foldid, self()},
|
||||
case FoldRes of
|
||||
{async, Work} ->
|
||||
Pool = erlang:get(worker_pool),
|
||||
?log( "pool: ~p~n", [Pool]),
|
||||
FinishFun = finish_fun(From),
|
||||
riak_core_vnode_worker_pool:handle_work(Pool, {fold, Work, FinishFun}, From);
|
||||
{ok, Buffer} ->
|
||||
?log( "got: ~p~n", [Buffer]),
|
||||
finish_fold(Buffer, From)
|
||||
end,
|
||||
R = receive_fold_results([]),
|
||||
lists:sort(Keys) =:= lists:sort(R);
|
||||
postcondition(_From, _To, S,
|
||||
{call, _M, fold_objects, [_FoldFun, _Acc, _Opts, _BeState]}, FoldRes) ->
|
||||
ExpectedEntries = orddict:to_list(S#qcst.d),
|
||||
Objects = [Object || Object <- ExpectedEntries],
|
||||
From = {raw, foldid, self()},
|
||||
case FoldRes of
|
||||
{async, Work} ->
|
||||
Pool = erlang:get(worker_pool),
|
||||
FinishFun = finish_fun(From),
|
||||
riak_core_vnode_worker_pool:handle_work(Pool, {fold, Work, FinishFun}, From);
|
||||
{ok, Buffer} ->
|
||||
finish_fold(Buffer, From)
|
||||
end,
|
||||
R = receive_fold_results([]),
|
||||
?log( "POST: fold_objects ~p =:= ~p~n", [Objects, R]),
|
||||
lists:sort(Objects) =:= lists:sort(R);
|
||||
postcondition(_From, _To, S,{call, _M, is_empty, [_BeState]}, R) ->
|
||||
R =:= (orddict:size(S#qcst.d) =:= 0);
|
||||
postcondition(_From, _To, _S, _C, _R) ->
|
||||
true.
|
||||
|
||||
-endif.
|
Loading…
Reference in a new issue