add riak kv backend

This commit is contained in:
Steve Vinoski 2012-04-14 20:49:56 -04:00
parent f0def8231b
commit 4bc1eb6e19
4 changed files with 688 additions and 1 deletions

53
enable-lsm Executable file
View file

@ -0,0 +1,53 @@
#!/bin/sh
# This script adds lsm_btree to a riak github repo. Run it in the riak repo
# directory.
#
# First it adds lsm_btree, then runs "make all devrel" and then enables the
# lsm_btree storage backend in the resulting dev nodes.
#
# This script is intended to be temporary. Once lsm_btree is made into a proper
# riak citizen, this script will no longer be needed.
set -e
wd=`pwd`
if [ `basename $wd` != riak ]; then
echo "This doesn't appear to be a riak repo directory. Exiting."
exit 1
fi
if [ -d dev ]; then
echo
echo 'NOTE: THIS SCRIPT WILL DELETE YOUR EXISTING DEV DIRECTORY!'
while [ 1 ]; do
printf '\n%s' ' Do you wish to proceed? [no] '
read answer
answer=${answer:-n}
case $answer in
[Nn]*) exit 0 ;;
[Yy]*) break ;;
*) echo 'Please answer y or n.' ;;
esac
done
fi
rebar get-deps
file=./deps/riak_kv/rebar.config
if ! grep -q lsm_btree $file ; then
echo
echo "Modifying $file, saving the original as ${file}.orig ..."
perl -i.orig -pe '/\bsext\b/ && print qq( {lsm_btree, ".*", {git, "git\@github.com:basho/lsm_btree.git", "master"}},\n)' $file
fi
rebar get-deps
rm -rf dev
make all devrel
echo
echo 'Modifying all dev/dev*/etc/app.config files, saving originals with .orig suffix...'
perl -i.orig -ne 'if (/\bstorage_backend,/) { s/(storage_backend, )[^\}]+/\1riak_kv_lsmbtree_backend/; print } elsif (/\{eleveldb,/) { $eleveldb++; print } elsif ($eleveldb && /^\s+\]\},/) { $eleveldb = 0; print; print qq(\n {lsm_btree, [\n {data_root, "./data/lb"}\n ]},\n\n) } else { print }' dev/dev*/etc/app.config
exit 0

View file

@ -6,7 +6,7 @@
{deps, [ {deps, [
{plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}}, {plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}},
{ebloom, "1.0.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}}, {ebloom, "1.1.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}},
{basho_bench, ".*", {git, "git://github.com/basho/basho_bench.git", {branch, "master"}}}, {basho_bench, ".*", {git, "git://github.com/basho/basho_bench.git", {branch, "master"}}},
{triq, ".*", {git, "git://github.com/krestenkrab/triq.git", {branch, "master"}}}, {triq, ".*", {git, "git://github.com/krestenkrab/triq.git", {branch, "master"}}},
{proper, ".*", {git, "git://github.com/manopapad/proper.git", {proper, ".*", {git, "git://github.com/manopapad/proper.git",

View file

@ -0,0 +1,282 @@
%% -------------------------------------------------------------------
%%
%% lsmbtree_riak_kv_backend: temporary Riak backend behaviour
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
%%%
%%% This is a temporary copy of riak_kv_backend, just here to keep
%%% wterl development private for now. When riak_kv_wterl_backend is
%%% moved to riak_kv, delete this file.
%%%
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
-module(lsmbtree_riak_kv_backend).
-export([behaviour_info/1]).
-export([callback_after/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-export([standard_test/2]).
-endif.
-type fold_buckets_fun() :: fun((binary(), any()) -> any() | no_return()).
-type fold_keys_fun() :: fun((binary(), binary(), any()) -> any() |
no_return()).
-type fold_objects_fun() :: fun((binary(), binary(), term(), any()) ->
any() |
no_return()).
-export_type([fold_buckets_fun/0,
fold_keys_fun/0,
fold_objects_fun/0]).
-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
behaviour_info(callbacks) ->
[
{api_version,0},
{capabilities, 1}, % (State)
{capabilities, 2}, % (Bucket, State)
{start,2}, % (Partition, Config)
{stop,1}, % (State)
{get,3}, % (Bucket, Key, State)
{put,5}, % (Bucket, Key, IndexSpecs, Val, State)
{delete,4}, % (Bucket, Key, IndexSpecs, State)
{drop,1}, % (State)
{fold_buckets,4}, % (FoldBucketsFun, Acc, Opts, State),
% FoldBucketsFun(Bucket, Acc)
{fold_keys,4}, % (FoldKeysFun, Acc, Opts, State),
% FoldKeysFun(Bucket, Key, Acc)
{fold_objects,4}, % (FoldObjectsFun, Acc, Opts, State),
% FoldObjectsFun(Bucket, Key, Object, Acc)
{is_empty,1}, % (State)
{status,1}, % (State)
{callback,3}]; % (Ref, Msg, State) ->
behaviour_info(_Other) ->
undefined.
%% Queue a callback for the backend after Time ms.
-spec callback_after(integer(), reference(), term()) -> reference().
callback_after(Time, Ref, Msg) when is_integer(Time), is_reference(Ref) ->
riak_core_vnode:send_command_after(Time, {backend_callback, Ref, Msg}).
-ifdef(TEST).
standard_test(BackendMod, Config) ->
{spawn,
[
{setup,
fun() -> ?MODULE:setup({BackendMod, Config}) end,
fun ?MODULE:cleanup/1,
fun(X) ->
[?MODULE:basic_store_and_fetch(X),
?MODULE:fold_buckets(X),
?MODULE:fold_keys(X),
?MODULE:delete_object(X),
?MODULE:fold_objects(X),
?MODULE:empty_check(X)
]
end
}]}.
basic_store_and_fetch({Backend, State}) ->
{"basic store and fetch test",
fun() ->
[
?_assertMatch({ok, _},
Backend:put(<<"b1">>, <<"k1">>, [], <<"v1">>, State)),
?_assertMatch({ok, _},
Backend:put(<<"b2">>, <<"k2">>, [], <<"v2">>, State)),
?_assertMatch({ok,<<"v2">>, _},
Backend:get(<<"b2">>, <<"k2">>, State)),
?_assertMatch({error, not_found, _},
Backend:get(<<"b1">>, <<"k3">>, State))
]
end
}.
fold_buckets({Backend, State}) ->
{"bucket folding test",
fun() ->
FoldBucketsFun =
fun(Bucket, Acc) ->
[Bucket | Acc]
end,
?_assertEqual([<<"b1">>, <<"b2">>],
begin
{ok, Buckets1} =
Backend:fold_buckets(FoldBucketsFun,
[],
[],
State),
lists:sort(Buckets1)
end)
end
}.
fold_keys({Backend, State}) ->
{"key folding test",
fun() ->
FoldKeysFun =
fun(Bucket, Key, Acc) ->
[{Bucket, Key} | Acc]
end,
FoldKeysFun1 =
fun(_Bucket, Key, Acc) ->
[Key | Acc]
end,
FoldKeysFun2 =
fun(Bucket, Key, Acc) ->
case Bucket =:= <<"b1">> of
true ->
[Key | Acc];
false ->
Acc
end
end,
FoldKeysFun3 =
fun(Bucket, Key, Acc) ->
case Bucket =:= <<"b1">> of
true ->
Acc;
false ->
[Key | Acc]
end
end,
[
?_assertEqual([{<<"b1">>, <<"k1">>}, {<<"b2">>, <<"k2">>}],
begin
{ok, Keys1} =
Backend:fold_keys(FoldKeysFun,
[],
[],
State),
lists:sort(Keys1)
end),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun1,
[],
[{bucket, <<"b1">>}],
State)),
?_assertEqual([<<"k2">>],
Backend:fold_keys(FoldKeysFun1,
[],
[{bucket, <<"b2">>}],
State)),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun2, [], [], State)),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun2,
[],
[{bucket, <<"b1">>}],
State)),
?_assertEqual({ok, [<<"k2">>]},
Backend:fold_keys(FoldKeysFun3, [], [], State)),
?_assertEqual({ok, []},
Backend:fold_keys(FoldKeysFun3,
[],
[{bucket, <<"b1">>}],
State))
]
end
}.
delete_object({Backend, State}) ->
{"object deletion test",
fun() ->
[
?_assertMatch({ok, _}, Backend:delete(<<"b2">>, <<"k2">>, State)),
?_assertMatch({error, not_found, _},
Backend:get(<<"b2">>, <<"k2">>, State))
]
end
}.
fold_objects({Backend, State}) ->
{"object folding test",
fun() ->
FoldKeysFun =
fun(Bucket, Key, Acc) ->
[{Bucket, Key} | Acc]
end,
FoldObjectsFun =
fun(Bucket, Key, Value, Acc) ->
[{{Bucket, Key}, Value} | Acc]
end,
[
?_assertEqual([{<<"b1">>, <<"k1">>}],
begin
{ok, Keys} =
Backend:fold_keys(FoldKeysFun,
[],
[],
State),
lists:sort(Keys)
end),
?_assertEqual([{{<<"b1">>,<<"k1">>}, <<"v1">>}],
begin
{ok, Objects1} =
Backend:fold_objects(FoldObjectsFun,
[],
[],
State),
lists:sort(Objects1)
end),
?_assertMatch({ok, _},
Backend:put(<<"b3">>, <<"k3">>, [], <<"v3">>, State)),
?_assertEqual([{{<<"b1">>,<<"k1">>},<<"v1">>},
{{<<"b3">>,<<"k3">>},<<"v3">>}],
begin
{ok, Objects} =
Backend:fold_objects(FoldObjectsFun,
[],
[],
State),
lists:sort(Objects)
end)
]
end
}.
empty_check({Backend, State}) ->
{"is_empty test",
fun() ->
[
?_assertEqual(false, Backend:is_empty(State)),
?_assertMatch({ok, _}, Backend:delete(<<"b1">>,<<"k1">>, State)),
?_assertMatch({ok, _}, Backend:delete(<<"b3">>,<<"k3">>, State)),
?_assertEqual(true, Backend:is_empty(State))
]
end
}.
setup({BackendMod, Config}) ->
%% Start the backend
{ok, S} = BackendMod:start(42, Config),
{BackendMod, S}.
cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S).
-endif. % TEST

View file

@ -0,0 +1,352 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_lsmbtree_backend: LSM-Btree Driver for Riak
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(riak_kv_lsmbtree_backend).
-behavior(lsmbtree_riak_kv_backend).
-author('Steve Vinoski <steve@basho.com>').
%% KV Backend API
-export([api_version/0,
capabilities/1,
capabilities/2,
start/2,
stop/1,
get/3,
put/5,
delete/4,
drop/1,
fold_buckets/4,
fold_keys/4,
fold_objects/4,
is_empty/1,
status/1,
callback/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-define(API_VERSION, 1).
%% TODO: for when this backend supports 2i
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(state, {db,
partition :: integer()}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
%% ===================================================================
%% Public API
%% ===================================================================
%% @doc Return the major version of the
%% current API.
-spec api_version() -> {ok, integer()}.
api_version() ->
{ok, ?API_VERSION}.
%% @doc Return the capabilities of the backend.
-spec capabilities(state()) -> {ok, [atom()]}.
capabilities(_) ->
{ok, ?CAPABILITIES}.
%% @doc Return the capabilities of the backend.
-spec capabilities(riak_object:bucket(), state()) -> {ok, [atom()]}.
capabilities(_, _) ->
{ok, ?CAPABILITIES}.
%% @doc Start the lsm_btree backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, lsm_btree) of
undefined ->
lager:error("Failed to create lsm_btree dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
AppStart = case application:start(lsm_btree) of
ok ->
ok;
{error, {already_started, _}} ->
ok;
{error, Reason} ->
lager:error("Failed to start lsm_btree: ~p", [Reason]),
{error, Reason}
end,
case AppStart of
ok ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
DbName = filename:join(DataRoot, "lb" ++ integer_to_list(Partition)),
case lsm_btree:open(DbName) of
{ok, Db} ->
{ok, #state{db=Db, partition=Partition}};
{error, OpenReason}=OpenError ->
lager:error("Failed to open lsm_btree: ~p\n", [OpenReason]),
OpenError
end;
Error ->
Error
end
end.
%% @doc Stop the lsm_btree backend
-spec stop(state()) -> ok.
stop(#state{db=Db}) ->
ok = lsm_btree:close(Db).
%% @doc Retrieve an object from the lsm_btree backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, #state{db=Db}=State) ->
LBKey = to_object_key(Bucket, Key),
case lsm_btree:lookup(Db, LBKey) of
{ok, Value} ->
{ok, Value, State};
notfound ->
{error, not_found, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Insert an object into the lsm_btree backend.
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{db=Db}=State) ->
LBKey = to_object_key(Bucket, PrimaryKey),
ok = lsm_btree:put(Db, LBKey, Val),
{ok, State}.
%% @doc Delete an object from the lsm_btree backend
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{db=Db}=State) ->
LBKey = to_object_key(Bucket, Key),
case lsm_btree:delete(Db, LBKey) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Fold over all the buckets
-spec fold_buckets(riak_kv_backend:fold_buckets_fun(),
any(),
[],
state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{db=Db}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder =
fun() ->
try
lsm_btree:sync_fold_range(Db, FoldFun, {Acc, []}, undefined, undefined)
catch
{break, AccFinal} ->
AccFinal
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, BucketFolder};
false ->
{ok, BucketFolder()}
end.
%% @doc Fold over all the keys for one or all buckets.
-spec fold_keys(riak_kv_backend:fold_keys_fun(),
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{db=Db}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts),
Index = lists:keyfind(index, 1, Opts),
%% Multiple limiters may exist. Take the most specific limiter.
Limiter =
if Index /= false -> Index;
Bucket /= false -> Bucket;
true -> undefined
end,
%% Set up the fold...
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder =
fun() ->
try
lsm_btree:sync_fold_range(Db, FoldFun, Acc, undefined, undefined)
catch
{break, AccFinal} ->
AccFinal
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, KeyFolder};
false ->
{ok, KeyFolder()}
end.
%% @doc Fold over all the objects for one or all buckets.
-spec fold_objects(riak_kv_backend:fold_objects_fun(),
any(),
[{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{db=Db}) ->
Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
try
lsm_btree:sync_fold_range(Db, FoldFun, Acc, undefined, undefined)
catch
{break, AccFinal} ->
AccFinal
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, ObjectFolder};
false ->
{ok, ObjectFolder()}
end.
%% @doc Delete all objects from this lsm_btree backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{}=State) ->
%% TODO: not yet implemented
{ok, State}.
%% @doc Returns true if this lsm_btree backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{db=Db}) ->
FoldFun = fun(_K, _V, _Acc) -> throw(ok) end,
try
[] =:= lsm_btree:sync_fold_range(Db, FoldFun, [], undefined, undefined)
catch
_:ok ->
false
end.
%% @doc Get the status information for this lsm_btree backend
-spec status(state()) -> [{atom(), term()}].
status(#state{}) ->
%% TODO: not yet implemented
[].
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
callback(_Ref, _Msg, State) ->
{ok, State}.
%% ===================================================================
%% Internal functions
%% ===================================================================
%% @private
%% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) ->
fun(K, _V, {Acc, LastBucket}) ->
case from_object_key(K) of
{LastBucket, _} ->
{Acc, LastBucket};
{Bucket, _} ->
{FoldBucketsFun(Bucket, Acc), Bucket};
_ ->
throw({break, Acc})
end
end.
%% @private
%% Return a function to fold over keys on this backend
fold_keys_fun(FoldKeysFun, undefined) ->
%% Fold across everything...
fun(K, _V, Acc) ->
case from_object_key(K) of
{Bucket, Key} ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) ->
%% Fold across a specific bucket...
fun(K, _V, Acc) ->
case from_object_key(K) of
{Bucket, Key} when Bucket == FilterBucket ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(_FoldKeysFun, Other) ->
throw({unknown_limiter, Other}).
%% @private
%% Return a function to fold over the objects on this backend
fold_objects_fun(FoldObjectsFun, FilterBucket) ->
fun(Key, Value, Acc) ->
case from_object_key(Key) of
{Bucket, Key} when FilterBucket == undefined;
Bucket == FilterBucket ->
FoldObjectsFun(Bucket, Key, Value, Acc);
_ ->
throw({break, Acc})
end
end.
to_object_key(Bucket, Key) ->
sext:encode({o, Bucket, Key}).
from_object_key(LKey) ->
case sext:decode(LKey) of
{o, Bucket, Key} ->
{Bucket, Key};
_ ->
undefined
end.
%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
simple_test_() ->
?assertCmd("rm -rf test/lsmbtree-backend"),
application:set_env(lsm_btree, data_root, "test/lsmbtree-backend"),
lsmbtree_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() ->
?assertCmd("rm -rf test/lsmbtree-backend"),
application:set_env(lsm_btree, data_root, ""),
lsmbtree_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/lsmbtree-backend"}]).
-endif.