Merge remote-tracking branch 'origin/master' into gsb-async-nifs2

Conflicts:
	c_src/wterl.c
	src/riak_kv_wterl_backend.erl
	src/wterl.erl
This commit is contained in:
Gregory Burd 2013-04-02 09:33:41 -04:00
commit 6aa51437cc
8 changed files with 524 additions and 899 deletions

View file

@ -3,6 +3,7 @@
set -e set -e
WT_BRANCH=basho WT_BRANCH=basho
WT_REMOTE_REPO=http://github.com/wiredtiger/wiredtiger.git
[ `basename $PWD` != "c_src" ] && cd c_src [ `basename $PWD` != "c_src" ] && cd c_src
@ -21,12 +22,19 @@ case "$1" in
git fetch && \ git fetch && \
git merge origin/$WT_BRANCH) git merge origin/$WT_BRANCH)
else else
git clone http://github.com/wiredtiger/wiredtiger.git -b $WT_BRANCH && \ git clone -b $WT_BRANCH --single-branch $WT_REMOTE_REPO && \
(cd wiredtiger && ./autogen.sh) (cd wiredtiger && \
patch -p1 < ../wiredtiger-extension-link.patch && \
./autogen.sh)
fi fi
(cd wiredtiger/build_posix && \ (cd wiredtiger/build_posix && \
../configure --with-pic \ ../configure --with-pic \
--enable-snappy \
--enable-bzip2 \
--prefix=$BASEDIR/system && \ --prefix=$BASEDIR/system && \
make -j 8 && make install) make -j && make install)
[ -d $BASEDIR/../priv ] || mkdir $BASEDIR/../priv
cp $BASEDIR/system/bin/wt $BASEDIR/../priv
cp $BASEDIR/system/lib/*.so $BASEDIR/../priv
;; ;;
esac esac

View file

@ -0,0 +1,22 @@
diff --git a/ext/compressors/bzip2/Makefile.am b/ext/compressors/bzip2/Makefile.am
index 0aedc2e..1cc4cf6 100644
--- a/ext/compressors/bzip2/Makefile.am
+++ b/ext/compressors/bzip2/Makefile.am
@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_bzip2.la
libwiredtiger_bzip2_la_SOURCES = bzip2_compress.c
-libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module
+libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib
libwiredtiger_bzip2_la_LIBADD = -lbz2
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..7d35777 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib
libwiredtiger_snappy_la_LIBADD = -lsnappy

File diff suppressed because it is too large Load diff

View file

@ -35,7 +35,7 @@ fi
rebar get-deps rebar get-deps
file=./deps/riak_kv/src/riak_kv.app.src file=./deps/riak_kv/src/riak_kv.app.src
if ! grep -q hanoidb $file && ! grep -q wterl $file ; then if ! grep -q wterl $file ; then
echo echo
echo "Modifying $file, saving the original as ${file}.orig ..." echo "Modifying $file, saving the original as ${file}.orig ..."
perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file

View file

@ -37,7 +37,7 @@
{port_env, [ {port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"}, {"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS c_src/system/lib/libwiredtiger.a"} {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lpriv -lwiredtiger"}
]}. ]}.
{pre_hooks, [{compile, "c_src/build_deps.sh"}]}. {pre_hooks, [{compile, "c_src/build_deps.sh"}]}.

View file

@ -52,12 +52,12 @@
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(pass, {session :: wterl:session(), -record(pass, {session :: wterl:session(),
cursor :: wterl:cursor()}). cursor :: wterl:cursor()}).
-type pass() :: #pass{}. -type pass() :: #pass{}.
-record(state, {table :: string(), -record(state, {table :: string(),
connection :: wterl:connection(), connection :: wterl:connection(),
passes :: [pass()]}). passes :: [pass()]}).
-type state() :: #state{}. -type state() :: #state{}.
-type config() :: [{atom(), term()}]. -type config() :: [{atom(), term()}].
@ -85,7 +85,6 @@ capabilities(_, _) ->
%% @doc Start the wterl backend %% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}. -spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) -> start(Partition, Config) ->
lager:start(),
AppStart = AppStart =
case application:start(wterl) of case application:start(wterl) of
ok -> ok ->
@ -100,8 +99,8 @@ start(Partition, Config) ->
ok -> ok ->
Table = "lsm:wt" ++ integer_to_list(Partition), Table = "lsm:wt" ++ integer_to_list(Partition),
{ok, Connection} = establish_connection(Config), {ok, Connection} = establish_connection(Config),
Passes = establish_passes(erlang:system_info(schedulers), Connection, Table), Passes = establish_passes(erlang:system_info(schedulers), Connection, Table),
{ok, #state{table=Table, connection=Connection, passes=Passes}}; {ok, #state{table=Table, connection=Connection, passes=Passes}};
{error, Reason2} -> {error, Reason2} ->
{error, Reason2} {error, Reason2}
end. end.
@ -109,10 +108,11 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{passes=Passes}) -> stop(#state{passes=Passes}) ->
lists:foreach(fun({Session, Cursor}) -> lists:foreach(fun(Elem) ->
ok = wterl:cursor_close(Cursor), {Session, Cursor} = Elem,
ok = wterl:session_close(Session) ok = wterl:cursor_close(Cursor),
end, Passes), ok = wterl:session_close(Session)
end, Passes),
ok. ok.
%% @doc Retrieve an object from the wterl backend %% @doc Retrieve an object from the wterl backend
@ -191,7 +191,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session) ok = wterl:session_close(Session)
end end
end end
end, end,
@ -238,7 +238,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session) ok = wterl:session_close(Session)
end end
end end
end, end,
@ -272,7 +272,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session) ok = wterl:session_close(Session)
end end
end end
end, end,
@ -327,6 +327,14 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
max_sessions(Config) ->
RingSize =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
Size -> Size
end,
2 * (RingSize * erlang:system_info(schedulers)).
%% @private %% @private
establish_connection(Config) -> establish_connection(Config) ->
%% Get the data root directory %% Get the data root directory
@ -336,28 +344,23 @@ establish_connection(Config) ->
{error, data_root_unset}; {error, data_root_unset};
DataRoot -> DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")), ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
SessionMax =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
RingSize when RingSize < 512 -> 1024;
RingSize -> RingSize * 2
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl), RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
Opts = orddict:from_list( Opts =
orddict:from_list(
[ wterl:config_value(create, Config, true), [ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false), wterl:config_value(sync, Config, false),
wterl:config_value(logging, Config, true), wterl:config_value(logging, Config, true),
wterl:config_value(transactional, Config, true), wterl:config_value(transactional, Config, true),
wterl:config_value(session_max, Config, SessionMax), wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
%% NOTE: LSM auto-checkpoints, so we don't have too.
%% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec
wterl:config_value(verbose, Config, [ wterl:config_value(verbose, Config, [
%"ckpt" "block", "shared_cache", "evictserver", "fileops", %"ckpt" "block", "shared_cache", "evictserver", "fileops",
%"hazard", "mutex", "read", "readserver", "reconcile", %"hazard", "mutex", "read", "readserver", "reconcile",
%"salvage", "verify", "write", "evict", "lsm" %"salvage", "verify", "write", "evict", "lsm"
]) ]) ] ++ proplists:get_value(wterl, Config, [])), % sec
] ++ proplists:get_value(wterl, Config, [])), % sec
lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]),
case wterl_conn:open(DataRoot, Opts) of case wterl_conn:open(DataRoot, Opts) of
{ok, Connection} -> {ok, Connection} ->
{ok, Connection}; {ok, Connection};
@ -369,41 +372,36 @@ establish_connection(Config) ->
establish_passes(Count, Connection, Table) establish_passes(Count, Connection, Table)
when is_number(Count), Count > 0 -> when is_number(Count), Count > 0 ->
establish_passes(Count, Connection, Table, []). lists:map(fun(_Elem) ->
{ok, Session} = establish_session(Connection, Table),
establish_passes(Count, Connection, Table, Acc) {ok, Cursor} = wterl:cursor_open(Session, Table),
when Count > 0 -> {Session, Cursor}
case Count > 1 of end, lists:seq(1, Count)).
true ->
{ok, Session} = establish_session(Connection, Table),
{ok, Cursor} = wterl:cursor_open(Session, Table),
[{Session, Cursor} | establish_passes(Count - 1, Connection, Table, Acc)];
false ->
Acc
end.
%% @private %% @private
establish_session(Connection, Table) -> establish_session(Connection, Table) ->
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
{ok, Session} -> {ok, Session} ->
SessionOpts = SessionOpts =
[%TODO {block_compressor, "snappy"}, [{block_compressor, "snappy"},
{internal_page_max, "128K"}, {internal_page_max, "128K"},
{leaf_page_max, "128K"}, {leaf_page_max, "128K"},
{lsm_chunk_size, "200MB"}, {lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true}, {lsm_bloom_newest, true},
{lsm_bloom_oldest, true} , {lsm_bloom_oldest, true} ,
{lsm_bloom_config, [{leaf_page_max, "10MB"}]} ], {lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]} ],
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
ok -> ok ->
{ok, Session}; {ok, Session};
{error, Reason} -> {error, Reason} ->
lager:error("Failed to start wterl backend: ~p\n", [Reason]), lager:error("Failed to start wterl backend: ~p\n", [Reason]),
{error, Reason} {error, Reason}
end; end;
{error, Reason} -> {error, Reason} ->
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
{error, Reason} {error, Reason}
end. end.
%% @private %% @private
@ -551,7 +549,6 @@ size_cache(RequestedSize) ->
"1GB" "1GB"
end, end,
application:set_env(wterl, cache_size, FinalGuess), application:set_env(wterl, cache_size, FinalGuess),
lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]),
FinalGuess; FinalGuess;
Value when is_list(Value) -> Value when is_list(Value) ->
Value; Value;

View file

@ -20,8 +20,8 @@
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-module(wterl). -module(wterl).
-export([conn_open/2, -export([connection_open/2,
conn_close/1, connection_close/1,
cursor_close/1, cursor_close/1,
cursor_insert/3, cursor_insert/3,
cursor_next/1, cursor_next/1,
@ -60,6 +60,7 @@
session_verify/3, session_verify/3,
config_value/3, config_value/3,
config_to_bin/1, config_to_bin/1,
priv_dir/0,
fold_keys/3, fold_keys/3,
fold/3]). fold/3]).
@ -94,15 +95,22 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}. -spec init() -> ok | {error, any()}.
init() -> init() ->
PrivDir = case code:priv_dir(?MODULE) of erlang:load_nif(filename:join(priv_dir(), atom_to_list(?MODULE)), 0).
{error, bad_name} ->
EbinDir = filename:dirname(code:which(?MODULE)), -spec connection_open(string(), config()) -> {ok, connection()} | {error, term()}.
AppPath = filename:dirname(EbinDir), connection_open(HomeDir, Config) ->
filename:join(AppPath, "priv"); PrivDir = wterl:priv_dir(),
Path -> {ok, PrivFiles} = file:list_dir(PrivDir),
Path SoFiles =
end, lists:filter(fun(Elem) ->
erlang:load_nif(filename:join(PrivDir, atom_to_list(?MODULE)), 0). case re:run(Elem, "^libwiredtiger_.*\.so$") of
{match, _} -> true;
nomatch -> false
end
end, PrivFiles),
SoPaths = lists:map(fun(Elem) -> filename:join(PrivDir, Elem) end, SoFiles),
Bin = config_to_bin([{extensions, SoPaths}], [<<",">>, Config]),
conn_open(HomeDir, Bin).
-spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}. -spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}.
conn_open(HomeDir, Config) -> conn_open(HomeDir, Config) ->
@ -383,11 +391,21 @@ fold(_Cursor, _Fun, Acc, not_found) ->
fold(Cursor, Fun, Acc, {ok, Key, Value}) -> fold(Cursor, Fun, Acc, {ok, Key, Value}) ->
fold(Cursor, Fun, Fun({Key, Value}, Acc), cursor_next(Cursor)). fold(Cursor, Fun, Fun({Key, Value}, Acc), cursor_next(Cursor)).
priv_dir() ->
case code:priv_dir(?MODULE) of
{error, bad_name} ->
EbinDir = filename:dirname(code:which(?MODULE)),
AppPath = filename:dirname(EbinDir),
filename:join(AppPath, "priv");
Path ->
Path
end.
%% %%
%% Configuration type information. %% Configuration type information.
%% %%
config_types() -> config_types() ->
[{block_compressor, string}, [{block_compressor, {string, quoted}},
{cache_size, string}, {cache_size, string},
{checkpoint, config}, {checkpoint, config},
{create, bool}, {create, bool},
@ -396,7 +414,7 @@ config_types() ->
{error_prefix, string}, {error_prefix, string},
{eviction_target, integer}, {eviction_target, integer},
{eviction_trigger, integer}, {eviction_trigger, integer},
{extensions, string}, {extensions, {list, quoted}},
{force, bool}, {force, bool},
{hazard_max, integer}, {hazard_max, integer},
{home_environment, bool}, {home_environment, bool},
@ -437,8 +455,13 @@ config_encode(config, Value) ->
list_to_binary(["(", config_to_bin(Value, []), ")"]); list_to_binary(["(", config_to_bin(Value, []), ")"]);
config_encode(list, Value) -> config_encode(list, Value) ->
list_to_binary(["(", string:join(Value, ","), ")"]); list_to_binary(["(", string:join(Value, ","), ")"]);
config_encode({list, quoted}, Value) ->
Values = lists:map(fun(S) -> "\"" ++ S ++ "\"" end, Value),
list_to_binary(["(", string:join(Values, ","), ")"]);
config_encode(string, Value) when is_list(Value) -> config_encode(string, Value) when is_list(Value) ->
list_to_binary(Value); list_to_binary(Value);
config_encode({string, quoted}, Value) when is_list(Value) ->
list_to_binary("\"" ++ Value ++ "\"");
config_encode(string, Value) when is_number(Value) -> config_encode(string, Value) when is_number(Value) ->
list_to_binary(integer_to_list(Value)); list_to_binary(integer_to_list(Value));
config_encode(bool, true) -> config_encode(bool, true) ->
@ -493,18 +516,18 @@ open_test_conn(DataDir) ->
?cmd("rm -rf "++DataDir), ?cmd("rm -rf "++DataDir),
?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))), ?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))),
OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]), OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]),
{ok, ConnRef} = conn_open(DataDir, OpenConfig), {ok, ConnRef} = connection_open(DataDir, OpenConfig),
ConnRef. ConnRef.
open_test_session(ConnRef) -> open_test_session(ConnRef) ->
{ok, SRef} = session_open(ConnRef), {ok, SRef} = session_open(ConnRef),
?assertMatch(ok, session_drop(SRef, "table:test", config_to_bin([{force,true}]))), ?assertMatch(ok, session_drop(SRef, "table:test", config_to_bin([{force,true}]))),
?assertMatch(ok, session_create(SRef, "table:test")), ?assertMatch(ok, session_create(SRef, "table:test", config_to_bin([{block_compressor, "snappy"}]))),
SRef. SRef.
conn_test() -> conn_test() ->
ConnRef = open_test_conn(?TEST_DATA_DIR), ConnRef = open_test_conn(?TEST_DATA_DIR),
?assertMatch(ok, conn_close(ConnRef)). ?assertMatch(ok, connection_close(ConnRef)).
session_test_() -> session_test_() ->
{setup, {setup,
@ -512,7 +535,7 @@ session_test_() ->
open_test_conn(?TEST_DATA_DIR) open_test_conn(?TEST_DATA_DIR)
end, end,
fun(ConnRef) -> fun(ConnRef) ->
ok = conn_close(ConnRef) ok = connection_close(ConnRef)
end, end,
fun(ConnRef) -> fun(ConnRef) ->
{inorder, {inorder,
@ -537,7 +560,7 @@ insert_delete_test() ->
?assertMatch(ok, session_delete(SRef, "table:test", <<"a">>)), ?assertMatch(ok, session_delete(SRef, "table:test", <<"a">>)),
?assertMatch(not_found, session_get(SRef, "table:test", <<"a">>)), ?assertMatch(not_found, session_get(SRef, "table:test", <<"a">>)),
ok = session_close(SRef), ok = session_close(SRef),
ok = conn_close(ConnRef). ok = connection_close(ConnRef).
init_test_table() -> init_test_table() ->
ConnRef = open_test_conn(?TEST_DATA_DIR), ConnRef = open_test_conn(?TEST_DATA_DIR),
@ -551,7 +574,7 @@ init_test_table() ->
stop_test_table({ConnRef, SRef}) -> stop_test_table({ConnRef, SRef}) ->
?assertMatch(ok, session_close(SRef)), ?assertMatch(ok, session_close(SRef)),
?assertMatch(ok, conn_close(ConnRef)). ?assertMatch(ok, connection_close(ConnRef)).
various_session_test_() -> various_session_test_() ->
{setup, {setup,
@ -760,10 +783,10 @@ prop_put_delete() ->
Table = "table:eqc", Table = "table:eqc",
{ok, CWD} = file:get_cwd(), {ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir), ?cmd("rm -rf "++DataDir),
ok = filelib:ensure_dir(filename:join(DataDir, "x")), ok = filelib:ensure_dir(filename:join(DataDir, "x")),
Cfg = wterl:config_to_bin([{create,true}]), Cfg = wterl:config_to_bin([{create,true}]),
{ok, Conn} = wterl:conn_open(DataDir, Cfg), {ok, Conn} = wterl:connection_open(DataDir, Cfg),
{ok, SRef} = wterl:session_open(Conn), {ok, SRef} = wterl:session_open(Conn),
try try
wterl:session_create(SRef, Table), wterl:session_create(SRef, Table),
@ -779,7 +802,7 @@ prop_put_delete() ->
true true
after after
wterl:session_close(SRef), wterl:session_close(SRef),
wterl:conn_close(Conn) wterl:connection_close(Conn)
end end
end)). end)).

View file

@ -82,7 +82,7 @@ init([]) ->
handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) -> handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
{Reply, NState} = {Reply, NState} =
case wterl:conn_open(Dir, wterl:config_to_bin(Config)) of case wterl:connection_open(Dir, wterl:config_to_bin(Config)) of
{ok, ConnRef}=OK -> {ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wterl_ets, {Monitor, Caller}),
@ -164,7 +164,7 @@ code_change(_OldVsn, State, _Extra) ->
do_close(undefined) -> do_close(undefined) ->
ok; ok;
do_close(ConnRef) -> do_close(ConnRef) ->
wterl:conn_close(ConnRef). wterl:connection_close(ConnRef).
-ifdef(TEST). -ifdef(TEST).