diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 9f5b94f..c18106c 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -25,6 +25,7 @@ extern "C" { #endif #include + #include "queue.h" #include "stats.h" @@ -33,6 +34,7 @@ extern "C" { #endif #define ASYNC_NIF_MAX_WORKERS 1024 +#define ASYNC_NIF_MIN_WORKERS 2 #define ASYNC_NIF_WORKER_QUEUE_SIZE 1000 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS @@ -43,17 +45,18 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); + uint64_t submitted; STAILQ_ENTRY(async_nif_req_entry) entries; }; struct async_nif_work_queue { unsigned int num_workers; + unsigned int depth; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; - STAT_DEF(qwait); - STAT_DEF(wt); + STAT_DEF(work); }; struct async_nif_worker_entry { @@ -65,6 +68,8 @@ struct async_nif_worker_entry { }; struct async_nif_state { + STAT_DEF(wait); + STAT_DEF(service); unsigned int shutdown; ErlNifMutex *we_mutex; unsigned int we_active; @@ -103,10 +108,12 @@ struct async_nif_state { argc -= 1; \ /* Note: !!! this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ - if (async_nif->shutdown) \ + if (async_nif->shutdown) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ + } \ req = async_nif_reuse_req(async_nif); \ + req->submitted = ts(ns); \ if (!req) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "eagain")); \ @@ -137,7 +144,7 @@ struct async_nif_state { async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + enif_make_atom(env, "eagain")); \ } \ return reply; \ } @@ -246,7 +253,6 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_ enif_mutex_lock(async_nif->we_mutex); -#if 0 // TODO: we = SLIST_FIRST(&async_nif->we_joining); while(we != NULL) { struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); @@ -257,7 +263,6 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_ async_nif->we_active--; we = n; } -#endif if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { enif_mutex_unlock(async_nif->we_mutex); @@ -289,9 +294,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en { /* Identify the most appropriate worker for this request. */ unsigned int i, qid = 0; - unsigned int n = async_nif->num_queues; struct async_nif_work_queue *q = NULL; - double avg_wait_across_q, avg_wt_service_time, avg_wait_this_q = 0; + double avg_depth; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -304,52 +308,66 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en async_nif->next_q = qid; } - avg_wait_across_q = 0; - for (i = 0; i < async_nif->num_queues; i++) { - avg_wait_across_q += __stat_mean(async_nif->queues[i].qwait_stat); - } - if (avg_wait_across_q != 0) - avg_wait_across_q /= async_nif->num_queues; - /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ - do { + for (i = 0; i < async_nif->num_queues; i++) { + /* Compute the average queue depth not counting queues which are empty or + the queue we're considering right now. */ + unsigned int j, n = 0; + for (j = 0; j < async_nif->num_queues; j++) { + if (j != qid && async_nif->queues[j].depth != 0) { + n++; + avg_depth += async_nif->queues[j].depth; + } + } + if (avg_depth != 0) + avg_depth /= n; + + /* Lock this queue under consideration, then check for shutdown. While + we hold this lock either a) we're shutting down so exit now or b) this + queue will be valid until we release the lock. */ q = &async_nif->queues[qid]; enif_mutex_lock(q->reqs_mutex); - - /* Now that we hold the lock, check for shutdown. As long as we hold - this lock either a) we're shutting down so exit now or b) this queue - will be valid until we release the lock. */ - if (async_nif->shutdown) { enif_mutex_unlock(q->reqs_mutex); return 0; } - avg_wait_this_q = __stat_mean(q->qwait_stat); - avg_wt_service_time = __stat_mean(q->wt_stat); - DPRINTF("q:%d w:%u %f/%f(%f) %c", qid, q->num_workers, avg_wait_this_q, avg_wait_across_q, avg_wt_service_time, avg_wait_this_q <= avg_wait_across_q ? 't' : 'f'); - if (avg_wait_this_q <= avg_wait_across_q) break; + /* Try not to enqueue a request into a queue that isn't keeping up with + the request volume. */ + if (q->depth <= avg_depth) break; else { - enif_mutex_unlock(q->reqs_mutex); - qid = (qid + 1) % async_nif->num_queues; - q = &async_nif->queues[qid]; + enif_mutex_unlock(q->reqs_mutex); + qid = (qid + 1) % async_nif->num_queues; } - } while(n-- > 0); + } - if (n == 0) return 0; // All queues are full, trigger eagain + /* If the for loop finished then we didn't find a suitable queue for this + request, meaning we're backed up so trigger eagain. */ + if (i == async_nif->num_queues) { + enif_mutex_unlock(q->reqs_mutex); + return 0; + } - /* We hold the queue's lock, and we've seletect a reasonable queue for this - new request now check to make sure there are enough workers actively - processing requests on this queue. */ - if (q->num_workers == 0 || avg_wait_this_q >= avg_wt_service_time) { - if (async_nif_start_worker(async_nif, q) == 0) - q->num_workers++; + /* We've selected queue for this new request now check to make sure there are + enough workers actively processing requests on this queue. */ + if (q->num_workers < ASYNC_NIF_MIN_WORKERS) { + if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; + } else { + /* If more the 1/4 of the time it takes to complete a request is spent in + waiting to be serviced then we need more worker threads to service + requests. */ + double m, n; + m = __stat_mean(async_nif->wait_stat); + n = __stat_mean(q->work_stat); + DPRINTF("w: %f\ts: %f\t%f", m, n, n/(m+n)); + if (m && n && n / (m+n) < 0.75) + if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; } /* And finally add the request to the queue. */ - __stat_tick(q->qwait_stat); STAILQ_INSERT_TAIL(&q->reqs, req, entries); + q->depth++; /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread @@ -385,36 +403,40 @@ async_nif_worker_fn(void *arg) } if (STAILQ_EMPTY(&q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - if (q->num_workers > 2) { - enif_mutex_unlock(q->reqs_mutex); - break; + if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { + enif_mutex_unlock(q->reqs_mutex); + break; } else { - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); + goto check_again_for_work; } } else { /* At this point the next req is ours to process and we hold the reqs_mutex lock. Take the request off the queue. */ req = STAILQ_FIRST(&q->reqs); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); + q->depth--; /* Ensure that there is at least one other worker thread watching this queue. */ enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); - __stat_tock(q->qwait_stat); - /* Perform the work. */ - __stat_tick(q->wt_stat); + uint64_t then = ts(q->work_stat->d.unit); + uint64_t wait = then - req->submitted; req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - __stat_tock(q->wt_stat); + uint64_t work = ts(q->work_stat->d.unit) - then; + __stat_add(async_nif->wait_stat, wait); + __stat_add(q->work_stat, work); + __stat_add(async_nif->service_stat, wait + work); /* Now call the post-work cleanup function. */ req->fn_post(req->args); /* Clean up req for reuse. */ req->ref = 0; + req->submitted = 0; req->fn_work = 0; req->fn_post = 0; enif_free(req->args); @@ -427,10 +449,6 @@ async_nif_worker_fn(void *arg) SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); enif_mutex_unlock(async_nif->we_mutex); q->num_workers--; - if (q->num_workers == 0) { - __stat_reset(q->qwait_stat); - __stat_reset(q->wt_stat); - } enif_thread_exit(0); return 0; } @@ -445,12 +463,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) struct async_nif_worker_entry *we = NULL; UNUSED(env); - /* Signal the worker threads, stop what you're doing and exit. To - ensure that we don't race with the enqueue() process we first - lock all the worker queues, then set shutdown to true, then - unlock. The enqueue function will take the queue mutex, then - test for shutdown condition, then enqueue only if not shutting - down. */ + /* Signal the worker threads, stop what you're doing and exit. To ensure + that we don't race with the enqueue() process we first lock all the worker + queues, then set shutdown to true, then unlock. The enqueue function will + take the queue mutex, then test for shutdown condition, then enqueue only + if not shutting down. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; enif_mutex_lock(q->reqs_mutex); @@ -462,26 +479,25 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; - __stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl q-wait"); enif_mutex_unlock(q->reqs_mutex); } - __stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl service time"); + __stat_print_histogram(async_nif->service_stat, "wterl"); /* Join for the now exiting worker threads. */ while(async_nif->we_active > 0) { for (i = 0; i < num_queues; i++) - enif_cond_broadcast(async_nif->queues[i].reqs_cnd); + enif_cond_broadcast(async_nif->queues[i].reqs_cnd); we = SLIST_FIRST(&async_nif->we_joining); while(we != NULL) { - struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); - SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); - void *exit_value = 0; /* We ignore the thread_join's exit value. */ - enif_thread_join(we->tid, &exit_value); - enif_free(we); - async_nif->we_active--; - we = n; + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; } } enif_mutex_destroy(async_nif->we_mutex); @@ -570,14 +586,15 @@ async_nif_load() async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->we_mutex = enif_mutex_create(NULL); SLIST_INIT(&async_nif->we_joining); + async_nif->wait_stat = __stat_init(100000); + async_nif->service_stat = __stat_init(100000); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - q->qwait_stat = __stat_init(100000); - q->wt_stat = __stat_init(100000); + q->work_stat = __stat_init(100000); } return async_nif; } diff --git a/c_src/stats.c b/c_src/stats.c index 9e6938c..d018c76 100644 --- a/c_src/stats.c +++ b/c_src/stats.c @@ -64,18 +64,20 @@ __stat_mean(struct stat *s) double mean; if (!s) - return 0.0; + return 0.0; + enif_mutex_lock(s->mutex); t = s->h; h = (s->h + 1) % s->num_samples; mean = 0; while (h != t) { - mean += s->samples[h]; - h = (h + 1) % s->num_samples; + mean += s->samples[h]; + h = (h + 1) % s->num_samples; } if (mean > 0) - mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples); + mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples); + enif_mutex_unlock(s->mutex); return mean; } @@ -86,13 +88,14 @@ __stat_mean_log2(struct stat *s) double mean = 0.0; if (!s) - return 0.0; + return 0.0; + enif_mutex_lock(s->mutex); for (i = 0; i < 64; i++) - mean += (s->histogram[i] * i); + mean += (s->histogram[i] * i); if (mean > 0) - mean /= (s->n < s->num_samples ? s->n : s->num_samples); - DPRINTF("n: %u %llu %f", s->n < 64 ? s->n : 64, PRIuint64(s), mean); + mean /= (s->n < s->num_samples ? s->n : s->num_samples); + enif_mutex_unlock(s->mutex); return mean; } @@ -102,10 +105,12 @@ __stat_tick(struct stat *s) uint64_t t; if (!s) - return 0.0; + return 0.0; + enif_mutex_lock(s->mutex); t = ts(s->d.unit); s->d.then = t; + enif_mutex_unlock(s->mutex); return t; } @@ -113,45 +118,48 @@ void __stat_reset(struct stat *s) { if (!s) - return; + return; + enif_mutex_lock(s->mutex); s->h = 0; s->d.unit = ns; s->d.then = 0; memset(s->histogram, 0, sizeof(uint64_t) * 64); memset(s->samples, 0, sizeof(uint64_t) * s->num_samples); + enif_mutex_unlock(s->mutex); } -uint64_t -__stat_tock(struct stat *s) +void +__stat_add(struct stat *s, uint64_t elapsed) { - uint64_t now; - uint64_t elapsed; uint32_t i; - duration_t *d; - if (!s) - return 0.0; - - d = &s->d; - now = ts(d->unit); - elapsed = now - d->then; - i = s->h; + enif_mutex_lock(s->mutex); if (s->n == s->num_samples) { - s->mean = (s->mean + __stat_mean(s)) / 2.0; - if (s->n >= 4294967295) - __stat_reset(s); + s->mean = (s->mean + __stat_mean(s)) / 2.0; + if (s->n >= 4294967295) { + enif_mutex_unlock(s->mutex); + __stat_reset(s); + enif_mutex_lock(s->mutex); + } } + i = s->h; s->h = (s->h + 1) % s->num_samples; s->samples[i] = elapsed; if (elapsed != 0 && elapsed < s->min) - s->min = elapsed; + s->min = elapsed; if (elapsed > s->max) - s->max = elapsed; + s->max = elapsed; s->histogram[LOG2(elapsed)]++; s->n++; - d->then = ts(d->unit); - return elapsed; + enif_mutex_unlock(s->mutex); +} + +void +__stat_tock(struct stat *s) +{ + if (s) + __stat_add(s, ts(s->d.unit)); } void @@ -162,68 +170,73 @@ __stat_print_histogram(struct stat *s, const char *mod) double m; if (!s) - return; + return; + enif_mutex_lock(s->mutex); m = (s->mean + __stat_mean(s) / 2.0); fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); for (i = 0; i < 64; i++) { - logs[i] = LOG2(s->histogram[i]); - if (logs[i] > max_log) - max_log = logs[i]; + logs[i] = LOG2(s->histogram[i]); + if (logs[i] > max_log) + max_log = logs[i]; } for (i = max_log; i > 0; i--) { - if (!(i % 10)) - fprintf(stderr, "2^%2d ", i); - else - fprintf(stderr, " "); - for(j = 0; j < 64; j++) - fprintf(stderr, logs[j] >= i ? "•" : " "); - fprintf(stderr, "\n"); + if (!(i % 10)) + fprintf(stderr, "2^%2d ", i); + else + fprintf(stderr, " "); + for(j = 0; j < 64; j++) + fprintf(stderr, logs[j] >= i ? "•" : " "); + fprintf(stderr, "\n"); } if (max_log == 0) { - fprintf(stderr, "[empty]\n"); + fprintf(stderr, "[empty]\n"); } else { - fprintf(stderr, " ns μs ms s ks\n"); - fprintf(stderr, "min: "); - if (s->min < 1000) - fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); - else if (s->min < 1000000) - fprintf(stderr, "%.2f (μs)", s->min / 1000.0); - else if (s->min < 1000000000) - fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); - else if (s->min < 1000000000000) - fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); - fprintf(stderr, " max: "); - if (s->max < 1000) - fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); - else if (s->max < 1000000) - fprintf(stderr, "%.2f (μs)", s->max / 1000.0); - else if (s->max < 1000000000) - fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); - else if (s->max < 1000000000000) - fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); - fprintf(stderr, " mean: "); - if (m < 1000) - fprintf(stderr, "%.2f (ns)", m); - else if (m < 1000000) - fprintf(stderr, "%.2f (μs)", m / 1000.0); - else if (m < 1000000000) - fprintf(stderr, "%.2f (ms)", m / 1000000.0); - else if (m < 1000000000000) - fprintf(stderr, "%.2f (s)", m / 1000000000.0); - fprintf(stderr, "\n"); + fprintf(stderr, " ns μs ms s ks\n"); + fprintf(stderr, "min: "); + if (s->min < 1000) + fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); + else if (s->min < 1000000) + fprintf(stderr, "%.2f (μs)", s->min / 1000.0); + else if (s->min < 1000000000) + fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); + else if (s->min < 1000000000000) + fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); + fprintf(stderr, " max: "); + if (s->max < 1000) + fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); + else if (s->max < 1000000) + fprintf(stderr, "%.2f (μs)", s->max / 1000.0); + else if (s->max < 1000000000) + fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); + else if (s->max < 1000000000000) + fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); + fprintf(stderr, " mean: "); + if (m < 1000) + fprintf(stderr, "%.2f (ns)", m); + else if (m < 1000000) + fprintf(stderr, "%.2f (μs)", m / 1000.0); + else if (m < 1000000000) + fprintf(stderr, "%.2f (ms)", m / 1000000.0); + else if (m < 1000000000000) + fprintf(stderr, "%.2f (s)", m / 1000000000.0); + fprintf(stderr, "\n"); } fflush(stderr); + enif_mutex_unlock(s->mutex); } void __stat_free(struct stat *s) { if (!s) - return; + return; + enif_mutex_lock(s->mutex); enif_free(s->samples); + enif_mutex_unlock(s->mutex); + enif_mutex_destroy(s->mutex); enif_free(s); } @@ -232,7 +245,7 @@ __stat_init(uint32_t n) { struct stat *s = enif_alloc(sizeof(struct stat) + (sizeof(uint64_t) * n)); if (!s) - return NULL; + return NULL; memset(s, 0, sizeof(struct stat) + (sizeof(uint64_t) * n)); s->min = ~0; s->max = 0; @@ -240,5 +253,6 @@ __stat_init(uint32_t n) s->h = 0; s->num_samples = n; s->d.unit = ns; + s->mutex = enif_mutex_create(NULL); return s; } diff --git a/c_src/stats.h b/c_src/stats.h index 0bdbe8e..f0e550f 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -30,6 +30,7 @@ extern "C" { #define STAT_DEF(name) struct stat *name ## _stat; struct stat { + ErlNifMutex *mutex; duration_t d; uint32_t h, n, num_samples; uint64_t min, max; @@ -41,8 +42,9 @@ struct stat { extern double __stat_mean(struct stat *s); extern double __stat_mean_log2(struct stat *s); extern uint64_t __stat_tick(struct stat *s); +extern void __stat_add(struct stat *s, uint64_t d); extern void __stat_reset(struct stat *s); -extern uint64_t __stat_tock(struct stat *s); +extern void __stat_tock(struct stat *s); extern void __stat_print_histogram(struct stat *s, const char *mod); extern void __stat_free(struct stat *s); extern struct stat *__stat_init(uint32_t n); diff --git a/c_src/wterl.c b/c_src/wterl.c index 7526ef3..c38654f 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -197,10 +197,12 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) uint64_t now, elapsed; struct wterl_ctx *c, *n; +#ifndef DEBUG if (conn_handle->cache_size < MAX_CACHE_SIZE) return 0; +#endif - now = cpu_clock_ticks(); + now = ts(ns); // Find the mean of the recorded times that items stayed in cache. mean = 0; @@ -226,7 +228,8 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) if (log > mean) { STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); DPRINTF("evicting: %llu", PRIuint64(c->sig)); - c->session->close(c->session, NULL); + if (c->session) + c->session->close(c->session, NULL); enif_free(c); num_evicted++; } @@ -256,22 +259,15 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs // cache hit: STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); - conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; + conn_handle->histogram[__log2(ts(ns) - c->tstamp)]++; conn_handle->histogram_count++; conn_handle->cache_size -= 1; break; } c = STAILQ_NEXT(c, entries); } -#ifdef DEBUG - uint32_t sz = 0; - struct wterl_ctx *f; - STAILQ_FOREACH(f, &conn_handle->cache, entries) { - sz++; - } -#endif enif_mutex_unlock(conn_handle->cache_mutex); - DPRINTF("cache_find: [%u:%u] %s (%p)", sz, conn_handle->cache_size, c ? "hit" : "miss", c); + DPRINTF("cache_find: [%u] %s (%p)", conn_handle->cache_size, c ? "hit" : "miss", c); return c; } @@ -286,14 +282,14 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) { enif_mutex_lock(conn_handle->cache_mutex); __ctx_cache_evict(conn_handle); - c->tstamp = cpu_clock_ticks(); + c->tstamp = ts(ns); STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries); conn_handle->cache_size += 1; #ifdef DEBUG uint32_t sz = 0; struct wterl_ctx *f; STAILQ_FOREACH(f, &conn_handle->cache, entries) { - sz++; + sz++; } #endif enif_mutex_unlock(conn_handle->cache_mutex); @@ -336,7 +332,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, hash = __str_hash(hash, session_config, l); crc = __crc32(crc, session_config, l); sig_len += l + 1; - DPRINTF("sig/1: %s", session_config); + DPRINTF("sig/1: %s", session_config); } else { sig_len += 1; } @@ -344,7 +340,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, arg = va_arg(ap, const char *); if (arg) { l = __strlen(arg); - DPRINTF("sig/args: %s", arg); + DPRINTF("sig/args: %s", arg); hash = __str_hash(hash, arg, l); crc = __crc32(crc, arg, l); sig_len += l + 1; @@ -360,21 +356,21 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, c = conn_handle->mru_ctx[worker_id]; if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { - if (c == 0) { - // mru miss: - DPRINTF("[%.4u] mru miss, empty", worker_id); - *ctx = NULL; - } else { - if (c->sig == sig) { - // mru hit: - DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); - *ctx = c; - } else { - // mru mismatch: - DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); - __ctx_cache_add(conn_handle, c); - *ctx = NULL; - } + if (c == 0) { + // mru miss: + DPRINTF("[%.4u] mru miss, empty", worker_id); + *ctx = NULL; + } else { + if (c->sig == sig) { + // mru hit: + DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); + *ctx = c; + } else { + // mru mismatch: + DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); + __ctx_cache_add(conn_handle, c); + *ctx = NULL; + } } } @@ -2309,7 +2305,7 @@ static void __wterl_conn_dtor(ErlNifEnv* env, void* obj) WterlConnHandle *conn_handle = (WterlConnHandle *)obj; if (conn_handle->cache_mutex) { - DPRINTF("Non-NULL conn_handle (%p) to free", obj); + DPRINTF("conn_handle dtor free'ing (%p)", obj); enif_mutex_lock(conn_handle->cache_mutex); __close_all_sessions(conn_handle); conn_handle->conn->close(conn_handle->conn, NULL); diff --git a/src/async_nif.hrl b/src/async_nif.hrl index 9d0f215..9034d8a 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -26,9 +26,6 @@ async_nif_enqueue(R, F, A) -> case erlang:apply(F, [R|A]) of {ok, enqueued} -> receive - {R, {error, eagain}} -> - %% Work unit was not queued, try again. - async_nif_enqueue(R, F, A); {R, {error, shutdown}=Error} -> %% Work unit was queued, but not executed. Error; @@ -38,6 +35,11 @@ async_nif_enqueue(R, F, A) -> {R, Reply} -> Reply end; + {error, eagain} -> + %% Work unit was not queued, try again. + async_nif_enqueue(R, F, A); + %{error, enomem} -> + %{error, shutdown} -> Other -> Other end. diff --git a/src/wterl.erl b/src/wterl.erl index db44807..9045be2 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -524,7 +524,7 @@ set_event_handler_pid(Pid) -define(TEST_DATA_DIR, "test/wterl.basic"). open_test_conn(DataDir) -> - open_test_conn(DataDir, [{create,true},{cache_size,"100MB"},{session_max, 8192}]). + open_test_conn(DataDir, [{create,true},{cache_size,"1GB"},{session_max, 8192}]). open_test_conn(DataDir, OpenConfig) -> {ok, CWD} = file:get_cwd(), rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])), @@ -606,7 +606,7 @@ insert_delete_test() -> many_open_tables_test_() -> {timeout, 120, fun() -> - ConnOpts = [{create,true},{cache_size,"100MB"},{session_max, 8192}], + ConnOpts = [{create,true},{cache_size,"1GB"},{session_max, 8192}], DataDir = ?TEST_DATA_DIR, KeyGen = fun(X) -> @@ -620,19 +620,31 @@ many_open_tables_test_() -> fun(X) -> "lsm:" ++ integer_to_list(X) end, - N = 1000, + NumTables = 16, N = 100, ConnRef = open_test_conn(DataDir, ConnOpts), Parent = self(), - [wterl:create(ConnRef, TableNameGen(X), [{checksum, "uncompressed"}]) || X <- lists:seq(0, 128)], + [ok = wterl:create(ConnRef, TableNameGen(X), [{checksum, "uncompressed"}]) || X <- lists:seq(0, NumTables)], [spawn(fun() -> TableName = TableNameGen(X), - [wterl:put(ConnRef, TableName, KeyGen(P), ValGen()) || P <- lists:seq(1, N)], - [wterl:get(ConnRef, TableName, KeyGen(P)) || P <- lists:seq(1, N)], - [wterl:delete(ConnRef, TableName, KeyGen(P)) || P <- lists:seq(1, N)], + [case wterl:put(ConnRef, TableName, KeyGen(P), ValGen()) of + ok -> ok; + {error, {enoent, _}} -> io:format("put failed, table missing ~p~n", [TableName]) + end || P <- lists:seq(1, N)], + [case wterl:get(ConnRef, TableName, KeyGen(P)) of + {ok, _} -> ok; + {error, {enoent, _}} -> io:format("get failed, table missing ~p~n", [TableName]) + end || P <- lists:seq(1, N)], + [case wterl:delete(ConnRef, TableName, KeyGen(P)) of + ok -> ok; + {error, {enoent, _}} -> io:format("delete failed, table missing ~p~n", [TableName]) + end || P <- lists:seq(1, N)], Parent ! done - end) || X <- lists:seq(0, 128)], - [wterl:drop(ConnRef, TableNameGen(X)) || X <- lists:seq(0, 128)], - [receive done -> ok end || _ <- lists:seq(0, 128)], + end) || X <- lists:seq(0, NumTables)], + [receive done -> ok end || _ <- lists:seq(0, NumTables)], + [case wterl:drop(ConnRef, TableNameGen(X)) of + ok -> ok; + {error, {enoent, _}} -> io:format("drop failed, table missing ~p~n", [TableNameGen(X)]) + end || X <- lists:seq(0, NumTables)], ok = wterl:connection_close(ConnRef) end}.