diff --git a/Makefile b/Makefile index e485507..0e2e357 100644 --- a/Makefile +++ b/Makefile @@ -84,9 +84,9 @@ repl: @$(ERL) -pa ebin -pz deps/lager/ebin eunit-repl: - @$(ERL) -pa .eunit deps/lager/ebin + @$(ERL) erl -pa .eunit -pz deps/lager/ebin -ERL_TOP= /home/gburd/eng/otp_R15B01 +ERL_TOP= /home/gburd/repos/otp_R15B01 CERL= ${ERL_TOP}/bin/cerl VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD" diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 26d556f..d282bfb 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -47,6 +47,7 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); + const char *func; }; DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); @@ -92,7 +93,7 @@ struct async_nif_state { struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ - unsigned int affinity = 0; \ + unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ @@ -104,13 +105,16 @@ struct async_nif_state { enif_make_atom(env, "shutdown")); \ req = async_nif_reuse_req(async_nif); \ new_env = req->env; \ - if (!req) \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + if (!req) { \ + async_nif_recycle_req(req, async_nif); \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "eagain")); \ + } \ do pre_block while(0); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ + async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ @@ -120,12 +124,14 @@ struct async_nif_state { req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ + req->func = __func__; \ int h = -1; \ if (affinity) \ h = affinity % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ + async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ @@ -212,8 +218,13 @@ async_nif_reuse_req(struct async_nif_state *async_nif) void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { + ErlNifEnv *env = NULL; STAT_TOCK(async_nif, qwait); enif_mutex_lock(async_nif->recycled_req_mutex); + env = req->env; + enif_clear_env(env); + memset(req, 0, sizeof(struct async_nif_req_entry)); + req->env = env; fifo_q_put(reqs, async_nif->recycled_reqs, req); enif_mutex_unlock(async_nif->recycled_req_mutex); } @@ -257,13 +268,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return 0; } if (fifo_q_size(reqs, q->reqs) > async_nif->num_queues) { - double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait); - double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); - if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) { - enif_mutex_unlock(q->reqs_mutex); - qid = (qid + 1) % async_nif->num_queues; 
-                q = &async_nif->queues[qid];
-            }
+            double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
+            double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait);
+            if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) {
+                enif_mutex_unlock(q->reqs_mutex);
+                qid = (qid + 1) % async_nif->num_queues;
+                q = &async_nif->queues[qid];
+            }
         } else {
             break;
         }
@@ -335,7 +346,6 @@ async_nif_worker_fn(void *arg)
             req->fn_post = 0;
             enif_free(req->args);
             req->args = NULL;
-            enif_clear_env(req->env);
             async_nif_recycle_req(req, async_nif);
             req = NULL;
         }
diff --git a/c_src/cas.h b/c_src/cas.h
new file mode 100644
index 0000000..ea81dbf
--- /dev/null
+++ b/c_src/cas.h
@@ -0,0 +1,153 @@
+/*
+ * wterl: an Erlang NIF for WiredTiger
+ *
+ * Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved.
+ *
+ * This file is provided to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Most of the following source code is copied directly from: "The Lock-Free
+ * Library" (http://www.cl.cam.ac.uk/research/srg/netos/lock-free/) reused and
+ * redistributed in accordance with their license:
+ *
+ * Copyright (c) 2002-2003 K A Fraser, All Rights Reserved.
+ *
+ * * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * * The name of the author may not be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */ + +#ifndef __CAS_H_ +#define __CAS_H_ + +#define CACHE_LINE_SIZE 64 + +#define ATOMIC_ADD_TO(_v,_x) \ +do { \ + int __val = (_v), __newval; \ + while ( (__newval = CASIO(&(_v),__val,__val+(_x))) != __val ) \ + __val = __newval; \ +} while ( 0 ) + +#define ATOMIC_SET_TO(_v,_x) \ +do { \ + int __val = (_v), __newval; \ + while ( (__newval = CASIO(&(_v),__val,__val=(_x))) != __val ) \ + __val = __newval; \ +} while ( 0 ) + +#define ALIGNED_ENIF_ALLOC(_s) \ + ((void *)(((unsigned long)enif_alloc((_s)+CACHE_LINE_SIZE*2) + \ + CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \ + +/* + * I. Compare-and-swap. + */ + +/* + * This is a strong barrier! Reads cannot be delayed beyond a later store. + * Reads cannot be hoisted beyond a LOCK prefix. Stores always in-order. + */ +#define CAS(_a, _o, _n) \ +({ __typeof__(_o) __o = _o; \ + __asm__ __volatile__( \ + "lock cmpxchg %3,%1" \ + : "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \ + : "0" (__o), "r" (_n) ); \ + __o; \ +}) + +#define FAS(_a, _n) \ +({ __typeof__(_n) __o; \ + __asm__ __volatile__( \ + "lock xchg %0,%1" \ + : "=r" (__o), "=m" (*(volatile unsigned int *)(_a)) \ + : "0" (_n) ); \ + __o; \ +}) + +#define CAS64(_a, _o, _n) \ +({ __typeof__(_o) __o = _o; \ + __asm__ __volatile__( \ + "movl %3, %%ecx;" \ + "movl %4, %%ebx;" \ + "lock cmpxchg8b %1" \ + : "=A" (__o), "=m" (*(volatile unsigned long long *)(_a)) \ + : "0" (__o), "m" (_n >> 32), "m" (_n) \ + : "ebx", "ecx" ); \ + __o; \ +}) + +/* Update Integer location, return Old value. */ +#define CASIO CAS +#define FASIO FAS +/* Update Pointer location, return Old value. */ +#define CASPO CAS +#define FASPO FAS +/* Update 32/64-bit location, return Old value. */ +#define CAS32O CAS +#define CAS64O CAS64 + +/* + * II. Memory barriers. + * WMB(): All preceding write operations must commit before any later writes. + * RMB(): All preceding read operations must commit before any later reads. + * MB(): All preceding memory accesses must commit before any later accesses. + * + * If the compiler does not observe these barriers (but any sane compiler + * will!), then VOLATILE should be defined as 'volatile'. + */ + +#define MB() __asm__ __volatile__ ("lock; addl $0,0(%%esp)" : : : "memory") +#define WMB() __asm__ __volatile__ ("" : : : "memory") +#define RMB() MB() +#define VOLATILE /*volatile*/ + +/* On Intel, CAS is a strong barrier, but not a compile barrier. */ +#define RMB_NEAR_CAS() WMB() +#define WMB_NEAR_CAS() WMB() +#define MB_NEAR_CAS() WMB() + + +/* + * III. Cycle counter access. 
+ */ + +typedef unsigned long long tick_t; +#define RDTICK() \ + ({ tick_t __t; __asm__ __volatile__ ("rdtsc" : "=A" (__t)); __t; }) + +#endif /* __CAS_H_ */ diff --git a/c_src/common.h b/c_src/common.h index 42ac5e0..3364573 100644 --- a/c_src/common.h +++ b/c_src/common.h @@ -53,6 +53,12 @@ extern "C" { } while (0) #endif +#ifdef __APPLE__ +#define PRIuint64(x) (x) +#else +#define PRIuint64(x) (unsigned long long)(x) +#endif + #if defined(__cplusplus) } #endif diff --git a/c_src/duration.h b/c_src/duration.h index fbc97cb..6c05df0 100644 --- a/c_src/duration.h +++ b/c_src/duration.h @@ -52,7 +52,7 @@ static uint64_t ts(time_scale unit) struct timespec ts; current_utc_time(&ts); return (((uint64_t)ts.tv_sec * scale[unit].mul) + - ((uint64_t)ts.tv_nsec / scale[unit].div)); + ((uint64_t)ts.tv_nsec / scale[unit].div)); } #if defined(__i386__) || defined(__x86_64__) @@ -67,12 +67,12 @@ static inline uint64_t cpu_clock_ticks() { uint32_t lo, hi; __asm__ __volatile__ ( - "XORL %%eax, %%eax\n" /* Flush the pipeline */ - "CPUID\n" - "RDTSC\n" /* Get RDTSC counter in edx:eax */ - : "=a" (lo), "=d" (hi) - : - : "%ebx", "%ecx" ); + "XORL %%eax, %%eax\n" /* Flush the pipeline */ + "CPUID\n" + "RDTSC\n" /* Get RDTSC counter in edx:eax */ + : "=a" (lo), "=d" (hi) + : + : "%ebx", "%ecx" ); return (uint64_t)hi << 32 | lo; } @@ -110,14 +110,14 @@ static inline uint64_t elapsed(duration_t *d) #define ELAPSED_DURING(result, resolution, block) \ do { \ - DURATION(__x, resolution); \ - do block while(0); \ - *result = elapsed(&__x); \ + DURATION(__x, resolution); \ + do block while(0); \ + *result = elapsed(&__x); \ } while(0); #define CYCLES_DURING(result, block) \ do { \ - uint64_t __begin = cpu_clock_ticks(); \ - do block while(0); \ - *result = cpu_clock_ticks() - __begin; \ + uint64_t __begin = cpu_clock_ticks(); \ + do block while(0); \ + *result = cpu_clock_ticks() - __begin; \ } while(0); diff --git a/c_src/stats.h b/c_src/stats.h index f44319b..12f5d21 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -152,7 +152,7 @@ static unsigned int __log2_64(uint64_t x) { fprintf(stderr, " ns μs ms s ks\n"); \ fprintf(stderr, "min: "); \ if (s->min < 1000) \ - fprintf(stderr, "%llu (ns)", s->min); \ + fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); \ else if (s->min < 1000000) \ fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ else if (s->min < 1000000000) \ @@ -161,7 +161,7 @@ static unsigned int __log2_64(uint64_t x) { fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ fprintf(stderr, " max: "); \ if (s->max < 1000) \ - fprintf(stderr, "%llu (ns)", s->max); \ + fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); \ else if (s->max < 1000000) \ fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ else if (s->max < 1000000000) \ diff --git a/c_src/wterl.c b/c_src/wterl.c index 8756879..4e26ed9 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -16,6 +16,7 @@ * under the License. 
* */ + #include "erl_nif.h" #include "erl_driver.h" @@ -29,8 +30,8 @@ #include "wiredtiger.h" #include "stats.h" #include "async_nif.h" -#include "khash.h" #include "queue.h" +#include "cas.h" #define MAX_CACHE_SIZE ASYNC_NIF_MAX_WORKERS @@ -40,27 +41,20 @@ static ErlNifResourceType *wterl_cursor_RESOURCE; typedef char Uri[128]; struct wterl_ctx { - SLIST_ENTRY(wterl_ctx) entries; + STAILQ_ENTRY(wterl_ctx) entries; uint64_t sig; uint64_t tstamp; WT_SESSION *session; WT_CURSOR *cursors[]; // Note: must be last in struct }; -struct cache_entry { - uint64_t sig; - SLIST_HEAD(ctxs, wterl_ctx) contexts; -}; - -KHASH_MAP_INIT_INT64(cache_entries, struct cache_entry*); - typedef struct wterl_conn { WT_CONNECTION *conn; const char *session_config; + STAILQ_HEAD(ctxs, wterl_ctx) cache; ErlNifMutex *cache_mutex; - khash_t(cache_entries) *cache; - uint32_t num_ctx_in_cache; - struct wterl_ctx *last_ctx_used[ASYNC_NIF_MAX_WORKERS]; + uint32_t cache_size; + struct wterl_ctx *mru_ctx[ASYNC_NIF_MAX_WORKERS]; SLIST_ENTRY(wterl_conn) conns; uint64_t histogram[64]; uint64_t histogram_count; @@ -164,22 +158,19 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) { uint32_t mean, log, num_evicted, i; uint64_t now, elapsed; - khash_t(cache_entries) *h = conn_handle->cache; - khiter_t itr; - struct cache_entry *e; struct wterl_ctx *c, *n; - if (conn_handle->num_ctx_in_cache != MAX_CACHE_SIZE) - return 0; + if (conn_handle->cache_size != MAX_CACHE_SIZE) + return 0; now = cpu_clock_ticks(); // Find the mean of the recorded times that items stayed in cache. mean = 0; for (i = 0; i < 64; i++) - mean += (conn_handle->histogram[i] * i); + mean += (conn_handle->histogram[i] * i); if (mean > 0) - mean /= conn_handle->histogram_count; + mean /= conn_handle->histogram_count; // Clear out the histogram and hit/misses memset(conn_handle->histogram, 0, sizeof(uint64_t) * 64); @@ -190,30 +181,20 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) * items from the lists stored in the tree. 
*/ num_evicted = 0; - for (itr = kh_begin(h); itr != kh_end(h); ++itr) { - if (kh_exist(h, itr)) { - e = kh_val(h, itr); - c = SLIST_FIRST(&e->contexts); - while (c != NULL) { - n = SLIST_NEXT(c, entries); - elapsed = c->tstamp - now; - log = __log2(elapsed); - if (log > mean) { - SLIST_REMOVE(&e->contexts, c, wterl_ctx, entries); - c->session->close(c->session, NULL); - enif_free(c); - num_evicted++; - } - c = n; - } - if (SLIST_EMPTY(&e->contexts)) { - kh_del(cache_entries, h, itr); - enif_free(e); - kh_value(h, itr) = NULL; - } - } + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + n = STAILQ_NEXT(c, entries); + elapsed = c->tstamp - now; + log = __log2(elapsed); + if (log > mean) { + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + c->session->close(c->session, NULL); + enif_free(c); + num_evicted++; + } + c = n; } - conn_handle->num_ctx_in_cache -= num_evicted; + conn_handle->cache_size -= num_evicted; return num_evicted; } @@ -229,29 +210,20 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) static struct wterl_ctx * __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) { - struct wterl_ctx *c = NULL; - struct cache_entry *e; - khash_t(cache_entries) *h; - khiter_t itr; + struct wterl_ctx *c, *n; - h = conn_handle->cache; enif_mutex_lock(conn_handle->cache_mutex); - if (conn_handle->num_ctx_in_cache > 0) { - itr = kh_get(cache_entries, h, sig); - if (itr != kh_end(h)) { - e = kh_value(h, itr); - if (!SLIST_EMPTY(&e->contexts)) { - /* - * cache hit: - * remove a context from the list in the tree node - */ - c = SLIST_FIRST(&e->contexts); - SLIST_REMOVE_HEAD(&e->contexts, entries); - conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; - conn_handle->histogram_count++; - conn_handle->num_ctx_in_cache -= 1; - } - } + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + n = STAILQ_NEXT(c, entries); + if (c->sig == sig) { + // cache hit: + STAILQ_REMOVE_HEAD(&conn_handle->cache, entries); + conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; + conn_handle->histogram_count++; + conn_handle->cache_size -= 1; + } + c = n; } enif_mutex_unlock(conn_handle->cache_mutex); return c; @@ -266,26 +238,11 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) static void __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) { - struct cache_entry *e; - khash_t(cache_entries) *h; - khiter_t itr; - int itr_status; - enif_mutex_lock(conn_handle->cache_mutex); __ctx_cache_evict(conn_handle); c->tstamp = cpu_clock_ticks(); - h = conn_handle->cache; - itr = kh_get(cache_entries, h, c->sig); - if (itr == kh_end(h)) { - e = enif_alloc(sizeof(struct cache_entry)); // TODO: enomem - memset(e, 0, sizeof(struct cache_entry)); - SLIST_INIT(&e->contexts); - itr = kh_put(cache_entries, h, c->sig, &itr_status); - kh_value(h, itr) = e; - } - e = kh_value(h, itr); - SLIST_INSERT_HEAD(&e->contexts, c, entries); - conn_handle->num_ctx_in_cache += 1; + STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries); + conn_handle->cache_size += 1; enif_mutex_unlock(conn_handle->cache_mutex); } @@ -357,14 +314,14 @@ __ctx_cache_sig(const char *c, va_list ap, int count) const char *arg; if (c) - h = __str_hash(c); + h = __str_hash(c); else - h = 0; + h = 0; for (i = 0; i < (2 * count); i++) { - arg = va_arg(ap, const char *); - if (arg) h = __zi((uint32_t)(h & 0xFFFFFFFF), __str_hash(arg)); - else h = __zi((uint32_t)(h & 0xFFFFFFFF), 0); + arg = va_arg(ap, const char *); + if (arg) h = __zi((uint32_t)(h & 0xFFFFFFFF), 
__str_hash(arg)); \
+        else h = __zi((uint32_t)(h & 0xFFFFFFFF), 0); \
     }
     return h;
 }
@@ -375,62 +332,76 @@ __ctx_cache_sig(const char *c, va_list ap, int count)
  */
 static int
 __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
-             struct wterl_ctx **ctx,
-             int count, const char *session_config, ...)
+             struct wterl_ctx **ctx,
+             int count, const char *session_config, ...)
 {
     int i = 0;
     va_list ap;
     uint64_t sig;
     const char *arg;
+    struct wterl_ctx *c;
     arg = session_config;
     va_start(ap, session_config);
     sig = __ctx_cache_sig(session_config, ap, count);
     va_end(ap);
-    DPRINTF("worker: %u cache size: %u", worker_id, conn_handle->num_ctx_in_cache);
-    if (conn_handle->last_ctx_used[worker_id] != NULL &&
-        conn_handle->last_ctx_used[worker_id]->sig == sig) {
-        (*ctx) = conn_handle->last_ctx_used[worker_id];
-        DPRINTF("worker: %u reuse hit: %lu %p", worker_id, sig, *ctx);
-    } else {
-        if (conn_handle->last_ctx_used[worker_id] != NULL)
-            __ctx_cache_add(conn_handle, conn_handle->last_ctx_used[worker_id]);
-        conn_handle->last_ctx_used[worker_id] = NULL;
-        (*ctx) = __ctx_cache_find(conn_handle, sig);
-        if ((*ctx) == NULL) {
-            // cache miss
-            DPRINTF("worker: %u cache miss: %lu", worker_id, sig);
-            WT_CONNECTION *conn = conn_handle->conn;
-            WT_SESSION *session = NULL;
-            int rc = conn->open_session(conn, NULL, session_config, &session);
-            if (rc != 0)
-                return rc;
-            size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*));
-            *ctx = enif_alloc(s); // TODO: enif_alloc_resource()
-            if (*ctx == NULL) {
-                session->close(session, NULL);
-                return ENOMEM;
-            }
-            memset(*ctx, 0, s);
-            (*ctx)->sig = sig;
-            (*ctx)->session = session;
-            session_config = arg;
-            va_start(ap, session_config);
-            for (i = 0; i < count; i++) {
-                const char *uri = va_arg(ap, const char *);
-                const char *config = va_arg(ap, const char *);
-                // TODO: error when uri or config is NULL
-                rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->cursors[i]);
-                if (rc != 0) {
-                    session->close(session, NULL); // this will free the cursors too
-                    return rc;
-                }
-            }
-            va_end(ap);
-        } else { // else { cache hit }
-            DPRINTF("worker: %u cache hit: %lu %p", worker_id, sig, *ctx);
-        }
+    do {
+        c = conn_handle->mru_ctx[worker_id];
+        if (CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) == c) {
+            if (c == NULL) {
+                // mru miss:
+                *ctx = NULL;
+            } else {
+                if (c->sig == sig) {
+                    // mru hit:
+                    *ctx = c;
+                } else {
+                    // mru mismatch:
+                    __ctx_cache_add(conn_handle, c);
+                    *ctx = NULL;
+                }
+            }
+        } else {
+            // CAS failed; fall back to the shared cache below
+ continue; + } + } while(0); + + if (*ctx == NULL) { + // check the cache + (*ctx) = __ctx_cache_find(conn_handle, sig); + if ((*ctx) == NULL) { + // cache miss: + WT_CONNECTION *conn = conn_handle->conn; + WT_SESSION *session = NULL; + int rc = conn->open_session(conn, NULL, session_config, &session); + if (rc != 0) { + return rc; + } + size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*)); + *ctx = enif_alloc(s); // TODO: enif_alloc_resource() + if (*ctx == NULL) { + session->close(session, NULL); + return ENOMEM; + } + memset(*ctx, 0, s); + (*ctx)->sig = sig; + (*ctx)->session = session; + session_config = arg; + va_start(ap, session_config); + for (i = 0; i < count; i++) { + const char *uri = va_arg(ap, const char *); + const char *config = va_arg(ap, const char *); + // TODO: error when uri or config is NULL + rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->cursors[i]); + if (rc != 0) { + session->close(session, NULL); // this will free the cursors too + return rc; + } + } + va_end(ap); + } // else { cache hit } } return 0; } @@ -443,16 +414,17 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx { int i, n; WT_CURSOR *cursor; + struct wterl_ctx *c; - DPRINTF("worker: %u cache size: %u", worker_id, conn_handle->num_ctx_in_cache); n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]); for (i = 0; i < n; i++) { - cursor = ctx->cursors[i]; - cursor->reset(cursor); + cursor = ctx->cursors[i]; + cursor->reset(cursor); } - assert(conn_handle->last_ctx_used[worker_id] == 0 || - conn_handle->last_ctx_used[worker_id] == ctx); - conn_handle->last_ctx_used[worker_id] = ctx; + + do { + c = conn_handle->mru_ctx[worker_id]; + } while(CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c); } /** @@ -463,34 +435,31 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx void __close_all_sessions(WterlConnHandle *conn_handle) { - khash_t(cache_entries) *h = conn_handle->cache; - struct cache_entry *e; - struct wterl_ctx *c; - int i; + struct wterl_ctx *c, *n; + int worker_id; - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - c = conn_handle->last_ctx_used[i]; - if (c) { - c->session->close(c->session, NULL); - enif_free(c); - conn_handle->last_ctx_used[i] = NULL; - } + // clear out the mru + for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { + do { + c = conn_handle->mru_ctx[worker_id]; + } while(CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) != c); + + if (c != NULL) { + c->session->close(c->session, NULL); + enif_free(c); + } } - khiter_t itr; - for (itr = kh_begin(h); itr != kh_end(h); ++itr) { - if (kh_exist(h, itr)) { - e = kh_val(h, itr); - while ((c = SLIST_FIRST(&e->contexts)) != NULL) { - SLIST_REMOVE(&e->contexts, c, wterl_ctx, entries); - c->session->close(c->session, NULL); - enif_free(c); - } - kh_del(cache_entries, h, itr); - enif_free(e); - kh_value(h, itr) = NULL; - } + + // clear out the cache + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + n = STAILQ_NEXT(c, entries); + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + conn_handle->cache_size -= 1; + c->session->close(c->session, NULL); + enif_free(c); + c = n; } - conn_handle->num_ctx_in_cache = 0; } /** @@ -502,8 +471,8 @@ void __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) { UNUSED(uri); - // TODO: find a way to only close those session/cursor* open on uri __close_all_sessions(conn_handle); + return; } /** @@ -613,9 +582,9 @@ __wterl_progress_handler(WT_EVENT_HANDLER 
*handler, const char *operation, uint6 enif_make_int64(msg_env, counter))); enif_clear_env(msg_env); if (!enif_send(NULL, to_pid, msg_env, msg)) - fprintf(stderr, "[%llu] %s\n", counter, operation); + fprintf(stderr, "[%llu] %s\n", PRIuint64(counter), operation); } else { - rc = (printf("[%llu] %s\n", counter, operation) >= 0 ? 0 : EIO); + rc = (printf("[%llu] %s\n", PRIuint64(counter), operation) >= 0 ? 0 : EIO); } enif_mutex_unlock(eh->progress_mutex); return rc; @@ -693,7 +662,7 @@ ASYNC_NIF_DECL( int rc = wiredtiger_open(args->homedir, (WT_EVENT_HANDLER*)&args->priv->eh.handlers, (config.size > 1) ? (const char *)config.data : NULL, - &conn); + &conn); if (rc == 0) { WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); memset(conn_handle, 0, sizeof(WterlConnHandle)); @@ -719,8 +688,8 @@ ASYNC_NIF_DECL( ERL_NIF_TERM result = enif_make_resource(env, conn_handle); /* Init hash table which manages the cache of session/cursor(s) */ - conn_handle->cache = kh_init(cache_entries); - conn_handle->num_ctx_in_cache = 0; + STAILQ_INIT(&conn_handle->cache); + conn_handle->cache_size = 0; /* Keep track of open connections so as to free when unload/reload/etc. are called. */ @@ -1416,8 +1385,8 @@ ASYNC_NIF_DECL( struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; int rc = __retain_ctx(args->conn_handle, worker_id, &ctx, 1, - args->conn_handle->session_config, - args->uri, "overwrite,raw"); + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -1475,8 +1444,8 @@ ASYNC_NIF_DECL( struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; int rc = __retain_ctx(args->conn_handle, worker_id, &ctx, 1, - args->conn_handle->session_config, - args->uri, "overwrite,raw"); + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -1557,8 +1526,8 @@ ASYNC_NIF_DECL( struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; int rc = __retain_ctx(args->conn_handle, worker_id, &ctx, 1, - args->conn_handle->session_config, - args->uri, "overwrite,raw"); + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -2370,19 +2339,18 @@ on_unload(ErlNifEnv *env, void *priv_data) /* Lock the cache mutex before unloading the async_nif to prevent new work from coming in while shutting down. 
*/ SLIST_FOREACH(conn_handle, &priv->conns, conns) { - enif_mutex_lock(conn_handle->cache_mutex); + enif_mutex_lock(conn_handle->cache_mutex); } ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv); SLIST_FOREACH(conn_handle, &priv->conns, conns) { - __close_all_sessions(conn_handle); - conn_handle->conn->close(conn_handle->conn, NULL); - kh_destroy(cache_entries, conn_handle->cache); - if (conn_handle->session_config) - enif_free((void*)conn_handle->session_config); - enif_mutex_unlock(conn_handle->cache_mutex); - enif_mutex_destroy(conn_handle->cache_mutex); + __close_all_sessions(conn_handle); + conn_handle->conn->close(conn_handle->conn, NULL); + if (conn_handle->session_config) + enif_free((void*)conn_handle->session_config); + enif_mutex_unlock(conn_handle->cache_mutex); + enif_mutex_destroy(conn_handle->cache_mutex); } /* At this point all WiredTiger state and threads are free'd/stopped so there diff --git a/rebar.config b/rebar.config index a1cf184..46f0af2 100644 --- a/rebar.config +++ b/rebar.config @@ -12,7 +12,7 @@ debug_info, %{d,'DEBUG',true}, %strict_validation, %fail_on_warning, - warn_missing_spec, + %warn_missing_spec, warn_bif_clash, warn_deprecated_function, warn_export_all, @@ -22,7 +22,7 @@ warn_shadow_vars, warn_untyped_record, warn_unused_function, - warn_unused_import, + %warn_unused_import, warn_unused_record, warn_unused_vars ]}. diff --git a/src/wterl.erl b/src/wterl.erl index 8fc79f2..4dc5b79 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -96,8 +96,8 @@ nif_stub_error(Line) -> -spec init() -> ok | {error, any()}. init() -> erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), - [{wterl_vsn, "f1b7d8322da904a3385b97456819afd63ff41afe"}, - {wiredtiger_vsn, "1.6.1-a06b59e47db7b120575049bd7d6314df53e78e54"}]). + [{wterl_vsn, "b2c0b65"}, + {wiredtiger_vsn, "1.6.1-87-gbe6742a"}]). -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. @@ -618,7 +618,7 @@ various_online_test_() -> end}, {"truncate entire table", fun() -> - ?assertMatch(ok, truncate(ConnRef, "table:test")), + ?assertMatch(ok, truncate(ConnRef, "table:test")), ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) end}, %% {"truncate range [<>..last], ensure value outside range is found after", @@ -863,7 +863,7 @@ prop_put_delete() -> DataDir = "test/wterl.putdelete.qc", Table = "table:eqc", {ok, CWD} = file:get_cwd(), - rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])), + rmdir:path(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])), ok = filelib:ensure_dir(filename:join([DataDir, "x"])), {ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]), try