Move the FIFO Queue implementation into its own file (fifo_q.h). Work

on the nif_unload path.  Free up resources owned by wterl.c when
unloading.  Continue to evolve the build script.  Add to khash the ability
to create a hash that maps from a pointer to a value. There is still a segv
due to a race wterl.c:do_unload() which needs to be addressed.
This commit is contained in:
Gregory Burd 2013-04-18 10:32:29 -04:00
parent db953f5b39
commit 60dd048b7e
7 changed files with 254 additions and 93 deletions

View file

@ -1 +1,4 @@
handle SIGPIPE nostop noprint pass
#b erl_nif.c:1203
#b sys/unix/erl_unix_sys_ddll.c:234

View file

@ -27,6 +27,7 @@ extern "C" {
#endif
#include <assert.h>
#include "fifo_q.h"
#ifdef ASYNC_NIF_STATS
#include "stats.h" // TODO: measure, measure... measure again
#endif
@ -34,73 +35,6 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define FIFO_QUEUE_TYPE(name) \
struct fifo_q__ ## name *
#define DECL_FIFO_QUEUE(name, type) \
struct fifo_q__ ## name { \
unsigned int h, t, s; \
type *items[]; \
}; \
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
struct fifo_q__ ## name *q = enif_alloc(sz); \
if (!q) \
return 0; \
memset(q, 0, sz); \
q->s = n + 1; \
return q; \
} \
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
enif_free(q); \
} \
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \
return n; \
} \
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *n = q->items[q->t]; \
q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \
return n; \
} \
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \
} \
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s - 1; \
} \
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \
} \
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
return ((q->h + 1) % q->s) == q->t; \
}
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
#define fifo_q_foreach(name, queue, item, task) do { \
while((item = fifo_q_ ## name ## _get(queue)) != NULL) { \
do task while(0); \
} \
} while(0);
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
};
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
@ -141,7 +75,8 @@ struct async_nif_state {
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
@ -198,11 +133,11 @@ struct async_nif_state {
priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env) do { \
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env); \
async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
enif_mutex_destroy(name##_async_nif_coord); \
name##_async_nif_coord = NULL; \
@ -326,10 +261,9 @@ async_nif_worker_fn(void *arg)
}
static void
async_nif_unload(ErlNifEnv *env)
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
@ -359,7 +293,6 @@ async_nif_unload(ErlNifEnv *env)
/* Cleanup requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex); // TODO: unnecessary?
/* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL;
@ -372,8 +305,6 @@ async_nif_unload(ErlNifEnv *env)
enif_free(req);
});
fifo_q_free(reqs, q->reqs);
enif_mutex_unlock(q->reqs_mutex); // TODO: unnecessary?
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
@ -382,7 +313,7 @@ async_nif_unload(ErlNifEnv *env)
}
static void *
async_nif_load(void)
async_nif_load()
{
static int has_init = 0;
unsigned int i, j, num_queues;

View file

@ -158,17 +158,17 @@ case "$1" in
# Build Snappy
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build BZIP2
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1)
test -f system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
test -f $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
# Build WiredTiger
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f system/lib/libwiredtiger_snappy.so \
-a -f system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so \
-a -f $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv

102
c_src/fifo_q.h Normal file
View file

@ -0,0 +1,102 @@
/*
* fifo_q: a macro-based implementation of a FIFO Queue
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __FIFO_Q_H__
#define __FIFO_Q_H__
#if defined(__cplusplus)
extern "C" {
#endif
#define FIFO_QUEUE_TYPE(name) \
struct fifo_q__ ## name *
#define DECL_FIFO_QUEUE(name, type) \
struct fifo_q__ ## name { \
unsigned int h, t, s; \
type *items[]; \
}; \
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
struct fifo_q__ ## name *q = enif_alloc(sz); \
if (!q) \
return 0; \
memset(q, 0, sz); \
q->s = n + 1; \
return q; \
} \
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
enif_free(q); \
} \
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \
return n; \
} \
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *n = q->items[q->t]; \
q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \
return n; \
} \
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \
} \
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s - 1; \
} \
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \
} \
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
return ((q->h + 1) % q->s) == q->t; \
}
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
#define fifo_q_foreach(name, queue, item, task) do { \
while(!fifo_q_ ## name ## _empty(queue)) { \
item = fifo_q_ ## name ## _get(queue); \
do task while(0); \
} \
} while(0);
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
};
#if defined(__cplusplus)
}
#endif
#endif // __FIFO_Q_H__

View file

@ -371,6 +371,26 @@ static const double __ac_HASH_UPPER = 0.77;
@abstract 64-bit integer comparison function
*/
#define kh_int64_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract Pointer hash function
@param key The integer void *
@return The hash value [khint_t]
*/
#define kh_ptr_hash_func(key) (khint32_t)(key)
/*! @function
@abstract Pointer comparison function
*/
#define kh_ptr_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract 64-bit pointer hash function
@param key The integer void *
@return The hash value [khint_t]
*/
#define kh_ptr64_hash_func(key) (khint32_t)(((khint64_t)key)>>33^((khint64_t)key)^((khint64_t)key)<<11)
/*! @function
@abstract 64-bit pointer comparison function
*/
#define kh_ptr64_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract const char* hash function
@param s Pointer to a null terminated string
@ -561,6 +581,19 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key)
/* More conenient interfaces */
/*! @function
@abstract Instantiate a hash map containing (void *) keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#ifdef __x86_64__
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal)
#else
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr_hash_func, kh_ptr_hash_equal)
#endif
/*! @function
@abstract Instantiate a hash set containing integer keys
@param name Name of the hash table [symbol]

View file

@ -46,6 +46,7 @@
static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE;
/* Generators for 'cursors' a named, type-specific hash table functions. */
KHASH_MAP_INIT_STR(cursors, WT_CURSOR*);
/**
@ -90,8 +91,19 @@ static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_FIRST;
static ERL_NIF_TERM ATOM_LAST;
/* Generators for 'conns' a named, type-specific hash table functions. */
KHASH_MAP_INIT_PTR(conns, WterlConnHandle*);
struct wterl_priv_data {
void *async_nif_priv; // Note: must be first element in struct
ErlNifMutex *conns_mutex;
khash_t(conns) *conns;
};
/* Global init for async_nif. */
ASYNC_NIF_INIT(wterl);
/**
* Get the per-worker reusable WT_SESSION for a worker_id.
*/
@ -135,10 +147,9 @@ __close_all_sessions(WterlConnHandle *conn_handle)
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WT_CURSOR *cursor = kh_val(h, itr);
char *key = (char *)kh_key(h, itr);
cursor->close(cursor);
kh_del(cursors, h, itr);
enif_free(key);
kh_value(h, itr) = NULL;
}
}
kh_destroy(cursors, h);
@ -165,10 +176,9 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
khiter_t itr = kh_get(cursors, h, (char *)uri);
if (itr != kh_end(h)) {
WT_CURSOR *cursor = kh_value(h, itr);
char *key = (char *)kh_key(h, itr);
cursor->close(cursor);
kh_del(cursors, h, itr);
enif_free(key);
kh_value(h, itr) = NULL;
}
}
}
@ -251,6 +261,7 @@ ASYNC_NIF_DECL(
ERL_NIF_TERM config;
ERL_NIF_TERM session_config;
char homedir[4096];
struct wterl_priv_data *priv;
},
{ // pre
@ -262,6 +273,8 @@ ASYNC_NIF_DECL(
}
args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->session_config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
args->priv = (struct wterl_priv_data *)enif_priv_data(env);
},
{ // work
@ -297,13 +310,25 @@ ASYNC_NIF_DECL(
} else {
conn_handle->session_config = NULL;
}
conn_handle->contexts_mutex = enif_mutex_create(NULL);
enif_mutex_lock(conn_handle->contexts_mutex);
conn_handle->conn = conn;
conn_handle->num_contexts = 0;
memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
conn_handle->contexts_mutex = enif_mutex_create(NULL);
ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
khash_t(conns) *h;
enif_mutex_lock(args->priv->conns_mutex);
h = args->priv->conns;
int itr_status = 0;
khiter_t itr = kh_put(conns, h, conn, &itr_status);
if (itr_status != 0) // 0 indicates the key exists already
kh_value(h, itr) = conn_handle;
enif_mutex_unlock(args->priv->conns_mutex);
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
enif_mutex_unlock(conn_handle->contexts_mutex);
}
else
{
@ -324,6 +349,7 @@ ASYNC_NIF_DECL(
{ // struct
WterlConnHandle* conn_handle;
struct wterl_priv_data *priv;
},
{ // pre
@ -332,6 +358,8 @@ ASYNC_NIF_DECL(
ASYNC_NIF_RETURN_BADARG();
}
enif_keep_resource((void*)args->conn_handle);
args->priv = (struct wterl_priv_data *)enif_priv_data(env);
},
{ // work
@ -344,6 +372,19 @@ ASYNC_NIF_DECL(
}
WT_CONNECTION* conn = args->conn_handle->conn;
int rc = conn->close(conn, NULL);
khash_t(conns) *h;
enif_mutex_lock(args->priv->conns_mutex);
h = args->priv->conns;
khiter_t itr;
itr = kh_get(conns, h, conn);
if (itr == 0) {
/* key exists in table (as expected) delete it */
kh_del(conns, h, itr);
kh_value(h, itr) = NULL;
}
enif_mutex_unlock(args->priv->conns_mutex);
enif_mutex_unlock(args->conn_handle->contexts_mutex);
enif_mutex_destroy(args->conn_handle->contexts_mutex);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
@ -1836,27 +1877,78 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_FIRST = enif_make_atom(env, "first");
ATOM_LAST = enif_make_atom(env, "last");
ASYNC_NIF_LOAD(wterl, *priv_data);
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
if (!priv)
return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data));
return *priv_data ? 0 : -1;
/* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
priv->conns_mutex = enif_mutex_create(NULL);
priv->conns = kh_init(conns);
*priv_data = priv;
return *priv_data ? 0 : ENOMEM;
}
static int
on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{
return 0; // TODO: Determine what should be done here.
return 0; // TODO: implement
}
static void
on_unload(ErlNifEnv *env, void *priv_data)
{
ASYNC_NIF_UNLOAD(wterl, env); // TODO: Review/test this.
unsigned int i;
struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
khash_t(conns) *h;
khiter_t itr;
enif_mutex_lock(priv->conns_mutex);
h = priv->conns;
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WterlConnHandle *c = kh_val(h, itr);
if (c) {
enif_mutex_lock(c->contexts_mutex);
enif_free((void*)c->session_config);
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) {
kh_destroy(cursors, c->contexts[i].cursors);
}
}
/* This should close all cursors and sessions. */
c->conn->close(c->conn, NULL);
}
}
/* Continue to hold the context mutex while unloading the async_nif
to prevent new work from coming in while shutting down. */
ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
WterlConnHandle *c = kh_val(h, itr);
if (c) {
enif_mutex_unlock(c->contexts_mutex);
enif_mutex_destroy(c->contexts_mutex);
}
}
}
kh_destroy(conns, h);
enif_mutex_unlock(priv->conns_mutex);
enif_mutex_destroy(priv->conns_mutex);
enif_free(priv);
}
static int
on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info)
{
ASYNC_NIF_UPGRADE(wterl, env); // TODO: Review/test this.
ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement
return 0;
}

View file

@ -151,8 +151,8 @@ start(Partition, Config) ->
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
stop(_State) ->
ok.
stop(#state{connection=Connection}) ->
wterl_conn:close(Connection).
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->