WIP: simplify the cache from hash-of-lists to list; use a CAS() operation to protect the most-recently-used (mru) list.

This commit is contained in:
Gregory Burd 2013-06-10 14:31:59 -04:00
parent b2c0b65114
commit 2a4b8ee7d2
9 changed files with 353 additions and 216 deletions

View file

@ -84,9 +84,9 @@ repl:
@$(ERL) -pa ebin -pz deps/lager/ebin @$(ERL) -pa ebin -pz deps/lager/ebin
eunit-repl: eunit-repl:
@$(ERL) -pa .eunit deps/lager/ebin @$(ERL) erl -pa .eunit -pz deps/lager/ebin
ERL_TOP= /home/gburd/eng/otp_R15B01 ERL_TOP= /home/gburd/repos/otp_R15B01
CERL= ${ERL_TOP}/bin/cerl CERL= ${ERL_TOP}/bin/cerl
VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD" VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD"

View file

@ -47,6 +47,7 @@ struct async_nif_req_entry {
void *args; void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *); void (*fn_post)(void *);
const char *func;
}; };
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
@ -104,13 +105,16 @@ struct async_nif_state {
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
req = async_nif_reuse_req(async_nif); \ req = async_nif_reuse_req(async_nif); \
new_env = req->env; \ new_env = req->env; \
if (!req) \ if (!req) { \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \ enif_make_atom(env, "eagain")); \
} \
do pre_block while(0); \ do pre_block while(0); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
@ -120,12 +124,14 @@ struct async_nif_state {
req->args = (void*)copy_of_args; \ req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \
req->func = __func__; \
int h = -1; \ int h = -1; \
if (affinity) \ if (affinity) \
h = affinity % async_nif->num_queues; \ h = affinity % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \ if (!reply) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
enif_free(copy_of_args); \ enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
@ -212,8 +218,13 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
void void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{ {
ErlNifEnv *env = NULL;
STAT_TOCK(async_nif, qwait); STAT_TOCK(async_nif, qwait);
enif_mutex_lock(async_nif->recycled_req_mutex); enif_mutex_lock(async_nif->recycled_req_mutex);
env = req->env;
enif_clear_env(env);
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
fifo_q_put(reqs, async_nif->recycled_reqs, req); fifo_q_put(reqs, async_nif->recycled_reqs, req);
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
} }
@ -335,7 +346,6 @@ async_nif_worker_fn(void *arg)
req->fn_post = 0; req->fn_post = 0;
enif_free(req->args); enif_free(req->args);
req->args = NULL; req->args = NULL;
enif_clear_env(req->env);
async_nif_recycle_req(req, async_nif); async_nif_recycle_req(req, async_nif);
req = NULL; req = NULL;
} }

153
c_src/cas.h Normal file
View file

@ -0,0 +1,153 @@
/*
* wterl: an Erlang NIF for WiredTiger
*
* Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved.
*
* This file is provided to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/
/*
* Most of the following source code is copied directly from: "The Lock-Free
* Library" (http://www.cl.cam.ac.uk/research/srg/netos/lock-free/) reused and
* redistrubuted in accordance with their license:
*
* Copyright (c) 2002-2003 K A Fraser, All Rights Reserved.
*
* * Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * The name of the author may not be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
* EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef __CAS_H_
#define __CAS_H_
#define CACHE_LINE_SIZE 64
#define ATOMIC_ADD_TO(_v,_x) \
do { \
int __val = (_v), __newval; \
while ( (__newval = CASIO(&(_v),__val,__val+(_x))) != __val ) \
__val = __newval; \
} while ( 0 )
#define ATOMIC_SET_TO(_v,_x) \
do { \
int __val = (_v), __newval; \
while ( (__newval = CASIO(&(_v),__val,__val=(_x))) != __val ) \
__val = __newval; \
} while ( 0 )
#define ALIGNED_ENIF_ALLOC(_s) \
((void *)(((unsigned long)enif_alloc((_s)+CACHE_LINE_SIZE*2) + \
CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \
/*
* I. Compare-and-swap.
*/
/*
* This is a strong barrier! Reads cannot be delayed beyond a later store.
* Reads cannot be hoisted beyond a LOCK prefix. Stores always in-order.
*/
#define CAS(_a, _o, _n) \
({ __typeof__(_o) __o = _o; \
__asm__ __volatile__( \
"lock cmpxchg %3,%1" \
: "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \
: "0" (__o), "r" (_n) ); \
__o; \
})
#define FAS(_a, _n) \
({ __typeof__(_n) __o; \
__asm__ __volatile__( \
"lock xchg %0,%1" \
: "=r" (__o), "=m" (*(volatile unsigned int *)(_a)) \
: "0" (_n) ); \
__o; \
})
#define CAS64(_a, _o, _n) \
({ __typeof__(_o) __o = _o; \
__asm__ __volatile__( \
"movl %3, %%ecx;" \
"movl %4, %%ebx;" \
"lock cmpxchg8b %1" \
: "=A" (__o), "=m" (*(volatile unsigned long long *)(_a)) \
: "0" (__o), "m" (_n >> 32), "m" (_n) \
: "ebx", "ecx" ); \
__o; \
})
/* Update Integer location, return Old value. */
#define CASIO CAS
#define FASIO FAS
/* Update Pointer location, return Old value. */
#define CASPO CAS
#define FASPO FAS
/* Update 32/64-bit location, return Old value. */
#define CAS32O CAS
#define CAS64O CAS64
/*
* II. Memory barriers.
* WMB(): All preceding write operations must commit before any later writes.
* RMB(): All preceding read operations must commit before any later reads.
* MB(): All preceding memory accesses must commit before any later accesses.
*
* If the compiler does not observe these barriers (but any sane compiler
* will!), then VOLATILE should be defined as 'volatile'.
*/
#define MB() __asm__ __volatile__ ("lock; addl $0,0(%%esp)" : : : "memory")
#define WMB() __asm__ __volatile__ ("" : : : "memory")
#define RMB() MB()
#define VOLATILE /*volatile*/
/* On Intel, CAS is a strong barrier, but not a compile barrier. */
#define RMB_NEAR_CAS() WMB()
#define WMB_NEAR_CAS() WMB()
#define MB_NEAR_CAS() WMB()
/*
* III. Cycle counter access.
*/
typedef unsigned long long tick_t;
#define RDTICK() \
({ tick_t __t; __asm__ __volatile__ ("rdtsc" : "=A" (__t)); __t; })
#endif /* __CAS_H_ */

View file

@ -53,6 +53,12 @@ extern "C" {
} while (0) } while (0)
#endif #endif
#ifdef __APPLE__
#define PRIuint64(x) (x)
#else
#define PRIuint64(x) (unsigned long long)(x)
#endif
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif

View file

@ -152,7 +152,7 @@ static unsigned int __log2_64(uint64_t x) {
fprintf(stderr, " ns μs ms s ks\n"); \ fprintf(stderr, " ns μs ms s ks\n"); \
fprintf(stderr, "min: "); \ fprintf(stderr, "min: "); \
if (s->min < 1000) \ if (s->min < 1000) \
fprintf(stderr, "%llu (ns)", s->min); \ fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); \
else if (s->min < 1000000) \ else if (s->min < 1000000) \
fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \
else if (s->min < 1000000000) \ else if (s->min < 1000000000) \
@ -161,7 +161,7 @@ static unsigned int __log2_64(uint64_t x) {
fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \
fprintf(stderr, " max: "); \ fprintf(stderr, " max: "); \
if (s->max < 1000) \ if (s->max < 1000) \
fprintf(stderr, "%llu (ns)", s->max); \ fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); \
else if (s->max < 1000000) \ else if (s->max < 1000000) \
fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \
else if (s->max < 1000000000) \ else if (s->max < 1000000000) \

View file

@ -16,6 +16,7 @@
* under the License. * under the License.
* *
*/ */
#include "erl_nif.h" #include "erl_nif.h"
#include "erl_driver.h" #include "erl_driver.h"
@ -29,8 +30,8 @@
#include "wiredtiger.h" #include "wiredtiger.h"
#include "stats.h" #include "stats.h"
#include "async_nif.h" #include "async_nif.h"
#include "khash.h"
#include "queue.h" #include "queue.h"
#include "cas.h"
#define MAX_CACHE_SIZE ASYNC_NIF_MAX_WORKERS #define MAX_CACHE_SIZE ASYNC_NIF_MAX_WORKERS
@ -40,27 +41,20 @@ static ErlNifResourceType *wterl_cursor_RESOURCE;
typedef char Uri[128]; typedef char Uri[128];
struct wterl_ctx { struct wterl_ctx {
SLIST_ENTRY(wterl_ctx) entries; STAILQ_ENTRY(wterl_ctx) entries;
uint64_t sig; uint64_t sig;
uint64_t tstamp; uint64_t tstamp;
WT_SESSION *session; WT_SESSION *session;
WT_CURSOR *cursors[]; // Note: must be last in struct WT_CURSOR *cursors[]; // Note: must be last in struct
}; };
struct cache_entry {
uint64_t sig;
SLIST_HEAD(ctxs, wterl_ctx) contexts;
};
KHASH_MAP_INIT_INT64(cache_entries, struct cache_entry*);
typedef struct wterl_conn { typedef struct wterl_conn {
WT_CONNECTION *conn; WT_CONNECTION *conn;
const char *session_config; const char *session_config;
STAILQ_HEAD(ctxs, wterl_ctx) cache;
ErlNifMutex *cache_mutex; ErlNifMutex *cache_mutex;
khash_t(cache_entries) *cache; uint32_t cache_size;
uint32_t num_ctx_in_cache; struct wterl_ctx *mru_ctx[ASYNC_NIF_MAX_WORKERS];
struct wterl_ctx *last_ctx_used[ASYNC_NIF_MAX_WORKERS];
SLIST_ENTRY(wterl_conn) conns; SLIST_ENTRY(wterl_conn) conns;
uint64_t histogram[64]; uint64_t histogram[64];
uint64_t histogram_count; uint64_t histogram_count;
@ -164,12 +158,9 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
{ {
uint32_t mean, log, num_evicted, i; uint32_t mean, log, num_evicted, i;
uint64_t now, elapsed; uint64_t now, elapsed;
khash_t(cache_entries) *h = conn_handle->cache;
khiter_t itr;
struct cache_entry *e;
struct wterl_ctx *c, *n; struct wterl_ctx *c, *n;
if (conn_handle->num_ctx_in_cache != MAX_CACHE_SIZE) if (conn_handle->cache_size != MAX_CACHE_SIZE)
return 0; return 0;
now = cpu_clock_ticks(); now = cpu_clock_ticks();
@ -190,30 +181,20 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
* items from the lists stored in the tree. * items from the lists stored in the tree.
*/ */
num_evicted = 0; num_evicted = 0;
for (itr = kh_begin(h); itr != kh_end(h); ++itr) { c = STAILQ_FIRST(&conn_handle->cache);
if (kh_exist(h, itr)) {
e = kh_val(h, itr);
c = SLIST_FIRST(&e->contexts);
while (c != NULL) { while (c != NULL) {
n = SLIST_NEXT(c, entries); n = STAILQ_NEXT(c, entries);
elapsed = c->tstamp - now; elapsed = c->tstamp - now;
log = __log2(elapsed); log = __log2(elapsed);
if (log > mean) { if (log > mean) {
SLIST_REMOVE(&e->contexts, c, wterl_ctx, entries); STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
c->session->close(c->session, NULL); c->session->close(c->session, NULL);
enif_free(c); enif_free(c);
num_evicted++; num_evicted++;
} }
c = n; c = n;
} }
if (SLIST_EMPTY(&e->contexts)) { conn_handle->cache_size -= num_evicted;
kh_del(cache_entries, h, itr);
enif_free(e);
kh_value(h, itr) = NULL;
}
}
}
conn_handle->num_ctx_in_cache -= num_evicted;
return num_evicted; return num_evicted;
} }
@ -229,29 +210,20 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
static struct wterl_ctx * static struct wterl_ctx *
__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
{ {
struct wterl_ctx *c = NULL; struct wterl_ctx *c, *n;
struct cache_entry *e;
khash_t(cache_entries) *h;
khiter_t itr;
h = conn_handle->cache;
enif_mutex_lock(conn_handle->cache_mutex); enif_mutex_lock(conn_handle->cache_mutex);
if (conn_handle->num_ctx_in_cache > 0) { c = STAILQ_FIRST(&conn_handle->cache);
itr = kh_get(cache_entries, h, sig); while (c != NULL) {
if (itr != kh_end(h)) { n = STAILQ_NEXT(c, entries);
e = kh_value(h, itr); if (c->sig == sig) {
if (!SLIST_EMPTY(&e->contexts)) { // cache hit:
/* STAILQ_REMOVE_HEAD(&conn_handle->cache, entries);
* cache hit:
* remove a context from the list in the tree node
*/
c = SLIST_FIRST(&e->contexts);
SLIST_REMOVE_HEAD(&e->contexts, entries);
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
conn_handle->histogram_count++; conn_handle->histogram_count++;
conn_handle->num_ctx_in_cache -= 1; conn_handle->cache_size -= 1;
}
} }
c = n;
} }
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);
return c; return c;
@ -266,26 +238,11 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
static void static void
__ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c)
{ {
struct cache_entry *e;
khash_t(cache_entries) *h;
khiter_t itr;
int itr_status;
enif_mutex_lock(conn_handle->cache_mutex); enif_mutex_lock(conn_handle->cache_mutex);
__ctx_cache_evict(conn_handle); __ctx_cache_evict(conn_handle);
c->tstamp = cpu_clock_ticks(); c->tstamp = cpu_clock_ticks();
h = conn_handle->cache; STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries);
itr = kh_get(cache_entries, h, c->sig); conn_handle->cache_size += 1;
if (itr == kh_end(h)) {
e = enif_alloc(sizeof(struct cache_entry)); // TODO: enomem
memset(e, 0, sizeof(struct cache_entry));
SLIST_INIT(&e->contexts);
itr = kh_put(cache_entries, h, c->sig, &itr_status);
kh_value(h, itr) = e;
}
e = kh_value(h, itr);
SLIST_INSERT_HEAD(&e->contexts, c, entries);
conn_handle->num_ctx_in_cache += 1;
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);
} }
@ -382,30 +339,46 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
va_list ap; va_list ap;
uint64_t sig; uint64_t sig;
const char *arg; const char *arg;
struct wterl_ctx *c;
arg = session_config; arg = session_config;
va_start(ap, session_config); va_start(ap, session_config);
sig = __ctx_cache_sig(session_config, ap, count); sig = __ctx_cache_sig(session_config, ap, count);
va_end(ap); va_end(ap);
DPRINTF("worker: %u cache size: %u", worker_id, conn_handle->num_ctx_in_cache); do {
if (conn_handle->last_ctx_used[worker_id] != NULL && c = conn_handle->mru_ctx[worker_id];
conn_handle->last_ctx_used[worker_id]->sig == sig) { if (CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) != c) {
(*ctx) = conn_handle->last_ctx_used[worker_id]; if (c == NULL) {
DPRINTF("worker: %u reuse hit: %lu %p", worker_id, sig, *ctx); // mru miss:
*ctx = NULL;
} else { } else {
if (conn_handle->last_ctx_used[worker_id] != NULL) if (c->sig == sig) {
__ctx_cache_add(conn_handle, conn_handle->last_ctx_used[worker_id]); // mru hit:
conn_handle->last_ctx_used[worker_id] = NULL; *ctx = c;
} else {
// mru missmatch:
__ctx_cache_add(conn_handle, c);
*ctx = NULL;
}
}
} else {
// CAS failed, retry...
continue;
}
} while(0);
if (*ctx == NULL) {
// check the cache
(*ctx) = __ctx_cache_find(conn_handle, sig); (*ctx) = __ctx_cache_find(conn_handle, sig);
if ((*ctx) == NULL) { if ((*ctx) == NULL) {
// cache miss // cache miss:
DPRINTF("worker: %u cache miss: %lu", worker_id, sig);
WT_CONNECTION *conn = conn_handle->conn; WT_CONNECTION *conn = conn_handle->conn;
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, session_config, &session); int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0) if (rc != 0) {
return rc; return rc;
}
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*)); size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*));
*ctx = enif_alloc(s); // TODO: enif_alloc_resource() *ctx = enif_alloc(s); // TODO: enif_alloc_resource()
if (*ctx == NULL) { if (*ctx == NULL) {
@ -428,9 +401,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
} }
} }
va_end(ap); va_end(ap);
} else { // else { cache hit } } // else { cache hit }
DPRINTF("worker: %u cache hit: %lu %p", worker_id, sig, *ctx);
}
} }
return 0; return 0;
} }
@ -443,16 +414,17 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
{ {
int i, n; int i, n;
WT_CURSOR *cursor; WT_CURSOR *cursor;
struct wterl_ctx *c;
DPRINTF("worker: %u cache size: %u", worker_id, conn_handle->num_ctx_in_cache);
n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]); n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]);
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
cursor = ctx->cursors[i]; cursor = ctx->cursors[i];
cursor->reset(cursor); cursor->reset(cursor);
} }
assert(conn_handle->last_ctx_used[worker_id] == 0 ||
conn_handle->last_ctx_used[worker_id] == ctx); do {
conn_handle->last_ctx_used[worker_id] = ctx; c = conn_handle->mru_ctx[worker_id];
} while(CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c);
} }
/** /**
@ -463,34 +435,31 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
void void
__close_all_sessions(WterlConnHandle *conn_handle) __close_all_sessions(WterlConnHandle *conn_handle)
{ {
khash_t(cache_entries) *h = conn_handle->cache; struct wterl_ctx *c, *n;
struct cache_entry *e; int worker_id;
struct wterl_ctx *c;
int i;
for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { // clear out the mru
c = conn_handle->last_ctx_used[i]; for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) {
if (c) { do {
c->session->close(c->session, NULL); c = conn_handle->mru_ctx[worker_id];
enif_free(c); } while(CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) != c);
conn_handle->last_ctx_used[i] = NULL;
} if (c != NULL) {
}
khiter_t itr;
for (itr = kh_begin(h); itr != kh_end(h); ++itr) {
if (kh_exist(h, itr)) {
e = kh_val(h, itr);
while ((c = SLIST_FIRST(&e->contexts)) != NULL) {
SLIST_REMOVE(&e->contexts, c, wterl_ctx, entries);
c->session->close(c->session, NULL); c->session->close(c->session, NULL);
enif_free(c); enif_free(c);
} }
kh_del(cache_entries, h, itr);
enif_free(e);
kh_value(h, itr) = NULL;
} }
// clear out the cache
c = STAILQ_FIRST(&conn_handle->cache);
while (c != NULL) {
n = STAILQ_NEXT(c, entries);
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
c = n;
} }
conn_handle->num_ctx_in_cache = 0;
} }
/** /**
@ -502,8 +471,8 @@ void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
{ {
UNUSED(uri); UNUSED(uri);
// TODO: find a way to only close those session/cursor* open on uri
__close_all_sessions(conn_handle); __close_all_sessions(conn_handle);
return;
} }
/** /**
@ -613,9 +582,9 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6
enif_make_int64(msg_env, counter))); enif_make_int64(msg_env, counter)));
enif_clear_env(msg_env); enif_clear_env(msg_env);
if (!enif_send(NULL, to_pid, msg_env, msg)) if (!enif_send(NULL, to_pid, msg_env, msg))
fprintf(stderr, "[%llu] %s\n", counter, operation); fprintf(stderr, "[%llu] %s\n", PRIuint64(counter), operation);
} else { } else {
rc = (printf("[%llu] %s\n", counter, operation) >= 0 ? 0 : EIO); rc = (printf("[%llu] %s\n", PRIuint64(counter), operation) >= 0 ? 0 : EIO);
} }
enif_mutex_unlock(eh->progress_mutex); enif_mutex_unlock(eh->progress_mutex);
return rc; return rc;
@ -719,8 +688,8 @@ ASYNC_NIF_DECL(
ERL_NIF_TERM result = enif_make_resource(env, conn_handle); ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
/* Init hash table which manages the cache of session/cursor(s) */ /* Init hash table which manages the cache of session/cursor(s) */
conn_handle->cache = kh_init(cache_entries); STAILQ_INIT(&conn_handle->cache);
conn_handle->num_ctx_in_cache = 0; conn_handle->cache_size = 0;
/* Keep track of open connections so as to free when unload/reload/etc. /* Keep track of open connections so as to free when unload/reload/etc.
are called. */ are called. */
@ -2378,7 +2347,6 @@ on_unload(ErlNifEnv *env, void *priv_data)
SLIST_FOREACH(conn_handle, &priv->conns, conns) { SLIST_FOREACH(conn_handle, &priv->conns, conns) {
__close_all_sessions(conn_handle); __close_all_sessions(conn_handle);
conn_handle->conn->close(conn_handle->conn, NULL); conn_handle->conn->close(conn_handle->conn, NULL);
kh_destroy(cache_entries, conn_handle->cache);
if (conn_handle->session_config) if (conn_handle->session_config)
enif_free((void*)conn_handle->session_config); enif_free((void*)conn_handle->session_config);
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);

View file

@ -12,7 +12,7 @@
debug_info, %{d,'DEBUG',true}, debug_info, %{d,'DEBUG',true},
%strict_validation, %strict_validation,
%fail_on_warning, %fail_on_warning,
warn_missing_spec, %warn_missing_spec,
warn_bif_clash, warn_bif_clash,
warn_deprecated_function, warn_deprecated_function,
warn_export_all, warn_export_all,
@ -22,7 +22,7 @@
warn_shadow_vars, warn_shadow_vars,
warn_untyped_record, warn_untyped_record,
warn_unused_function, warn_unused_function,
warn_unused_import, %warn_unused_import,
warn_unused_record, warn_unused_record,
warn_unused_vars warn_unused_vars
]}. ]}.

View file

@ -96,8 +96,8 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}. -spec init() -> ok | {error, any()}.
init() -> init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "f1b7d8322da904a3385b97456819afd63ff41afe"}, [{wterl_vsn, "b2c0b65"},
{wiredtiger_vsn, "1.6.1-a06b59e47db7b120575049bd7d6314df53e78e54"}]). {wiredtiger_vsn, "1.6.1-87-gbe6742a"}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -863,7 +863,7 @@ prop_put_delete() ->
DataDir = "test/wterl.putdelete.qc", DataDir = "test/wterl.putdelete.qc",
Table = "table:eqc", Table = "table:eqc",
{ok, CWD} = file:get_cwd(), {ok, CWD} = file:get_cwd(),
rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])), rmdir:path(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
ok = filelib:ensure_dir(filename:join([DataDir, "x"])), ok = filelib:ensure_dir(filename:join([DataDir, "x"])),
{ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]), {ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]),
try try