diff --git a/.gdbinit b/.gdbinit index f4ddf19..1f3ca93 100644 --- a/.gdbinit +++ b/.gdbinit @@ -1 +1,4 @@ handle SIGPIPE nostop noprint pass +#b erl_nif.c:1203 +#b sys/unix/erl_unix_sys_ddll.c:234 + diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 36526a9..da13cd4 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -27,6 +27,7 @@ extern "C" { #endif #include +#include "fifo_q.h" #ifdef ASYNC_NIF_STATS #include "stats.h" // TODO: measure, measure... measure again #endif @@ -34,73 +35,6 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500 -#define FIFO_QUEUE_TYPE(name) \ - struct fifo_q__ ## name * -#define DECL_FIFO_QUEUE(name, type) \ - struct fifo_q__ ## name { \ - unsigned int h, t, s; \ - type *items[]; \ - }; \ - static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ - int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\ - struct fifo_q__ ## name *q = enif_alloc(sz); \ - if (!q) \ - return 0; \ - memset(q, 0, sz); \ - q->s = n + 1; \ - return q; \ - } \ - static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ - enif_free(q); \ - } \ - static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ - q->items[q->h] = n; \ - q->h = (q->h + 1) % q->s; \ - return n; \ - } \ - static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ - type *n = q->items[q->t]; \ - q->items[q->t] = 0; \ - q->t = (q->t + 1) % q->s; \ - return n; \ - } \ - static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ - return (q->h - q->t + q->s) % q->s; \ - } \ - static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ - return q->s - 1; \ - } \ - static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ - return (q->t == q->h); \ - } \ - static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \ - return ((q->h + 1) % q->s) == q->t; \ - } - -#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) -#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) -#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) -#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) -#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) -#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) -#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) -#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue) -#define fifo_q_foreach(name, queue, item, task) do { \ - while((item = fifo_q_ ## name ## _get(queue)) != NULL) { \ - do task while(0); \ - } \ - } while(0); - -struct async_nif_req_entry { - ERL_NIF_TERM ref; - ErlNifEnv *env; - ErlNifPid pid; - void *args; - void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); - void (*fn_post)(void *); -}; - DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); struct async_nif_work_queue { @@ -141,7 +75,8 @@ struct async_nif_state { /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ argc -= 1; \ - struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \ + /* Note: !!! this assumes that the first element of priv_data is ours */ \ + struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ @@ -198,11 +133,11 @@ struct async_nif_state { priv = async_nif_load(); \ enif_mutex_unlock(name##_async_nif_coord); \ } while(0); -#define ASYNC_NIF_UNLOAD(name, env) do { \ +#define ASYNC_NIF_UNLOAD(name, env, priv) do { \ if (!name##_async_nif_coord) \ name##_async_nif_coord = enif_mutex_create(NULL); \ enif_mutex_lock(name##_async_nif_coord); \ - async_nif_unload(env); \ + async_nif_unload(env, priv); \ enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_destroy(name##_async_nif_coord); \ name##_async_nif_coord = NULL; \ @@ -326,10 +261,9 @@ async_nif_worker_fn(void *arg) } static void -async_nif_unload(ErlNifEnv *env) +async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) { unsigned int i; - struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; @@ -359,7 +293,6 @@ async_nif_unload(ErlNifEnv *env) /* Cleanup requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; - enif_mutex_lock(q->reqs_mutex); // TODO: unnecessary? /* Worker threads are stopped, now toss anything left in the queue. */ struct async_nif_req_entry *req = NULL; @@ -372,8 +305,6 @@ async_nif_unload(ErlNifEnv *env) enif_free(req); }); fifo_q_free(reqs, q->reqs); - - enif_mutex_unlock(q->reqs_mutex); // TODO: unnecessary? enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); } @@ -382,7 +313,7 @@ async_nif_unload(ErlNifEnv *env) } static void * -async_nif_load(void) +async_nif_load() { static int has_init = 0; unsigned int i, j, num_queues; diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 19658eb..73bc487 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -158,17 +158,17 @@ case "$1" in # Build Snappy [ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1) - test -f system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy; + test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy; # Build BZIP2 [ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1) - test -f system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2; + test -f $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2; # Build WiredTiger [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) - test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \ - -a -f system/lib/libwiredtiger_snappy.so \ - -a -f system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt; + test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \ + -a -f $BASEDIR/system/lib/libwiredtiger_snappy.so \ + -a -f $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt; [ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv diff --git a/c_src/fifo_q.h b/c_src/fifo_q.h new file mode 100644 index 0000000..b9291f2 --- /dev/null +++ b/c_src/fifo_q.h @@ -0,0 +1,102 @@ +/* + * fifo_q: a macro-based implementation of a FIFO Queue + * + * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. + * Author: Gregory Burd + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef __FIFO_Q_H__ +#define __FIFO_Q_H__ + +#if defined(__cplusplus) +extern "C" { +#endif + +#define FIFO_QUEUE_TYPE(name) \ + struct fifo_q__ ## name * +#define DECL_FIFO_QUEUE(name, type) \ + struct fifo_q__ ## name { \ + unsigned int h, t, s; \ + type *items[]; \ + }; \ + static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ + int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\ + struct fifo_q__ ## name *q = enif_alloc(sz); \ + if (!q) \ + return 0; \ + memset(q, 0, sz); \ + q->s = n + 1; \ + return q; \ + } \ + static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ + memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ + enif_free(q); \ + } \ + static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ + q->items[q->h] = n; \ + q->h = (q->h + 1) % q->s; \ + return n; \ + } \ + static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ + type *n = q->items[q->t]; \ + q->items[q->t] = 0; \ + q->t = (q->t + 1) % q->s; \ + return n; \ + } \ + static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ + return (q->h - q->t + q->s) % q->s; \ + } \ + static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ + return q->s - 1; \ + } \ + static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ + return (q->t == q->h); \ + } \ + static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \ + return ((q->h + 1) % q->s) == q->t; \ + } + +#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) +#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) +#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) +#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) +#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) +#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) +#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) +#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue) +#define fifo_q_foreach(name, queue, item, task) do { \ + while(!fifo_q_ ## name ## _empty(queue)) { \ + item = fifo_q_ ## name ## _get(queue); \ + do task while(0); \ + } \ + } while(0); + +struct async_nif_req_entry { + ERL_NIF_TERM ref; + ErlNifEnv *env; + ErlNifPid pid; + void *args; + void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); + void (*fn_post)(void *); +}; + + +#if defined(__cplusplus) +} +#endif + +#endif // __FIFO_Q_H__ diff --git a/c_src/khash.h b/c_src/khash.h index 16bba5c..ab157b1 100644 --- a/c_src/khash.h +++ b/c_src/khash.h @@ -371,6 +371,26 @@ static const double __ac_HASH_UPPER = 0.77; @abstract 64-bit integer comparison function */ #define kh_int64_hash_equal(a, b) ((a) == (b)) +/*! @function + @abstract Pointer hash function + @param key The integer void * + @return The hash value [khint_t] +*/ +#define kh_ptr_hash_func(key) (khint32_t)(key) +/*! @function + @abstract Pointer comparison function +*/ +#define kh_ptr_hash_equal(a, b) ((a) == (b)) +/*! @function + @abstract 64-bit pointer hash function + @param key The integer void * + @return The hash value [khint_t] +*/ +#define kh_ptr64_hash_func(key) (khint32_t)(((khint64_t)key)>>33^((khint64_t)key)^((khint64_t)key)<<11) +/*! @function + @abstract 64-bit pointer comparison function +*/ +#define kh_ptr64_hash_equal(a, b) ((a) == (b)) /*! @function @abstract const char* hash function @param s Pointer to a null terminated string @@ -561,6 +581,19 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) /* More conenient interfaces */ +/*! @function + @abstract Instantiate a hash map containing (void *) keys + @param name Name of the hash table [symbol] + @param khval_t Type of values [type] +*/ +#ifdef __x86_64__ +#define KHASH_MAP_INIT_PTR(name, khval_t) \ + KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal) +#else +#define KHASH_MAP_INIT_PTR(name, khval_t) \ + KHASH_INIT(name, void*, khval_t, 1, kh_ptr_hash_func, kh_ptr_hash_equal) +#endif + /*! @function @abstract Instantiate a hash set containing integer keys @param name Name of the hash table [symbol] diff --git a/c_src/wterl.c b/c_src/wterl.c index 992a9e0..53954e1 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -46,6 +46,7 @@ static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; +/* Generators for 'cursors' a named, type-specific hash table functions. */ KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); /** @@ -90,8 +91,19 @@ static ERL_NIF_TERM ATOM_NOT_FOUND; static ERL_NIF_TERM ATOM_FIRST; static ERL_NIF_TERM ATOM_LAST; +/* Generators for 'conns' a named, type-specific hash table functions. */ +KHASH_MAP_INIT_PTR(conns, WterlConnHandle*); + +struct wterl_priv_data { + void *async_nif_priv; // Note: must be first element in struct + ErlNifMutex *conns_mutex; + khash_t(conns) *conns; +}; + +/* Global init for async_nif. */ ASYNC_NIF_INIT(wterl); + /** * Get the per-worker reusable WT_SESSION for a worker_id. */ @@ -135,10 +147,9 @@ __close_all_sessions(WterlConnHandle *conn_handle) for (itr = kh_begin(h); itr != kh_end(h); ++itr) { if (kh_exist(h, itr)) { WT_CURSOR *cursor = kh_val(h, itr); - char *key = (char *)kh_key(h, itr); cursor->close(cursor); kh_del(cursors, h, itr); - enif_free(key); + kh_value(h, itr) = NULL; } } kh_destroy(cursors, h); @@ -165,10 +176,9 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) khiter_t itr = kh_get(cursors, h, (char *)uri); if (itr != kh_end(h)) { WT_CURSOR *cursor = kh_value(h, itr); - char *key = (char *)kh_key(h, itr); cursor->close(cursor); kh_del(cursors, h, itr); - enif_free(key); + kh_value(h, itr) = NULL; } } } @@ -251,6 +261,7 @@ ASYNC_NIF_DECL( ERL_NIF_TERM config; ERL_NIF_TERM session_config; char homedir[4096]; + struct wterl_priv_data *priv; }, { // pre @@ -262,6 +273,8 @@ ASYNC_NIF_DECL( } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->session_config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + + args->priv = (struct wterl_priv_data *)enif_priv_data(env); }, { // work @@ -297,13 +310,25 @@ ASYNC_NIF_DECL( } else { conn_handle->session_config = NULL; } + conn_handle->contexts_mutex = enif_mutex_create(NULL); + enif_mutex_lock(conn_handle->contexts_mutex); conn_handle->conn = conn; conn_handle->num_contexts = 0; memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); - conn_handle->contexts_mutex = enif_mutex_create(NULL); ERL_NIF_TERM result = enif_make_resource(env, conn_handle); + + khash_t(conns) *h; + enif_mutex_lock(args->priv->conns_mutex); + h = args->priv->conns; + int itr_status = 0; + khiter_t itr = kh_put(conns, h, conn, &itr_status); + if (itr_status != 0) // 0 indicates the key exists already + kh_value(h, itr) = conn_handle; + enif_mutex_unlock(args->priv->conns_mutex); + enif_release_resource(conn_handle); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); + enif_mutex_unlock(conn_handle->contexts_mutex); } else { @@ -324,6 +349,7 @@ ASYNC_NIF_DECL( { // struct WterlConnHandle* conn_handle; + struct wterl_priv_data *priv; }, { // pre @@ -332,6 +358,8 @@ ASYNC_NIF_DECL( ASYNC_NIF_RETURN_BADARG(); } enif_keep_resource((void*)args->conn_handle); + + args->priv = (struct wterl_priv_data *)enif_priv_data(env); }, { // work @@ -344,6 +372,19 @@ ASYNC_NIF_DECL( } WT_CONNECTION* conn = args->conn_handle->conn; int rc = conn->close(conn, NULL); + + khash_t(conns) *h; + enif_mutex_lock(args->priv->conns_mutex); + h = args->priv->conns; + khiter_t itr; + itr = kh_get(conns, h, conn); + if (itr == 0) { + /* key exists in table (as expected) delete it */ + kh_del(conns, h, itr); + kh_value(h, itr) = NULL; + } + enif_mutex_unlock(args->priv->conns_mutex); + enif_mutex_unlock(args->conn_handle->contexts_mutex); enif_mutex_destroy(args->conn_handle->contexts_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); @@ -1836,27 +1877,78 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) ATOM_FIRST = enif_make_atom(env, "first"); ATOM_LAST = enif_make_atom(env, "last"); - ASYNC_NIF_LOAD(wterl, *priv_data); + struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data)); + if (!priv) + return ENOMEM; + memset(priv, 0, sizeof(struct wterl_priv_data)); - return *priv_data ? 0 : -1; + /* Note: !!! the first element of our priv_data struct *must* be the + pointer to the async_nif's private data which we set here. */ + ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); + + priv->conns_mutex = enif_mutex_create(NULL); + priv->conns = kh_init(conns); + *priv_data = priv; + return *priv_data ? 0 : ENOMEM; } static int on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { - return 0; // TODO: Determine what should be done here. + return 0; // TODO: implement } static void on_unload(ErlNifEnv *env, void *priv_data) { - ASYNC_NIF_UNLOAD(wterl, env); // TODO: Review/test this. + unsigned int i; + struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data; + khash_t(conns) *h; + khiter_t itr; + + enif_mutex_lock(priv->conns_mutex); + h = priv->conns; + + for (itr = kh_begin(h); itr != kh_end(h); ++itr) { + if (kh_exist(h, itr)) { + WterlConnHandle *c = kh_val(h, itr); + if (c) { + enif_mutex_lock(c->contexts_mutex); + enif_free((void*)c->session_config); + for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { + kh_destroy(cursors, c->contexts[i].cursors); + } + } + + /* This should close all cursors and sessions. */ + c->conn->close(c->conn, NULL); + } + } + + /* Continue to hold the context mutex while unloading the async_nif + to prevent new work from coming in while shutting down. */ + ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv); + + for (itr = kh_begin(h); itr != kh_end(h); ++itr) { + if (kh_exist(h, itr)) { + WterlConnHandle *c = kh_val(h, itr); + if (c) { + enif_mutex_unlock(c->contexts_mutex); + enif_mutex_destroy(c->contexts_mutex); + } + } + } + + kh_destroy(conns, h); + enif_mutex_unlock(priv->conns_mutex); + enif_mutex_destroy(priv->conns_mutex); + enif_free(priv); } static int on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) { - ASYNC_NIF_UPGRADE(wterl, env); // TODO: Review/test this. + ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement return 0; } diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index d6282b2..42f4dd0 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -151,8 +151,8 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(_State) -> - ok. +stop(#state{connection=Connection}) -> + wterl_conn:close(Connection). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) ->