Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9

Merged
gburd merged 30 commits from gsb-ctx-cache into master 2013-07-03 12:31:15 +00:00
3 changed files with 120 additions and 946 deletions
Showing only changes of commit 9468870e1f - Show all commits

View file

@ -40,14 +40,14 @@ static ErlNifResourceType *wterl_cursor_RESOURCE;
/* WiredTiger object names*/
typedef char Uri[128];
typedef struct wterl_ctx {
struct wterl_ctx {
WT_SESSION *session; // open session
WT_CURSOR **cursors; // open cursors, all reset ready to reuse
uint64_t sig;
} WterlCtxHandle;
};
struct cache_entry {
WterlCtxHandle *ctx;
struct wterl_ctx *ctx;
uint64_t sig;
uint64_t tstamp;
};
@ -63,6 +63,8 @@ typedef struct wterl_conn {
kbtree_t(cache_entries) *cache;
fifo_t(cache_entries) recycled_cache_entries;
SLIST_ENTRY(wterl_conn) conns;
uint64_t histogram[64];
uint64_t hits, misses;
} WterlConnHandle;
typedef struct {
@ -105,6 +107,51 @@ static ERL_NIF_TERM ATOM_MSG_PID;
/* Global init for async_nif. */
ASYNC_NIF_INIT(wterl);
/**
* A string hash function.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/
static inline uint32_t
__str_hash(const char *s)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* Calculate the log2 of 64bit unsigned integers.
*/
#ifdef __GCC__
#define __log2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X))))
#else
static inline uint32_t __log2(uint64_t x) {
static const int tab64[64] = {
63, 0, 58, 1, 59, 47, 53, 2,
60, 39, 48, 27, 54, 33, 42, 3,
61, 51, 37, 40, 49, 18, 28, 20,
55, 30, 34, 11, 43, 14, 22, 4,
62, 57, 46, 52, 38, 26, 32, 41,
50, 36, 17, 19, 29, 10, 13, 21,
56, 45, 25, 31, 35, 16, 9, 12,
44, 24, 15, 8, 23, 7, 6, 5};
if (x == 0) return 0;
uint64_t v = x;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58];
}
#endif
/**
* Is the context cache full?
*
@ -125,11 +172,40 @@ __ctx_cache_full(WterlConnHandle *conn)
* -> number of items evicted
*/
static int
__ctx_cache_evict(WterlConnHandle *conn)
__ctx_cache_evict(WterlConnHandle *conn_handle)
{
// TODO:
UNUSED(conn);
return 0;
uint32_t i;
uint64_t mean, now = cpu_clock_ticks();
kbtree_t(cache_entries) *t = conn_handle->cache;
// Find the mean of the recorded times that items stayed in cache.
for (i = 0; i < 64; i++)
mean += (conn_handle->histogram[i] * i);
if (mean > 0)
mean /= conn_handle->hits;
// Clear out the histogram and hit/misses
memset(conn_handle->histogram, 0, sizeof(uint64_t) * 64);
conn_handle->hits = 0;
conn_handle->misses = 0;
// Evict anything older than the mean time in queue.
i = 0;
#define traverse_f(p) \
{ \
struct cache_entry *e = *p; \
uint64_t elapsed = e->tstamp - now; \
if (__log2(elapsed) > mean) { \
kb_del(cache_entries, t, e); \
e->ctx->session->close(e->ctx->session, NULL); \
enif_free(e->ctx); \
fifo_q_put(cache_entries, conn_handle->recycled_cache_entries, e); \
i++; \
} \
}
__kb_traverse(struct cache_entry *, t, traverse_f);
#undef traverse_f
return i;
}
/**
@ -141,21 +217,26 @@ __ctx_cache_evict(WterlConnHandle *conn)
* sig a 64-bit signature (hash) representing the session/cursor* needed
* for the operation
*/
static WterlCtxHandle *
__ctx_cache_find(WterlConnHandle *conn, const uint64_t sig)
static struct wterl_ctx *
__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
{
WterlCtxHandle *p = NULL;
struct wterl_ctx *p = NULL;
struct cache_entry key, *e;
key.sig = sig;
e = *kb_get(cache_entries, conn->cache, &key);
e = *kb_get(cache_entries, conn_handle->cache, &key);
if (e) {
// cache hit, remove it from the tree
kb_del(cache_entries, conn->cache, &key);
uint64_t elapsed = cpu_clock_ticks() - e->tstamp;
kb_del(cache_entries, conn_handle->cache, &key);
p = e->ctx;
memset(e, 0, sizeof(struct cache_entry));
fifo_q_put(cache_entries, conn->recycled_cache_entries, e);
} // else { cache miss, so p == NULL when we return }
fifo_q_put(cache_entries, conn_handle->recycled_cache_entries, e);
conn_handle->hits++;
conn_handle->histogram[__log2(elapsed)]++;
} else {
conn_handle->misses++;
}
return p;
}
@ -166,7 +247,7 @@ __ctx_cache_find(WterlConnHandle *conn, const uint64_t sig)
* the front of the LRU.
*/
static int
__ctx_cache_add(WterlConnHandle *conn, WterlCtxHandle *c)
__ctx_cache_add(WterlConnHandle *conn, struct wterl_ctx *c)
{
struct cache_entry *e;
@ -229,23 +310,6 @@ __zi(uint32_t p, uint32_t q)
return z;
}
/**
* A string hash function.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/
static inline uint32_t
__str_hash(const char *s)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* Create a signature for the operation we're about to perform.
*
@ -300,7 +364,7 @@ __ctx_cache_sig(const char *c, ...)
* session.
*/
static int
__retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx,
__retain_ctx(WterlConnHandle *conn_handle, struct wterl_ctx **ctx,
const char *session_config, ...)
{
int i, count;
@ -314,17 +378,18 @@ __retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx,
va_end (ap);
enif_mutex_lock(conn_handle->cache_mutex);
*ctx = __ctx_cache_find(conn_handle, sig);
if (NULL == *ctx) {
(*ctx) = __ctx_cache_find(conn_handle, sig);
if ((*ctx) == NULL) {
// cache miss
WT_CONNECTION *conn = conn_handle->conn;
WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0)
return rc;
size_t s = sizeof(WterlCtxHandle) + ((count / 2) * sizeof(WT_CURSOR*));
*ctx = enif_alloc_resource(wterl_ctx_RESOURCE, s);
if (NULL == *ctx) {
size_t s = sizeof(struct wterl_ctx) + ((count / 2) * sizeof(WT_CURSOR*));
// TODO: *ctx = enif_alloc_resource(wterl_ctx_RESOURCE, s);
*ctx = enif_alloc(s);
if (*ctx == NULL) {
session->close(session, NULL);
return ENOMEM;
}
@ -337,20 +402,24 @@ __retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx,
for (i = 0; i < (count / 2); i++) {
const char *uri = va_arg(ap, const char *);
const char *config = va_arg(ap, const char *);
// TODO: error when uri or config is NULL
rc = session->open_cursor(session, uri, NULL, config, &cursors[i]);
if (rc != 0) {
session->close(session, NULL); // this will free cursors too
session->close(session, NULL); // this will free the cursors too
return rc;
}
}
va_end (ap);
} // else { cache hit so 'ctx' is a reusable session/cursor }
} // else { cache hit }
enif_mutex_unlock(conn_handle->cache_mutex);
return 0;
}
/**
* Return a context to the cache for reuse.
*/
static void
__release_ctx(WterlConnHandle *conn_handle, WterlCtxHandle *ctx)
__release_ctx(WterlConnHandle *conn_handle, struct wterl_ctx *ctx)
{
int i, c;
WT_CURSOR *cursor;
@ -375,7 +444,10 @@ __close_all_sessions(WterlConnHandle *conn_handle)
{
kbtree_t(cache_entries) *t = conn_handle->cache;
#define traverse_f(p) { kb_del(cache_entries, t, *p); }
#define traverse_f(p) { \
kb_del(cache_entries, t, *p); \
enif_free(p); \
}
__kb_traverse(struct cache_entry *, t, traverse_f);
#undef traverse_f
}
@ -1300,7 +1372,7 @@ ASYNC_NIF_DECL(
return;
}
WterlCtxHandle *ctx = NULL;
struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL;
int rc = __retain_ctx(args->conn_handle, &ctx,
args->conn_handle->session_config,
@ -1359,7 +1431,7 @@ ASYNC_NIF_DECL(
return;
}
WterlCtxHandle *ctx = NULL;
struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL;
int rc = __retain_ctx(args->conn_handle, &ctx,
args->conn_handle->session_config,
@ -1441,7 +1513,7 @@ ASYNC_NIF_DECL(
return;
}
WterlCtxHandle *ctx = NULL;
struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL;
int rc = __retain_ctx(args->conn_handle, &ctx,
args->conn_handle->session_config,
@ -2180,6 +2252,8 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
NULL, flags, NULL);
wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource",
NULL, flags, NULL);
wterl_ctx_RESOURCE = enif_open_resource_type(env, NULL, "wterl_ctx_resource",
NULL, flags, NULL);
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
@ -2265,7 +2339,7 @@ on_unload(ErlNifEnv *env, void *priv_data)
SLIST_FOREACH(conn_handle, &priv->conns, conns) {
fifo_q_foreach(cache_entries, conn_handle->recycled_cache_entries, e, {
WterlCtxHandle *ctx = e->ctx;
struct wterl_ctx *ctx = e->ctx;
ctx->session->close(ctx->session, NULL);
});
fifo_q_free(cache_entries, conn_handle->recycled_cache_entries);

View file

@ -1,613 +0,0 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_wterl_backend: WiredTiger Driver for Riak
%%
%% Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
%% KV Backend API
-export([api_version/0,
capabilities/1,
capabilities/2,
start/2,
stop/1,
get/3,
put/5,
delete/4,
drop/1,
fold_buckets/4,
fold_keys/4,
fold_objects/4,
is_empty/1,
status/1,
callback/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compiel(export_all).
-endif.
-define(API_VERSION, 1).
%% TODO: for when this backend supports 2i
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(),
type :: string(),
connection :: wterl:connection(),
is_empty_cursor :: wterl:cursor(),
status_cursor :: wterl:cursor()}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
%% ===================================================================
%% Public API
%% ===================================================================
%% @doc Return the major version of the
%% current API.
-spec api_version() -> {ok, integer()}.
api_version() ->
{ok, ?API_VERSION}.
%% @doc Return the capabilities of the backend.
-spec capabilities(state()) -> {ok, [atom()]}.
capabilities(_) ->
{ok, ?CAPABILITIES}.
%% @doc Return the capabilities of the backend.
-spec capabilities(riak_object:bucket(), state()) -> {ok, [atom()]}.
capabilities(_, _) ->
{ok, ?CAPABILITIES}.
%% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) ->
AppStart =
case application:start(wterl) of
ok ->
ok;
{error, {already_started, _}} ->
ok;
{error, Reason1} ->
lager:error("Failed to start wterl: ~p", [Reason1]),
{error, Reason1}
end,
case AppStart of
ok ->
Type =
case wterl:config_value(type, Config, "lsm") of
{type, "lsm"} -> "lsm";
{type, "table"} -> "table";
{type, "btree"} -> "table";
{type, BadType} ->
lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]),
"lsm";
_ ->
lager:info("wterl:start ignoring mistaken setting defaulting to lsm"),
"lsm"
end,
{ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition),
Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C];
{block_compressor, "none"} -> [];
{block_compressor, none} -> [];
{block_compressor, _} -> [{block_compressor, "snappy"}];
_ -> [{block_compressor, "snappy"}]
end,
TableOpts =
case Type of
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "100MB"},
{lsm_merge_threads, 2},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
] ++ Compressor;
"table" ->
Compressor
end,
case wterl:create(Connection, Table, TableOpts) of
ok ->
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->
{ok, #state{table=Table, type=Type,
connection=Connection,
is_empty_cursor=IsEmptyCursor,
status_cursor=StatusCursor}};
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason3} ->
{error, Reason3}
end
end.
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
stop(_State) ->
ok. %% The connection is closed by wterl_conn:stop()
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, #state{connection=Connection, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key),
case wterl:get(Connection, Table, WTKey) of
{ok, Value} ->
{ok, Value, State};
not_found ->
{error, not_found, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Insert an object into the wterl backend.
%% NOTE: The wterl backend does not currently support
%% secondary indexing and the_IndexSpecs parameter
%% is ignored.
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) ->
case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Delete an object from the wterl backend
%% NOTE: The wterl backend does not currently support
%% secondary indexing and the_IndexSpecs parameter
%% is ignored.
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) ->
case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Fold over all the buckets
-spec fold_buckets(riak_kv_backend:fold_buckets_fun(),
any(),
[],
state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder =
fun() ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
{FoldResult, _} =
wterl:fold_keys(Cursor, FoldFun, {Acc, []}),
FoldResult
catch
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
end
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, BucketFolder};
false ->
{ok, BucketFolder()}
end.
%% @doc Fold over all the keys for one or all buckets.
-spec fold_keys(riak_kv_backend:fold_keys_fun(),
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts),
Index = lists:keyfind(index, 1, Opts),
%% Multiple limiters may exist. Take the most specific limiter.
Limiter =
if Index /= false -> Index;
Bucket /= false -> Bucket;
true -> undefined
end,
%% Set up the fold...
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder =
fun() ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
wterl:fold_keys(Cursor, FoldFun, Acc)
catch
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
end
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, KeyFolder};
false ->
{ok, KeyFolder()}
end.
%% @doc Fold over all the objects for one or all buckets.
-spec fold_objects(riak_kv_backend:fold_objects_fun(),
any(),
[{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
wterl:fold(Cursor, FoldFun, Acc)
catch
{break, AccFinal} ->
AccFinal
after
case wterl:cursor_close(Cursor) of
ok ->
ok;
{error, {eperm, _}} -> %% TODO: review/fix
ok;
{error, _}=E ->
E
end
end
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, ObjectFolder};
false ->
{ok, ObjectFolder()}
end.
%% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{connection=Connection, table=Table}=State) ->
case wterl:drop(Connection, Table) of
ok ->
{ok, State};
{error, {ebusy, _}} -> %% TODO: review/fix
{ok, State};
Error ->
{error, Error, State}
end.
%% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{is_empty_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case wterl:cursor_next(Cursor) of
not_found -> true;
{error, {eperm, _}} -> false; % TODO: review/fix this logic
_ -> false
end.
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{status_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
callback(_Ref, _Msg, State) ->
{ok, State}.
%% ===================================================================
%% Internal functions
%% ===================================================================
%% @private
max_sessions(Config) ->
RingSize =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
Size -> Size
end,
Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
true -> 1000000000;
false -> Est
end.
%% @private
establish_utility_cursors(Connection, Table) ->
case wterl:cursor_open(Connection, Table) of
{ok, IsEmptyCursor} ->
case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
{ok, StatusCursor} ->
{ok, IsEmptyCursor, StatusCursor};
{error, Reason1} ->
{error, Reason1}
end;
{error, Reason2} ->
{error, Reason2}
end.
%% @private
establish_connection(Config, Type) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
%% NOTE: LSM auto-checkpoints, so we don't have too.
CheckpointSetting =
case Type =:= "lsm" of
true ->
[];
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
wterl:config_value(logging, Config, true),
wterl:config_value(transactional, Config, true),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 300}]), % sec
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
{ok, Connection} ->
{ok, Connection};
{error, Reason2} ->
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
{error, Reason2}
end
end.
%% @private
%% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) ->
fun(BK, {Acc, LastBucket}) ->
case from_object_key(BK) of
{LastBucket, _} ->
{Acc, LastBucket};
{Bucket, _} ->
{FoldBucketsFun(Bucket, Acc), Bucket};
_ ->
throw({break, Acc})
end
end.
%% @private
%% Return a function to fold over keys on this backend
fold_keys_fun(FoldKeysFun, undefined) ->
%% Fold across everything...
fun(StorageKey, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) ->
%% Fold across a specific bucket...
fun(StorageKey, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} when Bucket == FilterBucket ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, <<"$bucket">>, _}}) ->
%% 2I exact match query on special $bucket field...
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, FilterField, FilterTerm}}) ->
%% Rewrite 2I exact match query as a range...
NewQuery = {range, FilterField, FilterTerm, FilterTerm},
fold_keys_fun(FoldKeysFun, {index, FilterBucket, NewQuery});
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, <<"$key">>, StartKey, EndKey}}) ->
%% 2I range query on special $key field...
fun(StorageKey, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} when FilterBucket == Bucket,
StartKey =< Key,
EndKey >= Key ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, FilterField, StartTerm, EndTerm}}) ->
%% 2I range query...
fun(StorageKey, Acc) ->
case from_index_key(StorageKey) of
{Bucket, Key, Field, Term} when FilterBucket == Bucket,
FilterField == Field,
StartTerm =< Term,
EndTerm >= Term ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(_FoldKeysFun, Other) ->
throw({unknown_limiter, Other}).
%% @private
%% Return a function to fold over the objects on this backend
fold_objects_fun(FoldObjectsFun, FilterBucket) ->
%% 2I does not support fold objects at this time, so this is much
%% simpler than fold_keys_fun.
fun({StorageKey, Value}, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} when FilterBucket == undefined;
Bucket == FilterBucket ->
FoldObjectsFun(Bucket, Key, Value, Acc);
_ ->
throw({break, Acc})
end
end.
to_object_key(Bucket, Key) ->
sext:encode({o, Bucket, Key}).
from_object_key(LKey) ->
case sext:decode(LKey) of
{o, Bucket, Key} ->
{Bucket, Key};
_ ->
undefined
end.
from_index_key(LKey) ->
case sext:decode(LKey) of
{i, Bucket, Field, Term, Key} ->
{Bucket, Key, Field, Term};
_ ->
undefined
end.
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =
case RequestedSize of
undefined ->
RunningApps = application:which_applications(),
FinalGuess =
case proplists:is_defined(sasl, RunningApps) andalso
proplists:is_defined(os_mon, RunningApps) of
true ->
Memory = memsup:get_system_memory_data(),
TotalRAM = proplists:get_value(system_total_memory, Memory),
FreeRAM = proplists:get_value(free_memory, Memory),
UsedByBeam = proplists:get_value(total, erlang:memory()),
Target = ((TotalRAM - UsedByBeam) div 4),
FirstGuess = (Target - (Target rem (1024 * 1024))),
SecondGuess =
case FirstGuess > FreeRAM of
true -> FreeRAM - (FreeRAM rem (1024 * 1024));
_ -> FirstGuess
end,
case SecondGuess < 1073741824 of %% < 1GB?
true -> "1GB";
false ->
ThirdGuess = SecondGuess div (1024 * 1024),
integer_to_list(ThirdGuess) ++ "MB"
end;
false ->
"1GB"
end,
application:set_env(wterl, cache_size, FinalGuess),
FinalGuess;
Value when is_list(Value) ->
Value;
Value when is_number(Value) ->
integer_to_list(Value)
end,
Size.
%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
simple_test_() ->
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() ->
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).
-endif.

View file

@ -1,287 +0,0 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_backend: Riak backend behaviour
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
%%%
%%% This is a temporary copy of riak_kv_backend, just here to keep
%%% wterl development private for now. When riak_kv_wterl_backend is
%%% moved to riak_kv, delete this file.
%%%
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
-module(temp_riak_kv_backend).
-export([behaviour_info/1]).
-export([callback_after/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-export([standard_test/2]).
-endif.
-type fold_buckets_fun() :: fun((binary(), any()) -> any() | no_return()).
-type fold_keys_fun() :: fun((binary(), binary(), any()) -> any() |
no_return()).
-type fold_objects_fun() :: fun((binary(), binary(), term(), any()) ->
any() |
no_return()).
-export_type([fold_buckets_fun/0,
fold_keys_fun/0,
fold_objects_fun/0]).
-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
behaviour_info(callbacks) ->
[
{api_version,0},
{capabilities, 1}, % (State)
{capabilities, 2}, % (Bucket, State)
{start,2}, % (Partition, Config)
{stop,1}, % (State)
{get,3}, % (Bucket, Key, State)
{put,5}, % (Bucket, Key, IndexSpecs, Val, State)
{delete,4}, % (Bucket, Key, IndexSpecs, State)
{drop,1}, % (State)
{fold_buckets,4}, % (FoldBucketsFun, Acc, Opts, State),
% FoldBucketsFun(Bucket, Acc)
{fold_keys,4}, % (FoldKeysFun, Acc, Opts, State),
% FoldKeysFun(Bucket, Key, Acc)
{fold_objects,4}, % (FoldObjectsFun, Acc, Opts, State),
% FoldObjectsFun(Bucket, Key, Object, Acc)
{is_empty,1}, % (State)
{status,1}, % (State)
{callback,3}]; % (Ref, Msg, State) ->
behaviour_info(_Other) ->
undefined.
%% Queue a callback for the backend after Time ms.
-spec callback_after(integer(), reference(), term()) -> reference().
callback_after(Time, Ref, Msg) when is_integer(Time), is_reference(Ref) ->
riak_core_vnode:send_command_after(Time, {backend_callback, Ref, Msg}).
-ifdef(TEST).
standard_test(BackendMod, Config) ->
{spawn,
[
{setup,
fun() -> ?MODULE:setup({BackendMod, Config}) end,
fun ?MODULE:cleanup/1,
fun(X) ->
[?MODULE:basic_store_and_fetch(X),
?MODULE:fold_buckets(X),
?MODULE:fold_keys(X),
?MODULE:delete_object(X),
?MODULE:fold_objects(X),
?MODULE:empty_check(X)
]
end
}]}.
basic_store_and_fetch({Backend, State}) ->
{"basic store and fetch test",
fun() ->
[
?_assertMatch({ok, _},
Backend:put(<<"b1">>, <<"k1">>, [], <<"v1">>, State)),
?_assertMatch({ok, _},
Backend:put(<<"b2">>, <<"k2">>, [], <<"v2">>, State)),
?_assertMatch({ok,<<"v2">>, _},
Backend:get(<<"b2">>, <<"k2">>, State)),
?_assertMatch({error, not_found, _},
Backend:get(<<"b1">>, <<"k3">>, State))
]
end
}.
fold_buckets({Backend, State}) ->
{"bucket folding test",
fun() ->
FoldBucketsFun =
fun(Bucket, Acc) ->
[Bucket | Acc]
end,
?_assertEqual([<<"b1">>, <<"b2">>],
begin
{ok, Buckets1} =
Backend:fold_buckets(FoldBucketsFun,
[],
[],
State),
lists:sort(Buckets1)
end)
end
}.
fold_keys({Backend, State}) ->
{"key folding test",
fun() ->
FoldKeysFun =
fun(Bucket, Key, Acc) ->
[{Bucket, Key} | Acc]
end,
FoldKeysFun1 =
fun(_Bucket, Key, Acc) ->
[Key | Acc]
end,
FoldKeysFun2 =
fun(Bucket, Key, Acc) ->
case Bucket =:= <<"b1">> of
true ->
[Key | Acc];
false ->
Acc
end
end,
FoldKeysFun3 =
fun(Bucket, Key, Acc) ->
case Bucket =:= <<"b1">> of
true ->
Acc;
false ->
[Key | Acc]
end
end,
[
?_assertEqual([{<<"b1">>, <<"k1">>}, {<<"b2">>, <<"k2">>}],
begin
{ok, Keys1} =
Backend:fold_keys(FoldKeysFun,
[],
[],
State),
lists:sort(Keys1)
end),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun1,
[],
[{bucket, <<"b1">>}],
State)),
?_assertEqual([<<"k2">>],
Backend:fold_keys(FoldKeysFun1,
[],
[{bucket, <<"b2">>}],
State)),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun2, [], [], State)),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun2,
[],
[{bucket, <<"b1">>}],
State)),
?_assertEqual({ok, [<<"k2">>]},
Backend:fold_keys(FoldKeysFun3, [], [], State)),
?_assertEqual({ok, []},
Backend:fold_keys(FoldKeysFun3,
[],
[{bucket, <<"b1">>}],
State))
]
end
}.
delete_object({Backend, State}) ->
{"object deletion test",
fun() ->
[
?_assertMatch({ok, _}, Backend:delete(<<"b2">>, <<"k2">>, State)),
?_assertMatch({error, not_found, _},
Backend:get(<<"b2">>, <<"k2">>, State))
]
end
}.
fold_objects({Backend, State}) ->
{"object folding test",
fun() ->
FoldKeysFun =
fun(Bucket, Key, Acc) ->
[{Bucket, Key} | Acc]
end,
FoldObjectsFun =
fun(Bucket, Key, Value, Acc) ->
[{{Bucket, Key}, Value} | Acc]
end,
[
?_assertEqual([{<<"b1">>, <<"k1">>}],
begin
{ok, Keys} =
Backend:fold_keys(FoldKeysFun,
[],
[],
State),
lists:sort(Keys)
end),
?_assertEqual([{{<<"b1">>,<<"k1">>}, <<"v1">>}],
begin
{ok, Objects1} =
Backend:fold_objects(FoldObjectsFun,
[],
[],
State),
lists:sort(Objects1)
end),
?_assertMatch({ok, _},
Backend:put(<<"b3">>, <<"k3">>, [], <<"v3">>, State)),
?_assertEqual([{{<<"b1">>,<<"k1">>},<<"v1">>},
{{<<"b3">>,<<"k3">>},<<"v3">>}],
begin
{ok, Objects} =
Backend:fold_objects(FoldObjectsFun,
[],
[],
State),
lists:sort(Objects)
end)
]
end
}.
empty_check({Backend, State}) ->
{"is_empty test",
fun() ->
[
?_assertEqual(false, Backend:is_empty(State)),
?_assertMatch({ok, _}, Backend:delete(<<"b1">>,<<"k1">>, State)),
?_assertMatch({ok, _}, Backend:delete(<<"b3">>,<<"k3">>, State)),
?_assertEqual(true, Backend:is_empty(State))
]
end
}.
setup({BackendMod, Config}) ->
application:start(lager),
application:start(sasl),
application:start(os_mon),
{ok, S} = BackendMod:start(42, Config),
{BackendMod, S}.
cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S),
application:stop(lager),
application:stop(sasl),
application:stop(os_mon).
-endif. % TEST