diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 724b8d5..9483433 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -92,7 +92,7 @@ struct async_nif_state { struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ - const unsigned int affinity = 0; \ + unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ diff --git a/c_src/duration.h b/c_src/duration.h index fc31101..083ad6b 100644 --- a/c_src/duration.h +++ b/c_src/duration.h @@ -33,7 +33,7 @@ static uint64_t ts(time_scale unit) ((uint64_t)ts.tv_nsec / scale[unit].div)); } -if defined(__i386__) || defined(__x86_64__) +#if defined(__i386__) || defined(__x86_64__) /** * cpu_clock_ticks() diff --git a/c_src/fifo_q.h b/c_src/fifo_q.h index f37bf67..bbc4ff0 100644 --- a/c_src/fifo_q.h +++ b/c_src/fifo_q.h @@ -26,6 +26,8 @@ extern "C" { #endif +#define fifo_t(name) \ + struct fifo_q__ ## name * #define FIFO_QUEUE_TYPE(name) \ struct fifo_q__ ## name * #define DECL_FIFO_QUEUE(name, type) \ diff --git a/c_src/kbtree.h b/c_src/kbtree.h new file mode 100644 index 0000000..f628d66 --- /dev/null +++ b/c_src/kbtree.h @@ -0,0 +1,381 @@ +/*- + * Copyright 1997-1999, 2001, John-Mark Gurney. + * 2008, Attractive Chaos + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +/* Reference: http://attractivechaos.awardspace.com/kbtree.h + http://attractivechaos.awardspace.com/kbtree.h.html */ + +#ifndef __AC_KBTREE_H +#define __AC_KBTREE_H + +#include +#include +#include + +typedef struct { + int32_t is_internal:1, n:31; +} kbnode_t; + +#define __KB_KEY(type, x) ((type*)((char*)x + 4)) +#define __KB_PTR(btr, x) ((kbnode_t**)((char*)x + btr->off_ptr)) + +#define __KB_TREE_T(name) \ + typedef struct { \ + kbnode_t *root; \ + int off_key, off_ptr, ilen, elen; \ + int n, t; \ + int n_keys, n_nodes; \ + } kbtree_##name##_t; + +#define __KB_INIT(name, key_t) \ + kbtree_##name##_t *kb_init_##name(int size) \ + { \ + kbtree_##name##_t *b; \ + b = (kbtree_##name##_t*)calloc(1, sizeof(kbtree_##name##_t)); \ + b->t = ((size - 4 - sizeof(void*)) / (sizeof(void*) + sizeof(key_t)) + 1) / 2; \ + if (b->t < 2) { \ + free(b); return 0; \ + } \ + b->n = 2 * b->t - 1; \ + b->off_ptr = 4 + b->n * sizeof(key_t); \ + b->ilen = (4 + sizeof(void*) + b->n * (sizeof(void*) + sizeof(key_t)) + 3) / 4 * 4; \ + b->elen = (b->off_ptr + 3) / 4 * 4; \ + b->root = (kbnode_t*)calloc(1, b->ilen); \ + ++b->n_nodes; \ + return b; \ + } + +#define __kb_destroy(b) do { \ + int i, max = 8; \ + kbnode_t *x, **top, **stack; \ + if (b) { \ + top = stack = (kbnode_t**)calloc(max, sizeof(kbnode_t*)); \ + *top++ = (b)->root; \ + while (top != stack) { \ + x = *--top; \ + if (x->is_internal == 0) { free(x); continue; } \ + for (i = 0; i <= x->n; ++i) \ + if (__KB_PTR(b, x)[i]) { \ + if (top - stack == max) { \ + max <<= 1; \ + stack = (kbnode_t**)realloc(stack, max * sizeof(kbnode_t*)); \ + top = stack + (max>>1); \ + } \ + *top++ = __KB_PTR(b, x)[i]; \ + } \ + free(x); \ + } \ + } \ + free(b); free(stack); \ + } while (0) + +#define __KB_GET_AUX0(name, key_t, __cmp) \ + static inline int __kb_get_aux_##name(const kbnode_t * __restrict x, const key_t * __restrict k, int *r) \ + { \ + int tr, *rr, begin, end, n = x->n / 2; \ + if (x->n == 0) return -1; \ + if (__cmp(*k, __KB_KEY(key_t, x)[n]) < 0) { \ + begin = 0; end = n; \ + } else { begin = n; end = x->n - 1; } \ + rr = r? r : &tr; \ + n = end; \ + while (n >= begin && (*rr = __cmp(*k, __KB_KEY(key_t, x)[n])) < 0) --n; \ + return n; \ + } + +#define __KB_GET_AUX1(name, key_t, __cmp) \ + static inline int __kb_getp_aux_##name(const kbnode_t * __restrict x, const key_t * __restrict k, int *r) \ + { \ + int tr, *rr, begin = 0, end = x->n; \ + if (x->n == 0) return -1; \ + rr = r? r : &tr; \ + while (begin < end) { \ + int mid = (begin + end) / 2; \ + if (__cmp(__KB_KEY(key_t, x)[mid], *k) < 0) begin = mid + 1; \ + else end = mid; \ + } \ + if (begin == x->n) { *rr = 1; return x->n - 1; } \ + if ((*rr = __cmp(*k, __KB_KEY(key_t, x)[begin])) < 0) --begin; \ + return begin; \ + } + +#define __KB_GET(name, key_t) \ + static key_t *kb_getp_##name(kbtree_##name##_t *b, const key_t * __restrict k) \ + { \ + int i, r = 0; \ + kbnode_t *x = b->root; \ + while (x) { \ + i = __kb_getp_aux_##name(x, k, &r); \ + if (i >= 0 && r == 0) return &__KB_KEY(key_t, x)[i]; \ + if (x->is_internal == 0) return 0; \ + x = __KB_PTR(b, x)[i + 1]; \ + } \ + return 0; \ + } \ + static inline key_t *kb_get_##name(kbtree_##name##_t *b, const key_t k) \ + { \ + return kb_getp_##name(b, &k); \ + } + +#define __KB_INTERVAL(name, key_t) \ + static void kb_intervalp_##name(kbtree_##name##_t *b, const key_t * __restrict k, key_t **lower, key_t **upper) \ + { \ + int i, r = 0; \ + kbnode_t *x = b->root; \ + *lower = *upper = 0; \ + while (x) { \ + i = __kb_getp_aux_##name(x, k, &r); \ + if (i >= 0 && r == 0) { \ + *lower = *upper = &__KB_KEY(key_t, x)[i]; \ + return; \ + } \ + if (i >= 0) *lower = &__KB_KEY(key_t, x)[i]; \ + if (i < x->n - 1) *upper = &__KB_KEY(key_t, x)[i + 1]; \ + if (x->is_internal == 0) return; \ + x = __KB_PTR(b, x)[i + 1]; \ + } \ + } \ + static inline void kb_interval_##name(kbtree_##name##_t *b, const key_t k, key_t **lower, key_t **upper) \ + { \ + kb_intervalp_##name(b, &k, lower, upper); \ + } + +#define __KB_PUT(name, key_t, __cmp) \ + /* x must be an internal node */ \ + static void __kb_split_##name(kbtree_##name##_t *b, kbnode_t *x, int i, kbnode_t *y) \ + { \ + kbnode_t *z; \ + z = (kbnode_t*)calloc(1, y->is_internal? b->ilen : b->elen); \ + ++b->n_nodes; \ + z->is_internal = y->is_internal; \ + z->n = b->t - 1; \ + memcpy(__KB_KEY(key_t, z), __KB_KEY(key_t, y) + b->t, sizeof(key_t) * (b->t - 1)); \ + if (y->is_internal) memcpy(__KB_PTR(b, z), __KB_PTR(b, y) + b->t, sizeof(void*) * b->t); \ + y->n = b->t - 1; \ + memmove(__KB_PTR(b, x) + i + 2, __KB_PTR(b, x) + i + 1, sizeof(void*) * (x->n - i)); \ + __KB_PTR(b, x)[i + 1] = z; \ + memmove(__KB_KEY(key_t, x) + i + 1, __KB_KEY(key_t, x) + i, sizeof(key_t) * (x->n - i)); \ + __KB_KEY(key_t, x)[i] = __KB_KEY(key_t, y)[b->t - 1]; \ + ++x->n; \ + } \ + static void __kb_putp_aux_##name(kbtree_##name##_t *b, kbnode_t *x, const key_t * __restrict k) \ + { \ + int i = x->n - 1; \ + if (x->is_internal == 0) { \ + i = __kb_getp_aux_##name(x, k, 0); \ + if (i != x->n - 1) \ + memmove(__KB_KEY(key_t, x) + i + 2, __KB_KEY(key_t, x) + i + 1, (x->n - i - 1) * sizeof(key_t)); \ + __KB_KEY(key_t, x)[i + 1] = (key_t)*k; \ + ++x->n; \ + } else { \ + i = __kb_getp_aux_##name(x, k, 0) + 1; \ + if (__KB_PTR(b, x)[i]->n == 2 * b->t - 1) { \ + __kb_split_##name(b, x, i, __KB_PTR(b, x)[i]); \ + if (__cmp(*k, __KB_KEY(key_t, x)[i]) > 0) ++i; \ + } \ + __kb_putp_aux_##name(b, __KB_PTR(b, x)[i], k); \ + } \ + } \ + static void kb_putp_##name(kbtree_##name##_t *b, const key_t * __restrict k) \ + { \ + kbnode_t *r, *s; \ + ++b->n_keys; \ + r = b->root; \ + if (r->n == 2 * b->t - 1) { \ + ++b->n_nodes; \ + s = (kbnode_t*)calloc(1, b->ilen); \ + b->root = s; s->is_internal = 1; s->n = 0; \ + __KB_PTR(b, s)[0] = r; \ + __kb_split_##name(b, s, 0, r); \ + r = s; \ + } \ + __kb_putp_aux_##name(b, r, k); \ + } \ + static inline void kb_put_##name(kbtree_##name##_t *b, const key_t k) \ + { \ + kb_putp_##name(b, &k); \ + } + + +#define __KB_DEL(name, key_t) \ + static key_t __kb_delp_aux_##name(kbtree_##name##_t *b, kbnode_t *x, const key_t * __restrict k, int s) \ + { \ + int yn, zn, i, r = 0; \ + kbnode_t *xp, *y, *z; \ + key_t kp; \ + if (x == 0) return (key_t)*k; \ + if (s) { /* s can only be 0, 1 or 2 */ \ + r = x->is_internal == 0? 0 : s == 1? 1 : -1; \ + i = s == 1? x->n - 1 : -1; \ + } else i = __kb_getp_aux_##name(x, k, &r); \ + if (x->is_internal == 0) { \ + if (s == 2) ++i; \ + kp = __KB_KEY(key_t, x)[i]; \ + memmove(__KB_KEY(key_t, x) + i, __KB_KEY(key_t, x) + i + 1, (x->n - i - 1) * sizeof(key_t)); \ + --x->n; \ + return kp; \ + } \ + if (r == 0) { \ + if ((yn = __KB_PTR(b, x)[i]->n) >= b->t) { \ + xp = __KB_PTR(b, x)[i]; \ + kp = __KB_KEY(key_t, x)[i]; \ + __KB_KEY(key_t, x)[i] = __kb_delp_aux_##name(b, xp, 0, 1); \ + return kp; \ + } else if ((zn = __KB_PTR(b, x)[i + 1]->n) >= b->t) { \ + xp = __KB_PTR(b, x)[i + 1]; \ + kp = __KB_KEY(key_t, x)[i]; \ + __KB_KEY(key_t, x)[i] = __kb_delp_aux_##name(b, xp, 0, 2); \ + return kp; \ + } else if (yn == b->t - 1 && zn == b->t - 1) { \ + y = __KB_PTR(b, x)[i]; z = __KB_PTR(b, x)[i + 1]; \ + __KB_KEY(key_t, y)[y->n++] = (key_t)*k; \ + memmove(__KB_KEY(key_t, y) + y->n, __KB_KEY(key_t, z), z->n * sizeof(key_t)); \ + if (y->is_internal) memmove(__KB_PTR(b, y) + y->n, __KB_PTR(b, z), (z->n + 1) * sizeof(void*)); \ + y->n += z->n; \ + memmove(__KB_KEY(key_t, x) + i, __KB_KEY(key_t, x) + i + 1, (x->n - i - 1) * sizeof(key_t)); \ + memmove(__KB_PTR(b, x) + i + 1, __KB_PTR(b, x) + i + 2, (x->n - i - 1) * sizeof(void*)); \ + --x->n; \ + free(z); \ + return __kb_delp_aux_##name(b, y, k, s); \ + } \ + } \ + ++i; \ + if ((xp = __KB_PTR(b, x)[i])->n == b->t - 1) { \ + if (i > 0 && (y = __KB_PTR(b, x)[i - 1])->n >= b->t) { \ + memmove(__KB_KEY(key_t, xp) + 1, __KB_KEY(key_t, xp), xp->n * sizeof(key_t)); \ + if (xp->is_internal) memmove(__KB_PTR(b, xp) + 1, __KB_PTR(b, xp), (xp->n + 1) * sizeof(void*)); \ + __KB_KEY(key_t, xp)[0] = __KB_KEY(key_t, x)[i - 1]; \ + __KB_KEY(key_t, x)[i - 1] = __KB_KEY(key_t, y)[y->n - 1]; \ + if (xp->is_internal) __KB_PTR(b, xp)[0] = __KB_PTR(b, y)[y->n]; \ + --y->n; ++xp->n; \ + } else if (i < x->n && (y = __KB_PTR(b, x)[i + 1])->n >= b->t) { \ + __KB_KEY(key_t, xp)[xp->n++] = __KB_KEY(key_t, x)[i]; \ + __KB_KEY(key_t, x)[i] = __KB_KEY(key_t, y)[0]; \ + if (xp->is_internal) __KB_PTR(b, xp)[xp->n] = __KB_PTR(b, y)[0]; \ + --y->n; \ + memmove(__KB_KEY(key_t, y), __KB_KEY(key_t, y) + 1, y->n * sizeof(key_t)); \ + if (y->is_internal) memmove(__KB_PTR(b, y), __KB_PTR(b, y) + 1, (y->n + 1) * sizeof(void*)); \ + } else if (i > 0 && (y = __KB_PTR(b, x)[i - 1])->n == b->t - 1) { \ + __KB_KEY(key_t, y)[y->n++] = __KB_KEY(key_t, x)[i - 1]; \ + memmove(__KB_KEY(key_t, y) + y->n, __KB_KEY(key_t, xp), xp->n * sizeof(key_t)); \ + if (y->is_internal) memmove(__KB_PTR(b, y) + y->n, __KB_PTR(b, xp), (xp->n + 1) * sizeof(void*)); \ + y->n += xp->n; \ + memmove(__KB_KEY(key_t, x) + i - 1, __KB_KEY(key_t, x) + i, (x->n - i) * sizeof(key_t)); \ + memmove(__KB_PTR(b, x) + i, __KB_PTR(b, x) + i + 1, (x->n - i) * sizeof(void*)); \ + --x->n; \ + free(xp); \ + xp = y; \ + } else if (i < x->n && (y = __KB_PTR(b, x)[i + 1])->n == b->t - 1) { \ + __KB_KEY(key_t, xp)[xp->n++] = __KB_KEY(key_t, x)[i]; \ + memmove(__KB_KEY(key_t, xp) + xp->n, __KB_KEY(key_t, y), y->n * sizeof(key_t)); \ + if (xp->is_internal) memmove(__KB_PTR(b, xp) + xp->n, __KB_PTR(b, y), (y->n + 1) * sizeof(void*)); \ + xp->n += y->n; \ + memmove(__KB_KEY(key_t, x) + i, __KB_KEY(key_t, x) + i + 1, (x->n - i - 1) * sizeof(key_t)); \ + memmove(__KB_PTR(b, x) + i + 1, __KB_PTR(b, x) + i + 2, (x->n - i - 1) * sizeof(void*)); \ + --x->n; \ + free(y); \ + } \ + } \ + return __kb_delp_aux_##name(b, xp, k, s); \ + } \ + static key_t kb_delp_##name(kbtree_##name##_t *b, const key_t * __restrict k) \ + { \ + kbnode_t *x; \ + key_t ret; \ + ret = __kb_delp_aux_##name(b, b->root, k, 0); \ + --b->n_keys; \ + if (b->root->n == 0 && b->root->is_internal) { \ + --b->n_nodes; \ + x = b->root; \ + b->root = __KB_PTR(b, x)[0]; \ + free(x); \ + } \ + return ret; \ + } \ + static inline key_t kb_del_##name(kbtree_##name##_t *b, const key_t k) \ + { \ + return kb_delp_##name(b, &k); \ + } + +typedef struct { + kbnode_t *x; + int i; +} __kbstack_t; + +#define __kb_traverse(key_t, b, __func) do { \ + int __kmax = 8; \ + __kbstack_t *__kstack, *__kp; \ + __kp = __kstack = (__kbstack_t*)calloc(__kmax, sizeof(__kbstack_t)); \ + __kp->x = (b)->root; __kp->i = 0; \ + for (;;) { \ + while (__kp->x && __kp->i <= __kp->x->n) { \ + if (__kp - __kstack == __kmax - 1) { \ + __kmax <<= 1; \ + __kstack = (__kbstack_t*)realloc(__kstack, __kmax * sizeof(__kbstack_t)); \ + __kp = __kstack + (__kmax>>1) - 1; \ + } \ + (__kp+1)->i = 0; (__kp+1)->x = __kp->x->is_internal? __KB_PTR(b, __kp->x)[__kp->i] : 0; \ + ++__kp; \ + } \ + --__kp; \ + if (__kp >= __kstack) { \ + if (__kp->x && __kp->i < __kp->x->n) __func(&__KB_KEY(key_t, __kp->x)[__kp->i]); \ + ++__kp->i; \ + } else break; \ + } \ + free(__kstack); \ + } while (0) + +#define KBTREE_INIT(name, key_t, __cmp) \ + __KB_TREE_T(name) \ + __KB_INIT(name, key_t) \ + __KB_GET_AUX1(name, key_t, __cmp) \ + __KB_GET(name, key_t) \ + __KB_INTERVAL(name, key_t) \ + __KB_PUT(name, key_t, __cmp) \ + __KB_DEL(name, key_t) + +#define KB_DEFAULT_SIZE 512 + +#define kbtree_t(name) kbtree_##name##_t +#define kb_init(name, s) kb_init_##name(s) +#define kb_destroy(name, b) __kb_destroy(b) +#define kb_get(name, b, k) kb_get_##name(b, k) +#define kb_put(name, b, k) kb_put_##name(b, k) +#define kb_del(name, b, k) kb_del_##name(b, k) +#define kb_interval(name, b, k, l, u) kb_interval_##name(b, k, l, u) +#define kb_getp(name, b, k) kb_getp_##name(b, k) +#define kb_putp(name, b, k) kb_putp_##name(b, k) +#define kb_delp(name, b, k) kb_delp_##name(b, k) +#define kb_intervalp(name, b, k, l, u) kb_intervalp_##name(b, k, l, u) + +#define kb_size(b) ((b)->n_keys) + +#define kb_generic_cmp(a, b) (((a) > (b)) - ((a) < (b))) +#define kb_str_cmp(a, b) strcmp(a, b) + +#endif diff --git a/c_src/khash.h b/c_src/khash.h index 69549dc..ab157b1 100644 --- a/c_src/khash.h +++ b/c_src/khash.h @@ -586,7 +586,7 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) @param name Name of the hash table [symbol] @param khval_t Type of values [type] */ -#ifdef __x86_64__ +#ifdef __x86_64__ #define KHASH_MAP_INIT_PTR(name, khval_t) \ KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal) #else diff --git a/c_src/wterl.c b/c_src/wterl.c index 3455143..53ee9b6 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -27,57 +27,68 @@ #include "common.h" #include "wiredtiger.h" -#include "async_nif.h" -#include "khash.h" #include "stats.h" - -#define CTX_CACHE_SIZE ASYNC_NIF_MAX_WORKERS +#include "async_nif.h" +#include "kbtree.h" +#include "queue.h" +#include "fifo_q.h" static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_ctx_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; -typedef struct struct WterlCtxHandle { +/* WiredTiger object names*/ +typedef char Uri[128]; + +typedef struct wterl_ctx { WT_SESSION *session; // open session - WT_CURSOR *cursors[]; // open cursors, all reset ready to reuse + WT_CURSOR **cursors; // open cursors, all reset ready to reuse + uint64_t sig; } WterlCtxHandle; -struct ctx_lru_entry { +struct cache_entry { WterlCtxHandle *ctx; - u_int64_t sig; - SLIST_HEAD(ctx, struct WterlCtxHandle*) set; - STAILQ_ENTRY(struct ctx_lru_entry) entries; + uint64_t sig; + uint64_t tstamp; }; -KHASH_SET_INIT_INT64(ctx_idx, struct ctx_group*); +#define __ctx_sig_cmp(a, b) ((((a)->sig) > ((b)->sig)) - (((a)->sig) < ((b)->sig))) +KBTREE_INIT(cache_entries, struct cache_entry*, __ctx_sig_cmp); +DECL_FIFO_QUEUE(cache_entries, struct cache_entry); -struct ctx_cache { - size_t size; - struct lru { - STAILQ_HEAD(lru, struct ctx_lru_entry*) lru; - } lru; - struct idx { - int h; - ctx_group cgs[CTX_CACHE_SIZE]; - khash_t(ctx_idx) *ctx; - } idx; -}; - -typedef struct { +typedef struct wterl_conn { WT_CONNECTION *conn; const char *session_config; - ErlNifMutex *contexts_mutex; - unsigned int num_contexts; - WterlCtx **contexts; // TODO: free this + ErlNifMutex *cache_mutex; + kbtree_t(cache_entries) *cache; + fifo_t(cache_entries) recycled_cache_entries; + SLIST_ENTRY(wterl_conn) conns; } WterlConnHandle; typedef struct { - WT_CURSOR *cursor; WT_SESSION *session; + WT_CURSOR *cursor; } WterlCursorHandle; -/* WiredTiger object names*/ -typedef char Uri[128]; +struct wterl_event_handlers { + WT_EVENT_HANDLER handlers; + ErlNifEnv *msg_env_error; + ErlNifMutex *error_mutex; + ErlNifEnv *msg_env_message; + ErlNifMutex *message_mutex; + ErlNifEnv *msg_env_progress; + ErlNifMutex *progress_mutex; + ErlNifPid to_pid; +}; + +struct wterl_priv_data { + void *async_nif_priv; // Note: must be first element in struct + ErlNifMutex *conns_mutex; + SLIST_HEAD(conns, wterl_conn) conns; + struct wterl_event_handlers eh; + char wterl_vsn[512]; + char wiredtiger_vsn[512]; +}; /* Atoms (initialized in on_load) */ static ERL_NIF_TERM ATOM_ERROR; @@ -91,57 +102,34 @@ static ERL_NIF_TERM ATOM_WTERL_VSN; static ERL_NIF_TERM ATOM_WIREDTIGER_VSN; static ERL_NIF_TERM ATOM_MSG_PID; -struct wterl_event_handlers { - WT_EVENT_HANDLER handlers; - ErlNifEnv *msg_env_error; - ErlNifMutex *error_mutex; - ErlNifEnv *msg_env_message; - ErlNifMutex *message_mutex; - ErlNifEnv *msg_env_progress; - ErlNifMutex *progress_mutex; - ErlNifPid to_pid; -}; - -/* Generators for 'conns' a named, type-specific hash table functions. */ -KHASH_MAP_INIT_PTR(conns, WterlConnHandle*); - -struct wterl_priv_data { - void *async_nif_priv; // Note: must be first element in struct - ErlNifMutex *conns_mutex; - khash_t(conns) *conns; - struct wterl_event_handlers eh; - char wterl_vsn[512]; - char wiredtiger_vsn[512]; -}; - /* Global init for async_nif. */ ASYNC_NIF_INIT(wterl); - /** - * Is the cache full? + * Is the context cache full? * - * Test to see if the cache is full or not. - * - * -> 0=false/not full yet, 1=true/cache is full + * -> 0 = no/false, anything else is true */ static int -__ctx_cache_full(struct wterl_ctx_cache *cache) +__ctx_cache_full(WterlConnHandle *conn) { - return cache->size == CTX_CACHE_SIZE ? 1 : 0; + return fifo_q_full(cache_entries, conn->recycled_cache_entries); } /** * Evict items from the cache. * - * Evict some number of items from the cache to make space for other items. + * Evict old contexts from the cache to make space for new, more frequently + * used contexts. * * -> number of items evicted */ static int -__ctx_cache_evict(struct ctx_cache *cache) +__ctx_cache_evict(WterlConnHandle *conn) { // TODO: + UNUSED(conn); + return 0; } /** @@ -150,51 +138,25 @@ __ctx_cache_evict(struct ctx_cache *cache) * See if there exists an item in the cache with a matching signature, if * so remove it from the cache and return it for use by the callee. * - * sig a 32-bit signature (hash) representing the session/cursor* needed + * sig a 64-bit signature (hash) representing the session/cursor* needed * for the operation */ static WterlCtxHandle * -__ctx_cache_find(wterl_ctx_cache *cache, u_int64_t sig) +__ctx_cache_find(WterlConnHandle *conn, const uint64_t sig) { - khiter_t k; + WterlCtxHandle *p = NULL; + struct cache_entry key, *e; - kh_get(ctx_idx, cache->idx.ctx, sig); - if (k != kh_end(h)) { - /* - * This signature exists in the hashtable, that's good news. Maybe - * there is a context open and ready for us to reuse, let's check. - */ - struct ctx_group *cg = kh_value(ctx_idx, k); - if (SLIST_EMPTY(cg->set)) { // cache miss - /* - * Nope, there are no contexts available for reuse with this - * signature. - */ - return NULL; - } else { // cache hit - /* - * Yes, we've found a context available for reuse with the - * desired signature. Remove it from the cache and return it - * to the caller. - */ - WterlCtxHandle *p = SLIST_REMOVE(cg->set); // remove from index - WterlCtxHandle *q; - STAILQ_FOREACH(q, &cache->lru, entries) { - if (p == q) { - STAILQ_REMOVE(&cache-lru, q, ctx_lru_entries, entries); - } - } - // remove from lru - cache->size--; // update cache size - return p; - } - } else { - /* - * The signature didn't match any that we're caching contexts for right - * now, so clearly there won't be any cached contexts for this either. - */ - return NULL; - } + key.sig = sig; + e = *kb_get(cache_entries, conn->cache, &key); + if (e) { + // cache hit, remove it from the tree + kb_del(cache_entries, conn->cache, &key); + p = e->ctx; + memset(e, 0, sizeof(struct cache_entry)); + fifo_q_put(cache_entries, conn->recycled_cache_entries, e); + } // else { cache miss, so p == NULL when we return } + return p; } /** @@ -204,43 +166,34 @@ __ctx_cache_find(wterl_ctx_cache *cache, u_int64_t sig) * the front of the LRU. */ static int -__ctx_cache_add(wterl_ctx_cache *cache, WterlCtxHandle *e) +__ctx_cache_add(WterlConnHandle *conn, WterlCtxHandle *c) { - khiter_t k; + struct cache_entry *e; - kh_get(ctx_idx, cache->idx.ctx, sig); - if (k != kh_end(h)) { - /* - * This signature exists in the bitmap, that's good news. We can just - * put this into the list of cached items for that signature. - */ - struct ctx_group *cg = kh_value(ctx_idx, k); - SLIST_INSERT_HEAD(cg->set, e, entries); // add to index - STAILQ_INSERT_HEAD(&cache->lru, e, entries); // add to lru - cache->size++; // update cache size - } else { - /* - * The signature didn't match any that we're caching contexts for right - * now, so we need to add a context group for it. - */ - if (cache->idx - return NULL; - } + if (__ctx_cache_full(conn)) + __ctx_cache_evict(conn); + + e = fifo_q_get(cache_entries, conn->recycled_cache_entries); + e->ctx = c; + e->sig = c->sig; + e->tstamp = cpu_clock_ticks(); + kb_put(cache_entries, conn->cache, e); + return 0; } /** - * Produce the Morton Number from two 32-bit unsigned integers. + * Produce the "Z-Index" or "Morton Number" from 2 32-bit unsigned integers. * e.g. p = 0101 1011 0100 0011 * q = 1011 1100 0001 0011 * z = 0110 0111 1101 1010 0010 0001 0000 1111 */ -static inline u_int64_t -__interleave(u_int32_t p, u_int32_t q) +static inline uint64_t +__zi(uint32_t p, uint32_t q) { - static const u_int32_t B[] = {0x55555555, 0x33333333, 0x0F0F0F0F, 0x00FF00FF}; - static const u_int32_t S[] = {1, 2, 4, 8}; - u_int32_t x, y; - u_int64_t z; + static const uint32_t B[] = {0x55555555, 0x33333333, 0x0F0F0F0F, 0x00FF00FF}; + static const uint32_t S[] = {1, 2, 4, 8}; + uint32_t x, y; + uint64_t z; x = p & 0x0000FFFF; // Interleave lower 16 bits of p as x and q as y, so the y = q & 0x0000FFFF; // bits of x are in the even positions and bits from y @@ -285,8 +238,8 @@ __interleave(u_int32_t p, u_int32_t q) * s a NULL terminated set of bytes to be hashed * -> an integer hash encoding of the bytes */ -static inline unsigned int -__str_hash_func(const char *s) +static inline uint32_t +__str_hash(const char *s) { unsigned int h = (unsigned int)*s; if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; @@ -303,26 +256,141 @@ __str_hash_func(const char *s) * session_config the string used to configure the WT_SESSION * ... each pair of items in the varargs array is a table name, * cursor config pair + * -> number of variable arguments processed */ -static u_int32_t -__ctx_cache_sig(const char *session_config, ...) +static int +__ctx_cache_sig_(const char *c, va_list ap, uint64_t *h) { - va_list ap; - int i; - u_int64_t h; + int i = 0; - if (NULL == session_config) + if (NULL == c) return 0; - h = __str_hash_fn(session_config); + *h = __str_hash(c); - va_start (ap, count); - for (i = 0; i < count; i++) { - h = __morton(h, __str_hash_fn(va_arg(ap, const char *))); - h <<= 1; + while (*c) { + *h = __zi((uint32_t)(*h & 0xFFFFFFFF), __str_hash(va_arg(ap, const char *))); + *h <<= 1; + i++; } + return i; +} + +#if 0 +static uint64_t +__ctx_cache_sig(const char *c, ...) +{ + int i; + va_list ap; + uint64_t h; + + if (NULL == c) + return 0; + + va_start(ap, c); + i = __ctx_cache_sig_(c, ap, &h); va_end (ap); - return (u_int32_t)(h & 0xFFFFFFFF); + + return i; +} +#endif + +/** + * Get a reusable cursor that was opened for a particular worker within its + * session. + */ +static int +__retain_ctx(WterlConnHandle *conn_handle, WterlCtxHandle **ctx, + const char *session_config, ...) +{ + int i, count; + va_list ap; + uint64_t sig; + const char *c; + + c = session_config; + va_start(ap, session_config); + count = __ctx_cache_sig_(session_config, ap, &sig); + va_end (ap); + + enif_mutex_lock(conn_handle->cache_mutex); + *ctx = __ctx_cache_find(conn_handle, sig); + if (NULL == *ctx) { + // cache miss + WT_CONNECTION *conn = conn_handle->conn; + WT_SESSION *session = NULL; + int rc = conn->open_session(conn, NULL, session_config, &session); + if (rc != 0) + return rc; + size_t s = sizeof(WterlCtxHandle) + ((count / 2) * sizeof(WT_CURSOR*)); + *ctx = enif_alloc_resource(wterl_ctx_RESOURCE, s); + if (NULL == *ctx) { + session->close(session, NULL); + return ENOMEM; + } + memset(*ctx, 0, s); + (*ctx)->sig = sig; + (*ctx)->session = session; + WT_CURSOR **cursors = (*ctx)->cursors; + session_config = c; + va_start(ap, session_config); + for (i = 0; i < (count / 2); i++) { + const char *uri = va_arg(ap, const char *); + const char *config = va_arg(ap, const char *); + rc = session->open_cursor(session, uri, NULL, config, &cursors[i]); + if (rc != 0) { + session->close(session, NULL); // this will free cursors too + return rc; + } + } + va_end (ap); + } // else { cache hit so 'ctx' is a reusable session/cursor } + enif_mutex_unlock(conn_handle->cache_mutex); + return 0; +} + +static void +__release_ctx(WterlConnHandle *conn_handle, WterlCtxHandle *ctx) +{ + int i, c; + WT_CURSOR *cursor; + + c = sizeof(ctx->cursors) / sizeof(ctx->cursors[0]); + for (i = 0; i < c; i++) { + cursor = ctx->cursors[i]; + cursor->reset(cursor); + } + enif_mutex_lock(conn_handle->cache_mutex); + __ctx_cache_add(conn_handle, ctx); + enif_mutex_unlock(conn_handle->cache_mutex); +} + +/** + * Close all sessions and all cursors open on any objects. + * + * Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex) + */ +void +__close_all_sessions(WterlConnHandle *conn_handle) +{ + kbtree_t(cache_entries) *t = conn_handle->cache; + +#define traverse_f(p) { kb_del(cache_entries, t, *p); } + __kb_traverse(struct cache_entry *, t, traverse_f); +#undef traverse_f +} + +/** + * Close cursors open on 'uri' object. + * + * Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex) + */ +void +__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) +{ + UNUSED(uri); + // TODO: find a way to only close those session/cursor* open on uri + __close_all_sessions(conn_handle); } /** @@ -440,172 +508,6 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6 return rc; } -/** - * Open a WT_SESSION for the thread context 'ctx' to use, also init the - * shared cursor hash table. - * - * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) - */ -static int -__init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx) -{ - /* Create a context for this worker thread to reuse. */ - WT_CONNECTION *conn = conn_handle->conn; - int rc = conn->open_session(conn, NULL, conn_handle->session_config, &ctx->session); - if (rc != 0) { - ctx->session = NULL; - return rc; - } - - ctx->cursors = kh_init(cursors); - if (!ctx->cursors) { - ctx->session->close(ctx->session, NULL); - ctx->session = NULL; - return ENOMEM; - } - - return 0; -} - -/** - * Get the per-worker reusable WT_SESSION for a worker_id. - */ -static int -__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session) -{ - WterlCtx *ctx = &conn_handle->contexts[worker_id]; - int rc = 0; - - if (ctx->session == NULL) { - enif_mutex_lock(conn_handle->contexts_mutex); - rc = __init_session_and_cursor_cache(conn_handle, ctx); - enif_mutex_unlock(conn_handle->contexts_mutex); - } - *session = ctx->session; - return rc; -} - -/** - * Close all sessions and all cursors open on any objects. - * - * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) - */ -void -__close_all_sessions(WterlConnHandle *conn_handle) -{ - int i; - - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - if (ctx->session != NULL) { - WT_SESSION *session = ctx->session; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr; - for (itr = kh_begin(h); itr != kh_end(h); ++itr) { - if (kh_exist(h, itr)) { - WT_CURSOR *cursor = kh_val(h, itr); - char *key = (char *)kh_key(h, itr); - cursor->close(cursor); - kh_del(cursors, h, itr); - enif_free(key); - kh_value(h, itr) = NULL; - } - } - kh_destroy(cursors, h); - session->close(session, NULL); - ctx->session = NULL; - } - } -} - -/** - * Close cursors open on 'uri' object. - * - * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) - */ -void -__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) -{ - int i; - - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - if (ctx->session != NULL) { - khash_t(cursors) *h = ctx->cursors; - khiter_t itr = kh_get(cursors, h, (char *)uri); - if (itr != kh_end(h)) { - WT_CURSOR *cursor = kh_value(h, itr); - char *key = (char *)kh_key(h, itr); - cursor->close(cursor); - kh_del(cursors, h, itr); - enif_free(key); - kh_value(h, itr) = NULL; - } - } - } -} - -/** - * Get a reusable cursor that was opened for a particular worker within its - * session. - */ -static int -__retain_ctx(WterlConnHandle *conn_handle, const char *uri, WterlCtx **ctx) -{ - /* Check to see if we have a cursor open for this uri and if so reuse it. */ - WterlCtx *ctx = &conn_handle->contexts[worker_id]; - khash_t(cursors) *h = NULL; - khiter_t itr; - int rc; - unsigned int h = __str_hash_func(uri); // TODO: add config at some point - - if (ctx->session == NULL) { - enif_mutex_lock(conn_handle->contexts_mutex); - rc = __init_session_and_cursor_cache(conn_handle, ctx); - enif_mutex_unlock(conn_handle->contexts_mutex); - if (rc != 0) - return rc; - } - - h = ctx->cursors; - itr = kh_get(cursors, h, (char *)uri); - if (itr != kh_end(h)) { - // key exists in hash table, retrieve it - *cursor = (WT_CURSOR*)kh_value(h, itr); - } else { - // key does not exist in hash table, create and insert one - enif_mutex_lock(conn_handle->contexts_mutex); - WT_SESSION *session = conn_handle->contexts[worker_id].session; - rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); - if (rc != 0) { - enif_mutex_unlock(conn_handle->contexts_mutex); - return rc; - } - - char *key = enif_alloc(sizeof(Uri)); - if (!key) { - session->close(session, NULL); - enif_mutex_unlock(conn_handle->contexts_mutex); - return ENOMEM; - } - memcpy(key, uri, 128); - int itr_status; - itr = kh_put(cursors, h, key, &itr_status); - kh_value(h, itr) = *cursor; - enif_mutex_unlock(conn_handle->contexts_mutex); - } - return 0; -} - -static void -__release_ctx(WterlConnHandle *conn_handle, const char *uri, WterlCtx *ctx) -{ - UNUSED(conn_handle); - UNUSED(worker_id); - UNUSED(uri); - cursor->reset(cursor); -} - /** * Convenience function to generate {error, {errno, Reason}} or 'not_found' * Erlang terms to return to callers. @@ -625,9 +527,9 @@ __strerror_term(ErlNifEnv* env, int rc) atom rather than the message when matching in Erlang. You've been warned. */ return enif_make_tuple2(env, ATOM_ERROR, - enif_make_tuple2(env, - enif_make_atom(env, erl_errno_id(rc)), - enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1))); + enif_make_tuple2(env, + enif_make_atom(env, erl_errno_id(rc)), + enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1))); } } @@ -693,30 +595,27 @@ ASYNC_NIF_DECL( return; } memcpy(sc, session_config.data, session_config.size); - conn_handle->session_config = (const char *)sc; } else { conn_handle->session_config = NULL; } - conn_handle->contexts_mutex = enif_mutex_create(NULL); - enif_mutex_lock(conn_handle->contexts_mutex); + conn_handle->cache_mutex = enif_mutex_create(NULL); + enif_mutex_lock(conn_handle->cache_mutex); conn_handle->conn = conn; - memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); ERL_NIF_TERM result = enif_make_resource(env, conn_handle); + /* Init tree which manages the cache of session/cursor(s) */ + conn_handle->cache = kb_init(cache_entries, ASYNC_NIF_MAX_WORKERS); // TODO: size + conn_handle->recycled_cache_entries = fifo_q_new(cache_entries, ASYNC_NIF_MAX_WORKERS); + /* Keep track of open connections so as to free when unload/reload/etc. are called. */ - khash_t(conns) *h; enif_mutex_lock(args->priv->conns_mutex); - h = args->priv->conns; - int itr_status = 0; - khiter_t itr = kh_put(conns, h, conn, &itr_status); - if (itr_status != 0) // 0 indicates the key exists already - kh_value(h, itr) = conn_handle; + SLIST_INSERT_HEAD(&args->priv->conns, conn_handle, conns); enif_mutex_unlock(args->priv->conns_mutex); enif_release_resource(conn_handle); - enif_mutex_unlock(conn_handle->contexts_mutex); + enif_mutex_unlock(conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); } else @@ -753,7 +652,7 @@ ASYNC_NIF_DECL( { // work /* Free up the shared sessions and cursors. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_all_sessions(args->conn_handle); if (args->conn_handle->session_config) { enif_free((char *)args->conn_handle->session_config); @@ -763,19 +662,11 @@ ASYNC_NIF_DECL( int rc = conn->close(conn, NULL); /* Connection is closed, remove it so we don't free on unload/reload/etc. */ - khash_t(conns) *h; enif_mutex_lock(args->priv->conns_mutex); - h = args->priv->conns; - khiter_t itr; - itr = kh_get(conns, h, conn); - if (itr != kh_end(h)) { - /* key exists in table (as expected) delete it */ - kh_del(conns, h, itr); - kh_value(h, itr) = NULL; - } + SLIST_REMOVE(&args->priv->conns, args->conn_handle, wterl_conn, conns); enif_mutex_unlock(args->priv->conns_mutex); - enif_mutex_unlock(args->conn_handle->contexts_mutex); - enif_mutex_destroy(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); + enif_mutex_destroy(args->conn_handle->cache_mutex); memset(args->conn_handle, 0, sizeof(WterlConnHandle)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); @@ -871,12 +762,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -888,7 +779,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -898,7 +789,7 @@ ASYNC_NIF_DECL( this will result in EBUSY(16) "Device or resource busy". */ rc = session->drop(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -938,12 +829,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->oldname); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -955,7 +846,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -966,7 +857,7 @@ ASYNC_NIF_DECL( this will result in EBUSY(16) "Device or resource busy". */ rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1006,12 +897,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1023,14 +914,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->salvage(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1134,17 +1025,17 @@ ASYNC_NIF_DECL( } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[4]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + affinity = __str_hash(args->uri); }, { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1157,7 +1048,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1172,7 +1063,7 @@ ASYNC_NIF_DECL( mess. */ if (!args->from_first) { if (!enif_inspect_binary(env, args->start, &start_key)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1180,7 +1071,7 @@ ASYNC_NIF_DECL( rc = session->open_cursor(session, args->uri, NULL, "raw", &start); if (rc != 0) { session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1190,7 +1081,7 @@ ASYNC_NIF_DECL( if (rc != 0) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1205,7 +1096,7 @@ ASYNC_NIF_DECL( if (!enif_inspect_binary(env, args->stop, &stop_key)) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1214,7 +1105,7 @@ ASYNC_NIF_DECL( if (rc != 0) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1225,7 +1116,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1243,7 +1134,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1280,12 +1171,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1297,14 +1188,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->upgrade(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1342,12 +1233,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_all_sessions(args->conn_handle); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1359,14 +1250,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->verify(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1399,7 +1290,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + affinity = __str_hash(args->uri); }, { // work @@ -1409,14 +1300,16 @@ ASYNC_NIF_DECL( return; } - WterlCtx *ctx = NULL; + WterlCtxHandle *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_ctx(args->conn_handle, args->uri, &ctx); + int rc = __retain_ctx(args->conn_handle, &ctx, + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - cursor = ctx->cursor; + cursor = ctx->cursors[0]; WT_ITEM item_key; item_key.data = key.data; @@ -1424,7 +1317,7 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->remove(cursor); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - __release_ctx(args->conn_handle, args->uri, cursor); + __release_ctx(args->conn_handle, ctx); }, { // post @@ -1456,7 +1349,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + affinity = __str_hash(args->uri); }, { // work @@ -1466,14 +1359,16 @@ ASYNC_NIF_DECL( return; } - WterlCtx *ctx = NULL + WterlCtxHandle *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_ctx(args->conn_handle, args->uri, &ctx); + int rc = __retain_ctx(args->conn_handle, &ctx, + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - cursor = ctx->cursor; + cursor = ctx->cursors[0]; WT_ITEM item_key; WT_ITEM item_value; @@ -1495,7 +1390,7 @@ ASYNC_NIF_DECL( unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); memcpy(bin, item_value.data, item_value.size); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value)); - __release_ctx(args->conn_handle, args->uri, ctx); + __release_ctx(args->conn_handle, ctx); }, { // post @@ -1531,7 +1426,7 @@ ASYNC_NIF_DECL( args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + affinity = __str_hash(args->uri); }, { // work @@ -1546,14 +1441,16 @@ ASYNC_NIF_DECL( return; } - WterlCtx *ctx = NULL; + WterlCtxHandle *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_ctx(args->conn_handle, args->uri, &ctx); + int rc = __retain_ctx(args->conn_handle, &ctx, + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - cursor = ctx->cursors; + cursor = ctx->cursors[0]; WT_ITEM item_key; WT_ITEM item_value; @@ -1564,7 +1461,7 @@ ASYNC_NIF_DECL( item_value.size = value.size; cursor->set_value(cursor, &item_value); rc = cursor->insert(cursor); - __release_ctx(args->conn_handle, args->uri, ctx); + __release_ctx(args->conn_handle, ctx); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1597,7 +1494,7 @@ ASYNC_NIF_DECL( } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + affinity = __str_hash(args->uri); }, { // work @@ -2301,12 +2198,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) memset(priv, 0, sizeof(struct wterl_priv_data)); priv->conns_mutex = enif_mutex_create(NULL); - priv->conns = kh_init(conns); - if (!priv->conns) { - enif_mutex_destroy(priv->conns_mutex); - enif_free(priv); - return ENOMEM; - } + SLIST_INIT(&priv->conns); struct wterl_event_handlers *eh = &priv->eh; eh->error_mutex = enif_mutex_create(NULL); @@ -2333,7 +2225,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) pointer to the async_nif's private data which we set here. */ ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); if (!priv->async_nif_priv) { - kh_destroy(conns, priv->conns); enif_mutex_destroy(priv->conns_mutex); enif_free(priv); return ENOMEM; @@ -2358,61 +2249,29 @@ on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) static void on_unload(ErlNifEnv *env, void *priv_data) { - unsigned int i; struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data; - khash_t(conns) *h; - khiter_t itr_conns; WterlConnHandle *conn_handle; + struct cache_entry *e; enif_mutex_lock(priv->conns_mutex); - h = priv->conns; - for (itr_conns = kh_begin(h); itr_conns != kh_end(h); ++itr_conns) { - if (kh_exist(h, itr_conns)) { - conn_handle = kh_val(h, itr_conns); - if (conn_handle) { - enif_mutex_lock(conn_handle->contexts_mutex); - enif_free((void*)conn_handle->session_config); - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - if (ctx->session != NULL) { - WT_SESSION *session = ctx->session; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr_cursors; - for (itr_cursors = kh_begin(h); itr_cursors != kh_end(h); ++itr_cursors) { - if (kh_exist(h, itr_cursors)) { - WT_CURSOR *cursor = kh_val(h, itr_cursors); - char *key = (char *)kh_key(h, itr_cursors); - cursor->close(cursor); - kh_del(cursors, h, itr_cursors); - enif_free(key); - kh_value(h, itr_cursors) = NULL; - } - } - kh_destroy(cursors, h); - session->close(session, NULL); - } - } - } - - /* This would have closed all cursors and sessions for us - but we do that explicitly above. */ - conn_handle->conn->close(conn_handle->conn, NULL); - } + /* Lock the cache mutex before unloading the async_nif to prevent new + work from coming in while shutting down. */ + SLIST_FOREACH(conn_handle, &priv->conns, conns) { + enif_mutex_lock(conn_handle->cache_mutex); } - /* Continue to hold the context mutex while unloading the async_nif - to prevent new work from coming in while shutting down. */ ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv); - for (itr_conns = kh_begin(h); itr_conns != kh_end(h); ++itr_conns) { - if (kh_exist(h, itr_conns)) { - conn_handle = kh_val(h, itr_conns); - if (conn_handle) { - enif_mutex_unlock(conn_handle->contexts_mutex); - enif_mutex_destroy(conn_handle->contexts_mutex); - } - } + SLIST_FOREACH(conn_handle, &priv->conns, conns) { + fifo_q_foreach(cache_entries, conn_handle->recycled_cache_entries, e, { + WterlCtxHandle *ctx = e->ctx; + ctx->session->close(ctx->session, NULL); + }); + fifo_q_free(cache_entries, conn_handle->recycled_cache_entries); + kb_destroy(cache_entries, conn_handle->cache); + enif_mutex_unlock(conn_handle->cache_mutex); + enif_mutex_destroy(conn_handle->cache_mutex); } /* At this point all WiredTiger state and threads are free'd/stopped so there @@ -2429,7 +2288,6 @@ on_unload(ErlNifEnv *env, void *priv_data) if (eh->msg_env_progress) enif_free_env(eh->msg_env_progress); - kh_destroy(conns, h); enif_mutex_unlock(priv->conns_mutex); enif_mutex_destroy(priv->conns_mutex); enif_free(priv);