From 9468870e1f820e307251467d29a9ebbe2c06a6ff Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Thu, 30 May 2013 17:10:51 -0400 Subject: [PATCH] WIP: use a log2 histogram to track how long items live in the cache, evict items which have been in cache greater than the mean time. --- c_src/wterl.c | 166 ++++++--- src/riak_kv_wterl_backend.erl | 613 ---------------------------------- src/temp_riak_kv_backend.erl | 287 ---------------- 3 files changed, 120 insertions(+), 946 deletions(-) delete mode 100644 src/riak_kv_wterl_backend.erl delete mode 100644 src/temp_riak_kv_backend.erl diff --git a/c_src/wterl.c b/c_src/wterl.c index 53ee9b6..eba372d 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -40,14 +40,14 @@ static ErlNifResourceType *wterl_cursor_RESOURCE; /* WiredTiger object names*/ typedef char Uri[128]; -typedef struct wterl_ctx { +struct wterl_ctx { WT_SESSION *session; // open session WT_CURSOR **cursors; // open cursors, all reset ready to reuse uint64_t sig; -} WterlCtxHandle; +}; struct cache_entry { - WterlCtxHandle *ctx; + struct wterl_ctx *ctx; uint64_t sig; uint64_t tstamp; }; @@ -63,6 +63,8 @@ typedef struct wterl_conn { kbtree_t(cache_entries) *cache; fifo_t(cache_entries) recycled_cache_entries; SLIST_ENTRY(wterl_conn) conns; + uint64_t histogram[64]; + uint64_t hits, misses; } WterlConnHandle; typedef struct { @@ -105,6 +107,51 @@ static ERL_NIF_TERM ATOM_MSG_PID; /* Global init for async_nif. */ ASYNC_NIF_INIT(wterl); +/** + * A string hash function. + * + * A basic hash function for strings of characters used during the + * affinity association. + * + * s a NULL terminated set of bytes to be hashed + * -> an integer hash encoding of the bytes + */ +static inline uint32_t +__str_hash(const char *s) +{ + unsigned int h = (unsigned int)*s; + if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; + return h; +} + +/** + * Calculate the log2 of 64bit unsigned integers. + */ +#ifdef __GCC__ +#define __log2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) +#else +static inline uint32_t __log2(uint64_t x) { + static const int tab64[64] = { + 63, 0, 58, 1, 59, 47, 53, 2, + 60, 39, 48, 27, 54, 33, 42, 3, + 61, 51, 37, 40, 49, 18, 28, 20, + 55, 30, 34, 11, 43, 14, 22, 4, + 62, 57, 46, 52, 38, 26, 32, 41, + 50, 36, 17, 19, 29, 10, 13, 21, + 56, 45, 25, 31, 35, 16, 9, 12, + 44, 24, 15, 8, 23, 7, 6, 5}; + if (x == 0) return 0; + uint64_t v = x; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; +} +#endif + /** * Is the context cache full? * @@ -125,11 +172,40 @@ __ctx_cache_full(WterlConnHandle *conn) * -> number of items evicted */ static int -__ctx_cache_evict(WterlConnHandle *conn) +__ctx_cache_evict(WterlConnHandle *conn_handle) { - // TODO: - UNUSED(conn); - return 0; + uint32_t i; + uint64_t mean, now = cpu_clock_ticks(); + kbtree_t(cache_entries) *t = conn_handle->cache; + + // Find the mean of the recorded times that items stayed in cache. + for (i = 0; i < 64; i++) + mean += (conn_handle->histogram[i] * i); + if (mean > 0) + mean /= conn_handle->hits; + + // Clear out the histogram and hit/misses + memset(conn_handle->histogram, 0, sizeof(uint64_t) * 64); + conn_handle->hits = 0; + conn_handle->misses = 0; + + // Evict anything older than the mean time in queue. + i = 0; +#define traverse_f(p) \ + { \ + struct cache_entry *e = *p; \ + uint64_t elapsed = e->tstamp - now; \ + if (__log2(elapsed) > mean) { \ + kb_del(cache_entries, t, e); \ + e->ctx->session->close(e->ctx->session, NULL); \ + enif_free(e->ctx); \ + fifo_q_put(cache_entries, conn_handle->recycled_cache_entries, e); \ + i++; \ + } \ + } + __kb_traverse(struct cache_entry *, t, traverse_f); +#undef traverse_f + return i; } /** @@ -141,21 +217,26 @@ __ctx_cache_evict(WterlConnHandle *conn) * sig a 64-bit signature (hash) representing the session/cursor* needed * for the operation */ -static WterlCtxHandle * -__ctx_cache_find(WterlConnHandle *conn, const uint64_t sig) +static struct wterl_ctx * +__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) { - WterlCtxHandle *p = NULL; + struct wterl_ctx *p = NULL; struct cache_entry key, *e; key.sig = sig; - e = *kb_get(cache_entries, conn->cache, &key); + e = *kb_get(cache_entries, conn_handle->cache, &key); if (e) { // cache hit, remove it from the tree - kb_del(cache_entries, conn->cache, &key); + uint64_t elapsed = cpu_clock_ticks() - e->tstamp; + kb_del(cache_entries, conn_handle->cache, &key); p = e->ctx; memset(e, 0, sizeof(struct cache_entry)); - fifo_q_put(cache_entries, conn->recycled_cache_entries, e); - } // else { cache miss, so p == NULL when we return } + fifo_q_put(cache_entries, conn_handle->recycled_cache_entries, e); + conn_handle->hits++; + conn_handle->histogram[__log2(elapsed)]++; + } else { + conn_handle->misses++; + } return p; } @@ -166,7 +247,7 @@ __ctx_cache_find(WterlConnHandle *conn, const uint64_t sig) * the front of the LRU. */ static int -__ctx_cache_add(WterlConnHandle *conn, WterlCtxHandle *c) +__ctx_cache_add(WterlConnHandle *conn, struct wterl_ctx *c) { struct cache_entry *e; @@ -229,23 +310,6 @@ __zi(uint32_t p, uint32_t q) return z; } -/** - * A string hash function. - * - * A basic hash function for strings of characters used during the - * affinity association. - * - * s a NULL terminated set of bytes to be hashed - * -> an integer hash encoding of the bytes - */ -static inline uint32_t -__str_hash(const char *s) -{ - unsigned int h = (unsigned int)*s; - if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; - return h; -} - /** * Create a signature for the operation we're about to perform. * @@ -300,7 +364,7 @@ __ctx_cache_sig(const char *c, ...) * session. */ static int -__retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx, +__retain_ctx(WterlConnHandle *conn_handle, struct wterl_ctx **ctx, const char *session_config, ...) { int i, count; @@ -314,17 +378,18 @@ __retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx, va_end (ap); enif_mutex_lock(conn_handle->cache_mutex); - *ctx = __ctx_cache_find(conn_handle, sig); - if (NULL == *ctx) { + (*ctx) = __ctx_cache_find(conn_handle, sig); + if ((*ctx) == NULL) { // cache miss WT_CONNECTION *conn = conn_handle->conn; WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, session_config, &session); if (rc != 0) return rc; - size_t s = sizeof(WterlCtxHandle) + ((count / 2) * sizeof(WT_CURSOR*)); - *ctx = enif_alloc_resource(wterl_ctx_RESOURCE, s); - if (NULL == *ctx) { + size_t s = sizeof(struct wterl_ctx) + ((count / 2) * sizeof(WT_CURSOR*)); + // TODO: *ctx = enif_alloc_resource(wterl_ctx_RESOURCE, s); + *ctx = enif_alloc(s); + if (*ctx == NULL) { session->close(session, NULL); return ENOMEM; } @@ -337,20 +402,24 @@ __retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx, for (i = 0; i < (count / 2); i++) { const char *uri = va_arg(ap, const char *); const char *config = va_arg(ap, const char *); + // TODO: error when uri or config is NULL rc = session->open_cursor(session, uri, NULL, config, &cursors[i]); if (rc != 0) { - session->close(session, NULL); // this will free cursors too + session->close(session, NULL); // this will free the cursors too return rc; } } va_end (ap); - } // else { cache hit so 'ctx' is a reusable session/cursor } + } // else { cache hit } enif_mutex_unlock(conn_handle->cache_mutex); return 0; } +/** + * Return a context to the cache for reuse. + */ static void -__release_ctx(WterlConnHandle *conn_handle, WterlCtxHandle *ctx) +__release_ctx(WterlConnHandle *conn_handle, struct wterl_ctx *ctx) { int i, c; WT_CURSOR *cursor; @@ -375,7 +444,10 @@ __close_all_sessions(WterlConnHandle *conn_handle) { kbtree_t(cache_entries) *t = conn_handle->cache; -#define traverse_f(p) { kb_del(cache_entries, t, *p); } +#define traverse_f(p) { \ + kb_del(cache_entries, t, *p); \ + enif_free(p); \ + } __kb_traverse(struct cache_entry *, t, traverse_f); #undef traverse_f } @@ -1300,7 +1372,7 @@ ASYNC_NIF_DECL( return; } - WterlCtxHandle *ctx = NULL; + struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; int rc = __retain_ctx(args->conn_handle, &ctx, args->conn_handle->session_config, @@ -1359,7 +1431,7 @@ ASYNC_NIF_DECL( return; } - WterlCtxHandle *ctx = NULL; + struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; int rc = __retain_ctx(args->conn_handle, &ctx, args->conn_handle->session_config, @@ -1441,7 +1513,7 @@ ASYNC_NIF_DECL( return; } - WterlCtxHandle *ctx = NULL; + struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; int rc = __retain_ctx(args->conn_handle, &ctx, args->conn_handle->session_config, @@ -2180,6 +2252,8 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) NULL, flags, NULL); wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", NULL, flags, NULL); + wterl_ctx_RESOURCE = enif_open_resource_type(env, NULL, "wterl_ctx_resource", + NULL, flags, NULL); ATOM_ERROR = enif_make_atom(env, "error"); ATOM_OK = enif_make_atom(env, "ok"); @@ -2265,7 +2339,7 @@ on_unload(ErlNifEnv *env, void *priv_data) SLIST_FOREACH(conn_handle, &priv->conns, conns) { fifo_q_foreach(cache_entries, conn_handle->recycled_cache_entries, e, { - WterlCtxHandle *ctx = e->ctx; + struct wterl_ctx *ctx = e->ctx; ctx->session->close(ctx->session, NULL); }); fifo_q_free(cache_entries, conn_handle->recycled_cache_entries); diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl deleted file mode 100644 index 313da29..0000000 --- a/src/riak_kv_wterl_backend.erl +++ /dev/null @@ -1,613 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% riak_kv_wterl_backend: WiredTiger Driver for Riak -%% -%% Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - --module(riak_kv_wterl_backend). --behavior(temp_riak_kv_backend). - -%% KV Backend API --export([api_version/0, - capabilities/1, - capabilities/2, - start/2, - stop/1, - get/3, - put/5, - delete/4, - drop/1, - fold_buckets/4, - fold_keys/4, - fold_objects/4, - is_empty/1, - status/1, - callback/3]). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --compiel(export_all). --endif. - --define(API_VERSION, 1). -%% TODO: for when this backend supports 2i -%%-define(CAPABILITIES, [async_fold, indexes]). --define(CAPABILITIES, [async_fold]). - --record(state, {table :: string(), - type :: string(), - connection :: wterl:connection(), - is_empty_cursor :: wterl:cursor(), - status_cursor :: wterl:cursor()}). - --type state() :: #state{}. --type config() :: [{atom(), term()}]. - -%% =================================================================== -%% Public API -%% =================================================================== - -%% @doc Return the major version of the -%% current API. --spec api_version() -> {ok, integer()}. -api_version() -> - {ok, ?API_VERSION}. - -%% @doc Return the capabilities of the backend. --spec capabilities(state()) -> {ok, [atom()]}. -capabilities(_) -> - {ok, ?CAPABILITIES}. - -%% @doc Return the capabilities of the backend. --spec capabilities(riak_object:bucket(), state()) -> {ok, [atom()]}. -capabilities(_, _) -> - {ok, ?CAPABILITIES}. - -%% @doc Start the wterl backend --spec start(integer(), config()) -> {ok, state()} | {error, term()}. -start(Partition, Config) -> - AppStart = - case application:start(wterl) of - ok -> - ok; - {error, {already_started, _}} -> - ok; - {error, Reason1} -> - lager:error("Failed to start wterl: ~p", [Reason1]), - {error, Reason1} - end, - case AppStart of - ok -> - Type = - case wterl:config_value(type, Config, "lsm") of - {type, "lsm"} -> "lsm"; - {type, "table"} -> "table"; - {type, "btree"} -> "table"; - {type, BadType} -> - lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]), - "lsm"; - _ -> - lager:info("wterl:start ignoring mistaken setting defaulting to lsm"), - "lsm" - end, - {ok, Connection} = establish_connection(Config, Type), - Table = Type ++ ":" ++ integer_to_list(Partition), - Compressor = - case wterl:config_value(block_compressor, Config, "snappy") of - {block_compressor, "snappy"}=C -> [C]; - {block_compressor, "none"} -> []; - {block_compressor, none} -> []; - {block_compressor, _} -> [{block_compressor, "snappy"}]; - _ -> [{block_compressor, "snappy"}] - end, - TableOpts = - case Type of - "lsm" -> - [{internal_page_max, "128K"}, - {leaf_page_max, "128K"}, - {lsm_chunk_size, "100MB"}, - {lsm_merge_threads, 2}, - {prefix_compression, false}, - {lsm_bloom_newest, true}, - {lsm_bloom_oldest, true} , - {lsm_bloom_bit_count, 128}, - {lsm_bloom_hash_count, 64}, - {lsm_bloom_config, [{leaf_page_max, "8MB"}]} - ] ++ Compressor; - "table" -> - Compressor - end, - case wterl:create(Connection, Table, TableOpts) of - ok -> - case establish_utility_cursors(Connection, Table) of - {ok, IsEmptyCursor, StatusCursor} -> - {ok, #state{table=Table, type=Type, - connection=Connection, - is_empty_cursor=IsEmptyCursor, - status_cursor=StatusCursor}}; - {error, Reason2} -> - {error, Reason2} - end; - {error, Reason3} -> - {error, Reason3} - end - end. - -%% @doc Stop the wterl backend --spec stop(state()) -> ok. -stop(_State) -> - ok. %% The connection is closed by wterl_conn:stop() - -%% @doc Retrieve an object from the wterl backend --spec get(riak_object:bucket(), riak_object:key(), state()) -> - {ok, any(), state()} | - {ok, not_found, state()} | - {error, term(), state()}. -get(Bucket, Key, #state{connection=Connection, table=Table}=State) -> - WTKey = to_object_key(Bucket, Key), - case wterl:get(Connection, Table, WTKey) of - {ok, Value} -> - {ok, Value, State}; - not_found -> - {error, not_found, State}; - {error, Reason} -> - {error, Reason, State} - end. - -%% @doc Insert an object into the wterl backend. -%% NOTE: The wterl backend does not currently support -%% secondary indexing and the_IndexSpecs parameter -%% is ignored. --type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}. --spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> - {ok, state()} | - {error, term(), state()}. -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) -> - case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason, State} - end. - -%% @doc Delete an object from the wterl backend -%% NOTE: The wterl backend does not currently support -%% secondary indexing and the_IndexSpecs parameter -%% is ignored. --spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> - {ok, state()} | - {error, term(), state()}. -delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) -> - case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason, State} - end. - -%% @doc Fold over all the buckets --spec fold_buckets(riak_kv_backend:fold_buckets_fun(), - any(), - [], - state()) -> {ok, any()} | {async, fun()}. -fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) -> - FoldFun = fold_buckets_fun(FoldBucketsFun), - BucketFolder = - fun() -> - case wterl:cursor_open(Connection, Table) of - {error, {enoent, _Message}} -> - Acc; - {ok, Cursor} -> - try - {FoldResult, _} = - wterl:fold_keys(Cursor, FoldFun, {Acc, []}), - FoldResult - catch - {break, AccFinal} -> - AccFinal - after - ok = wterl:cursor_close(Cursor) - end - end - end, - case lists:member(async_fold, Opts) of - true -> - {async, BucketFolder}; - false -> - {ok, BucketFolder()} - end. - -%% @doc Fold over all the keys for one or all buckets. --spec fold_keys(riak_kv_backend:fold_keys_fun(), - any(), - [{atom(), term()}], - state()) -> {ok, term()} | {async, fun()}. -fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) -> - %% Figure out how we should limit the fold: by bucket, by - %% secondary index, or neither (fold across everything.) - Bucket = lists:keyfind(bucket, 1, Opts), - Index = lists:keyfind(index, 1, Opts), - - %% Multiple limiters may exist. Take the most specific limiter. - Limiter = - if Index /= false -> Index; - Bucket /= false -> Bucket; - true -> undefined - end, - - %% Set up the fold... - FoldFun = fold_keys_fun(FoldKeysFun, Limiter), - KeyFolder = - fun() -> - case wterl:cursor_open(Connection, Table) of - {error, {enoent, _Message}} -> - Acc; - {ok, Cursor} -> - try - wterl:fold_keys(Cursor, FoldFun, Acc) - catch - {break, AccFinal} -> - AccFinal - after - ok = wterl:cursor_close(Cursor) - end - end - end, - case lists:member(async_fold, Opts) of - true -> - {async, KeyFolder}; - false -> - {ok, KeyFolder()} - end. - -%% @doc Fold over all the objects for one or all buckets. --spec fold_objects(riak_kv_backend:fold_objects_fun(), - any(), - [{atom(), term()}], - state()) -> {ok, any()} | {async, fun()}. -fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) -> - Bucket = proplists:get_value(bucket, Opts), - FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), - ObjectFolder = - fun() -> - case wterl:cursor_open(Connection, Table) of - {error, {enoent, _Message}} -> - Acc; - {ok, Cursor} -> - try - wterl:fold(Cursor, FoldFun, Acc) - catch - {break, AccFinal} -> - AccFinal - after - case wterl:cursor_close(Cursor) of - ok -> - ok; - {error, {eperm, _}} -> %% TODO: review/fix - ok; - {error, _}=E -> - E - end - end - end - end, - case lists:member(async_fold, Opts) of - true -> - {async, ObjectFolder}; - false -> - {ok, ObjectFolder()} - end. - -%% @doc Delete all objects from this wterl backend --spec drop(state()) -> {ok, state()} | {error, term(), state()}. -drop(#state{connection=Connection, table=Table}=State) -> - case wterl:drop(Connection, Table) of - ok -> - {ok, State}; - {error, {ebusy, _}} -> %% TODO: review/fix - {ok, State}; - Error -> - {error, Error, State} - end. - -%% @doc Returns true if this wterl backend contains any -%% non-tombstone values; otherwise returns false. --spec is_empty(state()) -> boolean(). -is_empty(#state{is_empty_cursor=Cursor}) -> - wterl:cursor_reset(Cursor), - case wterl:cursor_next(Cursor) of - not_found -> true; - {error, {eperm, _}} -> false; % TODO: review/fix this logic - _ -> false - end. - -%% @doc Get the status information for this wterl backend --spec status(state()) -> [{atom(), term()}]. -status(#state{status_cursor=Cursor}) -> - wterl:cursor_reset(Cursor), - case fetch_status(Cursor) of - {ok, Stats} -> - Stats; - {error, {eperm, _}} -> % TODO: review/fix this logic - {ok, []}; - _ -> - {ok, []} - end. - -%% @doc Register an asynchronous callback --spec callback(reference(), any(), state()) -> {ok, state()}. -callback(_Ref, _Msg, State) -> - {ok, State}. - - -%% =================================================================== -%% Internal functions -%% =================================================================== - -%% @private -max_sessions(Config) -> - RingSize = - case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of - undefined -> 1024; - Size -> Size - end, - Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic - case Est > 1000000000 of % Note: WiredTiger uses a signed int for this - true -> 1000000000; - false -> Est - end. - -%% @private -establish_utility_cursors(Connection, Table) -> - case wterl:cursor_open(Connection, Table) of - {ok, IsEmptyCursor} -> - case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of - {ok, StatusCursor} -> - {ok, IsEmptyCursor, StatusCursor}; - {error, Reason1} -> - {error, Reason1} - end; - {error, Reason2} -> - {error, Reason2} - end. - -%% @private -establish_connection(Config, Type) -> - %% Get the data root directory - case app_helper:get_prop_or_env(data_root, Config, wterl) of - undefined -> - lager:error("Failed to create wterl dir: data_root is not set"), - {error, data_root_unset}; - DataRoot -> - ok = filelib:ensure_dir(filename:join(DataRoot, "x")), - - %% WT Connection Options: - %% NOTE: LSM auto-checkpoints, so we don't have too. - CheckpointSetting = - case Type =:= "lsm" of - true -> - []; - false -> - app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}]) - end, - RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl), - ConnectionOpts = - orddict:from_list( - [ wterl:config_value(create, Config, true), - wterl:config_value(sync, Config, false), - wterl:config_value(logging, Config, true), - wterl:config_value(transactional, Config, true), - wterl:config_value(session_max, Config, max_sessions(Config)), - wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), - wterl:config_value(statistics_log, Config, [{wait, 300}]), % sec - wterl:config_value(verbose, Config, [ "salvage", "verify" - % Note: for some unknown reason, if you add these additional - % verbose flags Erlang SEGV's "size_object: bad tag for 0x80" - % no idea why... yet... you've been warned. - - %"block", "shared_cache", "reconcile", "evict", "lsm", - %"fileops", "read", "write", "readserver", "evictserver", - %"hazard", "mutex", "ckpt" - ]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec - - %% WT Session Options: - SessionOpts = [{isolation, "snapshot"}], - - case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of - {ok, Connection} -> - {ok, Connection}; - {error, Reason2} -> - lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), - {error, Reason2} - end - end. - -%% @private -%% Return a function to fold over the buckets on this backend -fold_buckets_fun(FoldBucketsFun) -> - fun(BK, {Acc, LastBucket}) -> - case from_object_key(BK) of - {LastBucket, _} -> - {Acc, LastBucket}; - {Bucket, _} -> - {FoldBucketsFun(Bucket, Acc), Bucket}; - _ -> - throw({break, Acc}) - end - end. - -%% @private -%% Return a function to fold over keys on this backend -fold_keys_fun(FoldKeysFun, undefined) -> - %% Fold across everything... - fun(StorageKey, Acc) -> - case from_object_key(StorageKey) of - {Bucket, Key} -> - FoldKeysFun(Bucket, Key, Acc); - _ -> - throw({break, Acc}) - end - end; -fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) -> - %% Fold across a specific bucket... - fun(StorageKey, Acc) -> - case from_object_key(StorageKey) of - {Bucket, Key} when Bucket == FilterBucket -> - FoldKeysFun(Bucket, Key, Acc); - _ -> - throw({break, Acc}) - end - end; -fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, <<"$bucket">>, _}}) -> - %% 2I exact match query on special $bucket field... - fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}); -fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, FilterField, FilterTerm}}) -> - %% Rewrite 2I exact match query as a range... - NewQuery = {range, FilterField, FilterTerm, FilterTerm}, - fold_keys_fun(FoldKeysFun, {index, FilterBucket, NewQuery}); -fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, <<"$key">>, StartKey, EndKey}}) -> - %% 2I range query on special $key field... - fun(StorageKey, Acc) -> - case from_object_key(StorageKey) of - {Bucket, Key} when FilterBucket == Bucket, - StartKey =< Key, - EndKey >= Key -> - FoldKeysFun(Bucket, Key, Acc); - _ -> - throw({break, Acc}) - end - end; -fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, FilterField, StartTerm, EndTerm}}) -> - %% 2I range query... - fun(StorageKey, Acc) -> - case from_index_key(StorageKey) of - {Bucket, Key, Field, Term} when FilterBucket == Bucket, - FilterField == Field, - StartTerm =< Term, - EndTerm >= Term -> - FoldKeysFun(Bucket, Key, Acc); - _ -> - throw({break, Acc}) - end - end; -fold_keys_fun(_FoldKeysFun, Other) -> - throw({unknown_limiter, Other}). - -%% @private -%% Return a function to fold over the objects on this backend -fold_objects_fun(FoldObjectsFun, FilterBucket) -> - %% 2I does not support fold objects at this time, so this is much - %% simpler than fold_keys_fun. - fun({StorageKey, Value}, Acc) -> - case from_object_key(StorageKey) of - {Bucket, Key} when FilterBucket == undefined; - Bucket == FilterBucket -> - FoldObjectsFun(Bucket, Key, Value, Acc); - _ -> - throw({break, Acc}) - end - end. - -to_object_key(Bucket, Key) -> - sext:encode({o, Bucket, Key}). - -from_object_key(LKey) -> - case sext:decode(LKey) of - {o, Bucket, Key} -> - {Bucket, Key}; - _ -> - undefined - end. - -from_index_key(LKey) -> - case sext:decode(LKey) of - {i, Bucket, Field, Term, Key} -> - {Bucket, Key, Field, Term}; - _ -> - undefined - end. - -%% @private -%% Return all status from wterl statistics cursor -fetch_status(Cursor) -> - {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}. -fetch_status(_Cursor, {error, _}, Acc) -> - lists:reverse(Acc); -fetch_status(_Cursor, not_found, Acc) -> - lists:reverse(Acc); -fetch_status(Cursor, {ok, Stat}, Acc) -> - [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])], - fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]). - -size_cache(RequestedSize) -> - Size = - case RequestedSize of - undefined -> - RunningApps = application:which_applications(), - FinalGuess = - case proplists:is_defined(sasl, RunningApps) andalso - proplists:is_defined(os_mon, RunningApps) of - true -> - Memory = memsup:get_system_memory_data(), - TotalRAM = proplists:get_value(system_total_memory, Memory), - FreeRAM = proplists:get_value(free_memory, Memory), - UsedByBeam = proplists:get_value(total, erlang:memory()), - Target = ((TotalRAM - UsedByBeam) div 4), - FirstGuess = (Target - (Target rem (1024 * 1024))), - SecondGuess = - case FirstGuess > FreeRAM of - true -> FreeRAM - (FreeRAM rem (1024 * 1024)); - _ -> FirstGuess - end, - case SecondGuess < 1073741824 of %% < 1GB? - true -> "1GB"; - false -> - ThirdGuess = SecondGuess div (1024 * 1024), - integer_to_list(ThirdGuess) ++ "MB" - end; - false -> - "1GB" - end, - application:set_env(wterl, cache_size, FinalGuess), - FinalGuess; - Value when is_list(Value) -> - Value; - Value when is_number(Value) -> - integer_to_list(Value) - end, - Size. - -%% =================================================================== -%% EUnit tests -%% =================================================================== --ifdef(TEST). - -simple_test_() -> - {ok, CWD} = file:get_cwd(), - rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"), - application:set_env(wterl, data_root, "test/wterl-backend"), - temp_riak_kv_backend:standard_test(?MODULE, []). - -custom_config_test_() -> - {ok, CWD} = file:get_cwd(), - rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"), - application:set_env(wterl, data_root, ""), - temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]). - --endif. diff --git a/src/temp_riak_kv_backend.erl b/src/temp_riak_kv_backend.erl deleted file mode 100644 index c41a38d..0000000 --- a/src/temp_riak_kv_backend.erl +++ /dev/null @@ -1,287 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% riak_kv_backend: Riak backend behaviour -%% -%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - -%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE -%%% -%%% This is a temporary copy of riak_kv_backend, just here to keep -%%% wterl development private for now. When riak_kv_wterl_backend is -%%% moved to riak_kv, delete this file. -%%% -%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE - - --module(temp_riak_kv_backend). - --export([behaviour_info/1]). --export([callback_after/3]). - --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). --compile(export_all). --export([standard_test/2]). --endif. - --type fold_buckets_fun() :: fun((binary(), any()) -> any() | no_return()). --type fold_keys_fun() :: fun((binary(), binary(), any()) -> any() | - no_return()). --type fold_objects_fun() :: fun((binary(), binary(), term(), any()) -> - any() | - no_return()). --export_type([fold_buckets_fun/0, - fold_keys_fun/0, - fold_objects_fun/0]). - --spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}]. -behaviour_info(callbacks) -> - [ - {api_version,0}, - {capabilities, 1}, % (State) - {capabilities, 2}, % (Bucket, State) - {start,2}, % (Partition, Config) - {stop,1}, % (State) - {get,3}, % (Bucket, Key, State) - {put,5}, % (Bucket, Key, IndexSpecs, Val, State) - {delete,4}, % (Bucket, Key, IndexSpecs, State) - {drop,1}, % (State) - {fold_buckets,4}, % (FoldBucketsFun, Acc, Opts, State), - % FoldBucketsFun(Bucket, Acc) - {fold_keys,4}, % (FoldKeysFun, Acc, Opts, State), - % FoldKeysFun(Bucket, Key, Acc) - {fold_objects,4}, % (FoldObjectsFun, Acc, Opts, State), - % FoldObjectsFun(Bucket, Key, Object, Acc) - {is_empty,1}, % (State) - {status,1}, % (State) - {callback,3}]; % (Ref, Msg, State) -> -behaviour_info(_Other) -> - undefined. - -%% Queue a callback for the backend after Time ms. --spec callback_after(integer(), reference(), term()) -> reference(). -callback_after(Time, Ref, Msg) when is_integer(Time), is_reference(Ref) -> - riak_core_vnode:send_command_after(Time, {backend_callback, Ref, Msg}). - --ifdef(TEST). - -standard_test(BackendMod, Config) -> - {spawn, - [ - {setup, - fun() -> ?MODULE:setup({BackendMod, Config}) end, - fun ?MODULE:cleanup/1, - fun(X) -> - [?MODULE:basic_store_and_fetch(X), - ?MODULE:fold_buckets(X), - ?MODULE:fold_keys(X), - ?MODULE:delete_object(X), - ?MODULE:fold_objects(X), - ?MODULE:empty_check(X) - ] - end - }]}. - -basic_store_and_fetch({Backend, State}) -> - {"basic store and fetch test", - fun() -> - [ - ?_assertMatch({ok, _}, - Backend:put(<<"b1">>, <<"k1">>, [], <<"v1">>, State)), - ?_assertMatch({ok, _}, - Backend:put(<<"b2">>, <<"k2">>, [], <<"v2">>, State)), - ?_assertMatch({ok,<<"v2">>, _}, - Backend:get(<<"b2">>, <<"k2">>, State)), - ?_assertMatch({error, not_found, _}, - Backend:get(<<"b1">>, <<"k3">>, State)) - ] - end - }. - -fold_buckets({Backend, State}) -> - {"bucket folding test", - fun() -> - FoldBucketsFun = - fun(Bucket, Acc) -> - [Bucket | Acc] - end, - - ?_assertEqual([<<"b1">>, <<"b2">>], - begin - {ok, Buckets1} = - Backend:fold_buckets(FoldBucketsFun, - [], - [], - State), - lists:sort(Buckets1) - end) - end - }. - -fold_keys({Backend, State}) -> - {"key folding test", - fun() -> - FoldKeysFun = - fun(Bucket, Key, Acc) -> - [{Bucket, Key} | Acc] - end, - FoldKeysFun1 = - fun(_Bucket, Key, Acc) -> - [Key | Acc] - end, - FoldKeysFun2 = - fun(Bucket, Key, Acc) -> - case Bucket =:= <<"b1">> of - true -> - [Key | Acc]; - false -> - Acc - end - end, - FoldKeysFun3 = - fun(Bucket, Key, Acc) -> - case Bucket =:= <<"b1">> of - true -> - Acc; - false -> - [Key | Acc] - end - end, - [ - ?_assertEqual([{<<"b1">>, <<"k1">>}, {<<"b2">>, <<"k2">>}], - begin - {ok, Keys1} = - Backend:fold_keys(FoldKeysFun, - [], - [], - State), - lists:sort(Keys1) - end), - ?_assertEqual({ok, [<<"k1">>]}, - Backend:fold_keys(FoldKeysFun1, - [], - [{bucket, <<"b1">>}], - State)), - ?_assertEqual([<<"k2">>], - Backend:fold_keys(FoldKeysFun1, - [], - [{bucket, <<"b2">>}], - State)), - ?_assertEqual({ok, [<<"k1">>]}, - Backend:fold_keys(FoldKeysFun2, [], [], State)), - ?_assertEqual({ok, [<<"k1">>]}, - Backend:fold_keys(FoldKeysFun2, - [], - [{bucket, <<"b1">>}], - State)), - ?_assertEqual({ok, [<<"k2">>]}, - Backend:fold_keys(FoldKeysFun3, [], [], State)), - ?_assertEqual({ok, []}, - Backend:fold_keys(FoldKeysFun3, - [], - [{bucket, <<"b1">>}], - State)) - ] - end - }. - -delete_object({Backend, State}) -> - {"object deletion test", - fun() -> - [ - ?_assertMatch({ok, _}, Backend:delete(<<"b2">>, <<"k2">>, State)), - ?_assertMatch({error, not_found, _}, - Backend:get(<<"b2">>, <<"k2">>, State)) - ] - end - }. - -fold_objects({Backend, State}) -> - {"object folding test", - fun() -> - FoldKeysFun = - fun(Bucket, Key, Acc) -> - [{Bucket, Key} | Acc] - end, - FoldObjectsFun = - fun(Bucket, Key, Value, Acc) -> - [{{Bucket, Key}, Value} | Acc] - end, - [ - ?_assertEqual([{<<"b1">>, <<"k1">>}], - begin - {ok, Keys} = - Backend:fold_keys(FoldKeysFun, - [], - [], - State), - lists:sort(Keys) - end), - - ?_assertEqual([{{<<"b1">>,<<"k1">>}, <<"v1">>}], - begin - {ok, Objects1} = - Backend:fold_objects(FoldObjectsFun, - [], - [], - State), - lists:sort(Objects1) - end), - ?_assertMatch({ok, _}, - Backend:put(<<"b3">>, <<"k3">>, [], <<"v3">>, State)), - ?_assertEqual([{{<<"b1">>,<<"k1">>},<<"v1">>}, - {{<<"b3">>,<<"k3">>},<<"v3">>}], - begin - {ok, Objects} = - Backend:fold_objects(FoldObjectsFun, - [], - [], - State), - lists:sort(Objects) - end) - ] - end - }. - -empty_check({Backend, State}) -> - {"is_empty test", - fun() -> - [ - ?_assertEqual(false, Backend:is_empty(State)), - ?_assertMatch({ok, _}, Backend:delete(<<"b1">>,<<"k1">>, State)), - ?_assertMatch({ok, _}, Backend:delete(<<"b3">>,<<"k3">>, State)), - ?_assertEqual(true, Backend:is_empty(State)) - ] - end - }. - -setup({BackendMod, Config}) -> - application:start(lager), - application:start(sasl), - application:start(os_mon), - {ok, S} = BackendMod:start(42, Config), - {BackendMod, S}. - -cleanup({BackendMod, S}) -> - ok = BackendMod:stop(S), - application:stop(lager), - application:stop(sasl), - application:stop(os_mon). - --endif. % TEST