diff --git a/enable-lsm b/enable-lsm new file mode 100755 index 0000000..7fef4d6 --- /dev/null +++ b/enable-lsm @@ -0,0 +1,53 @@ +#!/bin/sh + +# This script adds lsm_btree to a riak github repo. Run it in the riak repo +# directory. +# +# First it adds lsm_btree, then runs "make all devrel" and then enables the +# lsm_btree storage backend in the resulting dev nodes. +# +# This script is intended to be temporary. Once lsm_btree is made into a proper +# riak citizen, this script will no longer be needed. + +set -e + +wd=`pwd` +if [ `basename $wd` != riak ]; then + echo "This doesn't appear to be a riak repo directory. Exiting." + exit 1 +fi + +if [ -d dev ]; then + echo + echo 'NOTE: THIS SCRIPT WILL DELETE YOUR EXISTING DEV DIRECTORY!' + while [ 1 ]; do + printf '\n%s' ' Do you wish to proceed? [no] ' + read answer + answer=${answer:-n} + case $answer in + [Nn]*) exit 0 ;; + [Yy]*) break ;; + *) echo 'Please answer y or n.' ;; + esac + done +fi + +rebar get-deps + +file=./deps/riak_kv/rebar.config +if ! grep -q lsm_btree $file ; then + echo + echo "Modifying $file, saving the original as ${file}.orig ..." + perl -i.orig -pe '/\bsext\b/ && print qq( {lsm_btree, ".*", {git, "git\@github.com:basho/lsm_btree.git", "master"}},\n)' $file +fi + +rebar get-deps + +rm -rf dev +make all devrel + +echo +echo 'Modifying all dev/dev*/etc/app.config files, saving originals with .orig suffix...' +perl -i.orig -ne 'if (/\bstorage_backend,/) { s/(storage_backend, )[^\}]+/\1riak_kv_lsmbtree_backend/; print } elsif (/\{eleveldb,/) { $eleveldb++; print } elsif ($eleveldb && /^\s+\]\},/) { $eleveldb = 0; print; print qq(\n {lsm_btree, [\n {data_root, "./data/lb"}\n ]},\n\n) } else { print }' dev/dev*/etc/app.config + +exit 0 diff --git a/rebar.config b/rebar.config index 1b06211..5b1a0ef 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ {deps, [ {plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}}, - {ebloom, "1.0.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}}, + {ebloom, "1.1.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}}, {basho_bench, ".*", {git, "git://github.com/basho/basho_bench.git", {branch, "master"}}}, {triq, ".*", {git, "git://github.com/krestenkrab/triq.git", {branch, "master"}}}, {proper, ".*", {git, "git://github.com/manopapad/proper.git", diff --git a/src/lsmbtree_riak_kv_backend.erl b/src/lsmbtree_riak_kv_backend.erl new file mode 100644 index 0000000..5a9a4d1 --- /dev/null +++ b/src/lsmbtree_riak_kv_backend.erl @@ -0,0 +1,282 @@ +%% ------------------------------------------------------------------- +%% +%% lsmbtree_riak_kv_backend: temporary Riak backend behaviour +%% +%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE +%%% +%%% This is a temporary copy of riak_kv_backend, just here to keep +%%% wterl development private for now. When riak_kv_wterl_backend is +%%% moved to riak_kv, delete this file. +%%% +%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE + + +-module(lsmbtree_riak_kv_backend). + +-export([behaviour_info/1]). +-export([callback_after/3]). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). +-export([standard_test/2]). +-endif. + +-type fold_buckets_fun() :: fun((binary(), any()) -> any() | no_return()). +-type fold_keys_fun() :: fun((binary(), binary(), any()) -> any() | + no_return()). +-type fold_objects_fun() :: fun((binary(), binary(), term(), any()) -> + any() | + no_return()). +-export_type([fold_buckets_fun/0, + fold_keys_fun/0, + fold_objects_fun/0]). + +-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}]. +behaviour_info(callbacks) -> + [ + {api_version,0}, + {capabilities, 1}, % (State) + {capabilities, 2}, % (Bucket, State) + {start,2}, % (Partition, Config) + {stop,1}, % (State) + {get,3}, % (Bucket, Key, State) + {put,5}, % (Bucket, Key, IndexSpecs, Val, State) + {delete,4}, % (Bucket, Key, IndexSpecs, State) + {drop,1}, % (State) + {fold_buckets,4}, % (FoldBucketsFun, Acc, Opts, State), + % FoldBucketsFun(Bucket, Acc) + {fold_keys,4}, % (FoldKeysFun, Acc, Opts, State), + % FoldKeysFun(Bucket, Key, Acc) + {fold_objects,4}, % (FoldObjectsFun, Acc, Opts, State), + % FoldObjectsFun(Bucket, Key, Object, Acc) + {is_empty,1}, % (State) + {status,1}, % (State) + {callback,3}]; % (Ref, Msg, State) -> +behaviour_info(_Other) -> + undefined. + +%% Queue a callback for the backend after Time ms. +-spec callback_after(integer(), reference(), term()) -> reference(). +callback_after(Time, Ref, Msg) when is_integer(Time), is_reference(Ref) -> + riak_core_vnode:send_command_after(Time, {backend_callback, Ref, Msg}). + +-ifdef(TEST). + +standard_test(BackendMod, Config) -> + {spawn, + [ + {setup, + fun() -> ?MODULE:setup({BackendMod, Config}) end, + fun ?MODULE:cleanup/1, + fun(X) -> + [?MODULE:basic_store_and_fetch(X), + ?MODULE:fold_buckets(X), + ?MODULE:fold_keys(X), + ?MODULE:delete_object(X), + ?MODULE:fold_objects(X), + ?MODULE:empty_check(X) + ] + end + }]}. + +basic_store_and_fetch({Backend, State}) -> + {"basic store and fetch test", + fun() -> + [ + ?_assertMatch({ok, _}, + Backend:put(<<"b1">>, <<"k1">>, [], <<"v1">>, State)), + ?_assertMatch({ok, _}, + Backend:put(<<"b2">>, <<"k2">>, [], <<"v2">>, State)), + ?_assertMatch({ok,<<"v2">>, _}, + Backend:get(<<"b2">>, <<"k2">>, State)), + ?_assertMatch({error, not_found, _}, + Backend:get(<<"b1">>, <<"k3">>, State)) + ] + end + }. + +fold_buckets({Backend, State}) -> + {"bucket folding test", + fun() -> + FoldBucketsFun = + fun(Bucket, Acc) -> + [Bucket | Acc] + end, + + ?_assertEqual([<<"b1">>, <<"b2">>], + begin + {ok, Buckets1} = + Backend:fold_buckets(FoldBucketsFun, + [], + [], + State), + lists:sort(Buckets1) + end) + end + }. + +fold_keys({Backend, State}) -> + {"key folding test", + fun() -> + FoldKeysFun = + fun(Bucket, Key, Acc) -> + [{Bucket, Key} | Acc] + end, + FoldKeysFun1 = + fun(_Bucket, Key, Acc) -> + [Key | Acc] + end, + FoldKeysFun2 = + fun(Bucket, Key, Acc) -> + case Bucket =:= <<"b1">> of + true -> + [Key | Acc]; + false -> + Acc + end + end, + FoldKeysFun3 = + fun(Bucket, Key, Acc) -> + case Bucket =:= <<"b1">> of + true -> + Acc; + false -> + [Key | Acc] + end + end, + [ + ?_assertEqual([{<<"b1">>, <<"k1">>}, {<<"b2">>, <<"k2">>}], + begin + {ok, Keys1} = + Backend:fold_keys(FoldKeysFun, + [], + [], + State), + lists:sort(Keys1) + end), + ?_assertEqual({ok, [<<"k1">>]}, + Backend:fold_keys(FoldKeysFun1, + [], + [{bucket, <<"b1">>}], + State)), + ?_assertEqual([<<"k2">>], + Backend:fold_keys(FoldKeysFun1, + [], + [{bucket, <<"b2">>}], + State)), + ?_assertEqual({ok, [<<"k1">>]}, + Backend:fold_keys(FoldKeysFun2, [], [], State)), + ?_assertEqual({ok, [<<"k1">>]}, + Backend:fold_keys(FoldKeysFun2, + [], + [{bucket, <<"b1">>}], + State)), + ?_assertEqual({ok, [<<"k2">>]}, + Backend:fold_keys(FoldKeysFun3, [], [], State)), + ?_assertEqual({ok, []}, + Backend:fold_keys(FoldKeysFun3, + [], + [{bucket, <<"b1">>}], + State)) + ] + end + }. + +delete_object({Backend, State}) -> + {"object deletion test", + fun() -> + [ + ?_assertMatch({ok, _}, Backend:delete(<<"b2">>, <<"k2">>, State)), + ?_assertMatch({error, not_found, _}, + Backend:get(<<"b2">>, <<"k2">>, State)) + ] + end + }. + +fold_objects({Backend, State}) -> + {"object folding test", + fun() -> + FoldKeysFun = + fun(Bucket, Key, Acc) -> + [{Bucket, Key} | Acc] + end, + FoldObjectsFun = + fun(Bucket, Key, Value, Acc) -> + [{{Bucket, Key}, Value} | Acc] + end, + [ + ?_assertEqual([{<<"b1">>, <<"k1">>}], + begin + {ok, Keys} = + Backend:fold_keys(FoldKeysFun, + [], + [], + State), + lists:sort(Keys) + end), + + ?_assertEqual([{{<<"b1">>,<<"k1">>}, <<"v1">>}], + begin + {ok, Objects1} = + Backend:fold_objects(FoldObjectsFun, + [], + [], + State), + lists:sort(Objects1) + end), + ?_assertMatch({ok, _}, + Backend:put(<<"b3">>, <<"k3">>, [], <<"v3">>, State)), + ?_assertEqual([{{<<"b1">>,<<"k1">>},<<"v1">>}, + {{<<"b3">>,<<"k3">>},<<"v3">>}], + begin + {ok, Objects} = + Backend:fold_objects(FoldObjectsFun, + [], + [], + State), + lists:sort(Objects) + end) + ] + end + }. + +empty_check({Backend, State}) -> + {"is_empty test", + fun() -> + [ + ?_assertEqual(false, Backend:is_empty(State)), + ?_assertMatch({ok, _}, Backend:delete(<<"b1">>,<<"k1">>, State)), + ?_assertMatch({ok, _}, Backend:delete(<<"b3">>,<<"k3">>, State)), + ?_assertEqual(true, Backend:is_empty(State)) + ] + end + }. + +setup({BackendMod, Config}) -> + %% Start the backend + {ok, S} = BackendMod:start(42, Config), + {BackendMod, S}. + +cleanup({BackendMod, S}) -> + ok = BackendMod:stop(S). + +-endif. % TEST diff --git a/src/riak_kv_lsmbtree_backend.erl b/src/riak_kv_lsmbtree_backend.erl new file mode 100644 index 0000000..627bfbd --- /dev/null +++ b/src/riak_kv_lsmbtree_backend.erl @@ -0,0 +1,352 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_lsmbtree_backend: LSM-Btree Driver for Riak +%% +%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(riak_kv_lsmbtree_backend). +-behavior(lsmbtree_riak_kv_backend). +-author('Steve Vinoski '). + +%% KV Backend API +-export([api_version/0, + capabilities/1, + capabilities/2, + start/2, + stop/1, + get/3, + put/5, + delete/4, + drop/1, + fold_buckets/4, + fold_keys/4, + fold_objects/4, + is_empty/1, + status/1, + callback/3]). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +-define(API_VERSION, 1). +%% TODO: for when this backend supports 2i +%%-define(CAPABILITIES, [async_fold, indexes]). +-define(CAPABILITIES, [async_fold]). + +-record(state, {db, + partition :: integer()}). + +-type state() :: #state{}. +-type config() :: [{atom(), term()}]. + +%% =================================================================== +%% Public API +%% =================================================================== + +%% @doc Return the major version of the +%% current API. +-spec api_version() -> {ok, integer()}. +api_version() -> + {ok, ?API_VERSION}. + +%% @doc Return the capabilities of the backend. +-spec capabilities(state()) -> {ok, [atom()]}. +capabilities(_) -> + {ok, ?CAPABILITIES}. + +%% @doc Return the capabilities of the backend. +-spec capabilities(riak_object:bucket(), state()) -> {ok, [atom()]}. +capabilities(_, _) -> + {ok, ?CAPABILITIES}. + +%% @doc Start the lsm_btree backend +-spec start(integer(), config()) -> {ok, state()} | {error, term()}. +start(Partition, Config) -> + %% Get the data root directory + case app_helper:get_prop_or_env(data_root, Config, lsm_btree) of + undefined -> + lager:error("Failed to create lsm_btree dir: data_root is not set"), + {error, data_root_unset}; + DataRoot -> + AppStart = case application:start(lsm_btree) of + ok -> + ok; + {error, {already_started, _}} -> + ok; + {error, Reason} -> + lager:error("Failed to start lsm_btree: ~p", [Reason]), + {error, Reason} + end, + case AppStart of + ok -> + ok = filelib:ensure_dir(filename:join(DataRoot, "x")), + DbName = filename:join(DataRoot, "lb" ++ integer_to_list(Partition)), + case lsm_btree:open(DbName) of + {ok, Db} -> + {ok, #state{db=Db, partition=Partition}}; + {error, OpenReason}=OpenError -> + lager:error("Failed to open lsm_btree: ~p\n", [OpenReason]), + OpenError + end; + Error -> + Error + end + end. + +%% @doc Stop the lsm_btree backend +-spec stop(state()) -> ok. +stop(#state{db=Db}) -> + ok = lsm_btree:close(Db). + +%% @doc Retrieve an object from the lsm_btree backend +-spec get(riak_object:bucket(), riak_object:key(), state()) -> + {ok, any(), state()} | + {ok, not_found, state()} | + {error, term(), state()}. +get(Bucket, Key, #state{db=Db}=State) -> + LBKey = to_object_key(Bucket, Key), + case lsm_btree:lookup(Db, LBKey) of + {ok, Value} -> + {ok, Value, State}; + notfound -> + {error, not_found, State}; + {error, Reason} -> + {error, Reason, State} + end. + +%% @doc Insert an object into the lsm_btree backend. +-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}. +-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> + {ok, state()} | + {error, term(), state()}. +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{db=Db}=State) -> + LBKey = to_object_key(Bucket, PrimaryKey), + ok = lsm_btree:put(Db, LBKey, Val), + {ok, State}. + +%% @doc Delete an object from the lsm_btree backend +-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> + {ok, state()} | + {error, term(), state()}. +delete(Bucket, Key, _IndexSpecs, #state{db=Db}=State) -> + LBKey = to_object_key(Bucket, Key), + case lsm_btree:delete(Db, LBKey) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end. + +%% @doc Fold over all the buckets +-spec fold_buckets(riak_kv_backend:fold_buckets_fun(), + any(), + [], + state()) -> {ok, any()} | {async, fun()}. +fold_buckets(FoldBucketsFun, Acc, Opts, #state{db=Db}) -> + FoldFun = fold_buckets_fun(FoldBucketsFun), + BucketFolder = + fun() -> + try + lsm_btree:sync_fold_range(Db, FoldFun, {Acc, []}, undefined, undefined) + catch + {break, AccFinal} -> + AccFinal + end + end, + case lists:member(async_fold, Opts) of + true -> + {async, BucketFolder}; + false -> + {ok, BucketFolder()} + end. + +%% @doc Fold over all the keys for one or all buckets. +-spec fold_keys(riak_kv_backend:fold_keys_fun(), + any(), + [{atom(), term()}], + state()) -> {ok, term()} | {async, fun()}. +fold_keys(FoldKeysFun, Acc, Opts, #state{db=Db}) -> + %% Figure out how we should limit the fold: by bucket, by + %% secondary index, or neither (fold across everything.) + Bucket = lists:keyfind(bucket, 1, Opts), + Index = lists:keyfind(index, 1, Opts), + + %% Multiple limiters may exist. Take the most specific limiter. + Limiter = + if Index /= false -> Index; + Bucket /= false -> Bucket; + true -> undefined + end, + + %% Set up the fold... + FoldFun = fold_keys_fun(FoldKeysFun, Limiter), + KeyFolder = + fun() -> + try + lsm_btree:sync_fold_range(Db, FoldFun, Acc, undefined, undefined) + catch + {break, AccFinal} -> + AccFinal + end + end, + case lists:member(async_fold, Opts) of + true -> + {async, KeyFolder}; + false -> + {ok, KeyFolder()} + end. + +%% @doc Fold over all the objects for one or all buckets. +-spec fold_objects(riak_kv_backend:fold_objects_fun(), + any(), + [{atom(), term()}], + state()) -> {ok, any()} | {async, fun()}. +fold_objects(FoldObjectsFun, Acc, Opts, #state{db=Db}) -> + Bucket = proplists:get_value(bucket, Opts), + FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), + ObjectFolder = + fun() -> + try + lsm_btree:sync_fold_range(Db, FoldFun, Acc, undefined, undefined) + catch + {break, AccFinal} -> + AccFinal + end + end, + case lists:member(async_fold, Opts) of + true -> + {async, ObjectFolder}; + false -> + {ok, ObjectFolder()} + end. + +%% @doc Delete all objects from this lsm_btree backend +-spec drop(state()) -> {ok, state()} | {error, term(), state()}. +drop(#state{}=State) -> + %% TODO: not yet implemented + {ok, State}. + +%% @doc Returns true if this lsm_btree backend contains any +%% non-tombstone values; otherwise returns false. +-spec is_empty(state()) -> boolean(). +is_empty(#state{db=Db}) -> + FoldFun = fun(_K, _V, _Acc) -> throw(ok) end, + try + [] =:= lsm_btree:sync_fold_range(Db, FoldFun, [], undefined, undefined) + catch + _:ok -> + false + end. + +%% @doc Get the status information for this lsm_btree backend +-spec status(state()) -> [{atom(), term()}]. +status(#state{}) -> + %% TODO: not yet implemented + []. + +%% @doc Register an asynchronous callback +-spec callback(reference(), any(), state()) -> {ok, state()}. +callback(_Ref, _Msg, State) -> + {ok, State}. + + +%% =================================================================== +%% Internal functions +%% =================================================================== + +%% @private +%% Return a function to fold over the buckets on this backend +fold_buckets_fun(FoldBucketsFun) -> + fun(K, _V, {Acc, LastBucket}) -> + case from_object_key(K) of + {LastBucket, _} -> + {Acc, LastBucket}; + {Bucket, _} -> + {FoldBucketsFun(Bucket, Acc), Bucket}; + _ -> + throw({break, Acc}) + end + end. + +%% @private +%% Return a function to fold over keys on this backend +fold_keys_fun(FoldKeysFun, undefined) -> + %% Fold across everything... + fun(K, _V, Acc) -> + case from_object_key(K) of + {Bucket, Key} -> + FoldKeysFun(Bucket, Key, Acc); + _ -> + throw({break, Acc}) + end + end; +fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) -> + %% Fold across a specific bucket... + fun(K, _V, Acc) -> + case from_object_key(K) of + {Bucket, Key} when Bucket == FilterBucket -> + FoldKeysFun(Bucket, Key, Acc); + _ -> + throw({break, Acc}) + end + end; +fold_keys_fun(_FoldKeysFun, Other) -> + throw({unknown_limiter, Other}). + +%% @private +%% Return a function to fold over the objects on this backend +fold_objects_fun(FoldObjectsFun, FilterBucket) -> + fun(Key, Value, Acc) -> + case from_object_key(Key) of + {Bucket, Key} when FilterBucket == undefined; + Bucket == FilterBucket -> + FoldObjectsFun(Bucket, Key, Value, Acc); + _ -> + throw({break, Acc}) + end + end. + +to_object_key(Bucket, Key) -> + sext:encode({o, Bucket, Key}). + +from_object_key(LKey) -> + case sext:decode(LKey) of + {o, Bucket, Key} -> + {Bucket, Key}; + _ -> + undefined + end. + +%% =================================================================== +%% EUnit tests +%% =================================================================== +-ifdef(TEST). + +simple_test_() -> + ?assertCmd("rm -rf test/lsmbtree-backend"), + application:set_env(lsm_btree, data_root, "test/lsmbtree-backend"), + lsmbtree_riak_kv_backend:standard_test(?MODULE, []). + +custom_config_test_() -> + ?assertCmd("rm -rf test/lsmbtree-backend"), + application:set_env(lsm_btree, data_root, ""), + lsmbtree_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/lsmbtree-backend"}]). + +-endif.