diff --git a/Makefile b/Makefile
index e485507..0e2e357 100644
--- a/Makefile
+++ b/Makefile
@@ -84,9 +84,9 @@ repl:
 	@$(ERL) -pa ebin -pz deps/lager/ebin

 eunit-repl:
-	@$(ERL) -pa .eunit deps/lager/ebin
+	@$(ERL) -pa .eunit -pz deps/lager/ebin

-ERL_TOP= /home/gburd/eng/otp_R15B01
+ERL_TOP= /home/gburd/repos/otp_R15B01
 CERL= ${ERL_TOP}/bin/cerl
 VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD"

diff --git a/c_src/async_nif.h b/c_src/async_nif.h
index e7a9670..92ebe66 100644
--- a/c_src/async_nif.h
+++ b/c_src/async_nif.h
@@ -4,18 +4,16 @@
  * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
  * Author: Gregory Burd
  *
- * This file is provided to you under the Apache License,
- * Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain
- * a copy of the License at
+ * This file is provided to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
  * under the License.
*/ @@ -27,18 +25,17 @@ extern "C" { #endif #include -#include "fifo_q.h" -#include "stats.h" -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) +#include "queue.h" + +#ifndef UNUSED +#define UNUSED(v) ((void)(v)) #endif -#define ASYNC_NIF_MAX_WORKERS 128 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 -#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS - -STAT_DECL(qwait, 1000); +#define ASYNC_NIF_MAX_WORKERS 1024 +#define ASYNC_NIF_MIN_WORKERS 2 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 +#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { ERL_NIF_TERM ref; @@ -47,14 +44,16 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); + STAILQ_ENTRY(async_nif_req_entry) entries; }; -DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); + struct async_nif_work_queue { - STAT_DEF(qwait); + unsigned int num_workers; + unsigned int depth; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; - FIFO_QUEUE_TYPE(reqs) reqs; + STAILQ_HEAD(reqs, async_nif_req_entry) reqs; }; struct async_nif_worker_entry { @@ -62,16 +61,17 @@ struct async_nif_worker_entry { unsigned int worker_id; struct async_nif_state *async_nif; struct async_nif_work_queue *q; + SLIST_ENTRY(async_nif_worker_entry) entries; }; struct async_nif_state { - STAT_DEF(qwait); unsigned int shutdown; - unsigned int num_workers; - struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; + ErlNifMutex *we_mutex; + unsigned int we_active; + SLIST_HEAD(joining, async_nif_worker_entry) we_joining; unsigned int num_queues; unsigned int next_q; - FIFO_QUEUE_TYPE(reqs) recycled_reqs; + STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs; unsigned int num_reqs; ErlNifMutex *recycled_req_mutex; struct async_nif_work_queue queues[]; @@ -80,37 +80,46 @@ struct async_nif_state { #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ struct decl ## _args frame; \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ - __UNUSED(worker_id); \ + UNUSED(worker_id); \ + DPRINTF("async_nif: calling \"%s\"", __func__); \ do work_block while(0); \ + DPRINTF("async_nif: returned from \"%s\"", __func__); \ } \ static void fn_post_ ## decl (struct decl ## _args *args) { \ - __UNUSED(args); \ + UNUSED(args); \ + DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \ do post_block while(0); \ + DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \ } \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ struct decl ## _args on_stack_args; \ struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ - const char *affinity = NULL; \ + unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ argc -= 1; \ /* Note: !!! 
this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ - if (async_nif->shutdown) \ + if (async_nif->shutdown) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ + } \ req = async_nif_reuse_req(async_nif); \ + if (!req) { \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "eagain")); \ + } \ new_env = req->env; \ - if (!req) \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + DPRINTF("async_nif: calling \"%s\"", __func__); \ do pre_block while(0); \ + DPRINTF("async_nif: returned from \"%s\"", __func__); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ + async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ @@ -122,13 +131,14 @@ struct async_nif_state { req->fn_post = (void (*)(void *))fn_post_ ## decl; \ int h = -1; \ if (affinity) \ - h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ + h = ((unsigned int)affinity) % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ + async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "shutdown")); \ + enif_make_atom(env, "eagain")); \ } \ return reply; \ } @@ -179,26 +189,26 @@ async_nif_reuse_req(struct async_nif_state *async_nif) ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); - if (fifo_q_empty(reqs, async_nif->recycled_reqs)) { + if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { req = enif_alloc(sizeof(struct async_nif_req_entry)); if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); - if (!env) { - enif_free(req); - req = NULL; - } else { + if (env) { req->env = env; async_nif->num_reqs++; + } else { + enif_free(req); + req = NULL; } } } } else { - req = fifo_q_get(reqs, async_nif->recycled_reqs); + req = STAILQ_FIRST(&async_nif->recycled_reqs); + STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries); } enif_mutex_unlock(async_nif->recycled_req_mutex); - STAT_TICK(async_nif, qwait); return req; } @@ -212,27 +222,59 @@ async_nif_reuse_req(struct async_nif_state *async_nif) void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { - STAT_TOCK(async_nif, qwait); + ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); - fifo_q_put(reqs, async_nif->recycled_reqs, req); + enif_clear_env(req->env); + env = req->env; + memset(req, 0, sizeof(struct async_nif_req_entry)); + req->env = env; + STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries); enif_mutex_unlock(async_nif->recycled_req_mutex); } +static void *async_nif_worker_fn(void *); + /** - * A string hash function. - * - * A basic hash function for strings of characters used during the - * affinity association. - * - * s a NULL terminated set of bytes to be hashed - * -> an integer hash encoding of the bytes + * Start up a worker thread. 
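+ *
+ * Before spawning, this reaps any workers that have put themselves on the
+ * we_joining list, so finished thread slots are recycled lazily. A sketch of
+ * the caller's side, as used by async_nif_enqueue_req() below:
+ *
+ *   if (q->depth > q->num_workers)
+ *     if (async_nif_start_worker(async_nif, q) == 0)
+ *       q->num_workers++;
+ *
+ * Returns 0 on success; EINVAL for a NULL queue, EAGAIN once we_active has
+ * reached ASYNC_NIF_MAX_WORKERS, or ENOMEM if allocation fails.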
+ */
-static inline unsigned int
-async_nif_str_hash_func(const char *s)
+static int
+async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
 {
-  unsigned int h = (unsigned int)*s;
-  if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
-  return h;
+  struct async_nif_worker_entry *we;
+
+  if (0 == q)
+    return EINVAL;
+
+  enif_mutex_lock(async_nif->we_mutex);
+
+  we = SLIST_FIRST(&async_nif->we_joining);
+  while(we != NULL) {
+    struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
+    SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
+    void *exit_value = 0; /* We ignore the thread_join's exit value. */
+    enif_thread_join(we->tid, &exit_value);
+    enif_free(we);
+    async_nif->we_active--;
+    we = n;
+  }
+
+  if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
+    enif_mutex_unlock(async_nif->we_mutex);
+    return EAGAIN;
+  }
+
+  we = enif_alloc(sizeof(struct async_nif_worker_entry));
+  if (!we) {
+    enif_mutex_unlock(async_nif->we_mutex);
+    return ENOMEM;
+  }
+  memset(we, 0, sizeof(struct async_nif_worker_entry));
+  we->worker_id = async_nif->we_active++;
+  we->async_nif = async_nif;
+  we->q = q;
+
+  enif_mutex_unlock(async_nif->we_mutex);
+  return enif_thread_create(NULL, &we->tid, &async_nif_worker_fn, (void*)we, 0);
 }

 /**
@@ -245,9 +287,9 @@ static ERL_NIF_TERM
 async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
 {
   /* Identify the most appropriate worker for this request. */
-  unsigned int qid = 0;
+  unsigned int i, qid = 0;
   struct async_nif_work_queue *q = NULL;
-  unsigned int n = async_nif->num_queues;
+  double avg_depth = 0;

   /* Either we're choosing a queue based on some affinity/hinted value or we
      need to select the next queue in the rotation and atomically update that
@@ -262,46 +304,68 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en

   /* Now we inspect and iterate across the set of queues trying to select one
      that isn't too full or too slow. */
-  do {
+  for (i = 0; i < async_nif->num_queues; i++) {
+    /* Compute the average queue depth, not counting queues that are empty or
+       the queue we're considering right now. */
+    unsigned int j, n = 0;
+    avg_depth = 0;
+    for (j = 0; j < async_nif->num_queues; j++) {
+      if (j != qid && async_nif->queues[j].depth != 0) {
+        n++;
+        avg_depth += async_nif->queues[j].depth;
+      }
+    }
+    if (avg_depth != 0)
+      avg_depth /= n;
+
+    /* Lock this queue under consideration, then check for shutdown. While
+       we hold this lock either a) we're shutting down so exit now or b) this
+       queue will be valid until we release the lock. */
     q = &async_nif->queues[qid];
     enif_mutex_lock(q->reqs_mutex);
-
-    /* Now that we hold the lock, check for shutdown. As long as we hold
-       this lock either a) we're shutting down so exit now or b) this queue
-       will be valid until we release the lock. */
     if (async_nif->shutdown) {
       enif_mutex_unlock(q->reqs_mutex);
       return 0;
     }
-    double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
-    double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait);
-    if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) {
+
+    /* Try not to enqueue a request into a queue that isn't keeping up with
+       the request volume.
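+
+       For example (illustrative numbers only): with four queues of depth
+       {3, 5, 0, 4} and qid == 0, the average over the other non-empty queues
+       is (5 + 4) / 2 = 4.5; since 3 <= 4.5 this queue is no worse than its
+       peers, so the request is enqueued here. Otherwise we release the lock
+       and move on to the next queue in the rotation.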
+     */
+    if (q->depth <= avg_depth) break;
+    else {
       enif_mutex_unlock(q->reqs_mutex);
       qid = (qid + 1) % async_nif->num_queues;
-      q = &async_nif->queues[qid];
-    } else {
-      break;
     }
-    // TODO: at some point add in work sheading/stealing
-  } while(n-- > 0);
+  }

-  /* We hold the queue's lock, and we've seletect a reasonable queue for this
-     new request so add the request. */
-  STAT_TICK(q, qwait);
-  fifo_q_put(reqs, q->reqs, req);
+  /* If the for loop finished without a break then we never found a suitable
+     queue, meaning every queue is backed up, so trigger eagain. Note: the
+     final loop iteration released its queue lock above, so there is nothing
+     to unlock here. */
+  if (i == async_nif->num_queues)
+    return 0;
+
+  /* Add the request to the queue. */
+  STAILQ_INSERT_TAIL(&q->reqs, req, entries);
+  q->depth++;
+
+  /* We've selected a queue for this new request; now check to make sure there
+     are enough workers actively processing requests on this queue. */
+  if (q->depth > q->num_workers)
+    if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;

   /* Build the term before releasing the lock so as not to race on the use of
      the req pointer (which will soon become invalid in another thread
      performing the request). */
   ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
                                         enif_make_atom(req->env, "enqueued"));
-  enif_mutex_unlock(q->reqs_mutex);
   enif_cond_signal(q->reqs_cnd);
+  enif_mutex_unlock(q->reqs_mutex);
   return reply;
 }

 /**
- * TODO:
+ * Worker threads execute this function. Each worker pulls requests off its
+ * queue and executes them until it observes that the shutdown flag is set,
+ * at which point it exits.
  */
 static void *
 async_nif_worker_fn(void *arg)
@@ -320,26 +384,29 @@
       enif_mutex_unlock(q->reqs_mutex);
       break;
     }
-    if (fifo_q_empty(reqs, q->reqs)) {
+    if (STAILQ_EMPTY(&q->reqs)) {
       /* Queue is empty so we wait for more work to arrive. */
-      STAT_RESET(q, qwait);
-      enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
-      goto check_again_for_work;
+      if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
+        enif_mutex_unlock(q->reqs_mutex);
+        break;
+      } else {
+        enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
+        goto check_again_for_work;
+      }
     } else {
-      assert(fifo_q_size(reqs, q->reqs) > 0);
-      assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
       /* At this point the next req is ours to process and we hold the
          reqs_mutex lock. Take the request off the queue. */
-      req = fifo_q_get(reqs, q->reqs);
-      enif_mutex_unlock(q->reqs_mutex);
+      req = STAILQ_FIRST(&q->reqs);
+      STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
+      q->depth--;

       /* Ensure that there is at least one other worker thread watching this
          queue. */
       enif_cond_signal(q->reqs_cnd);
+      enif_mutex_unlock(q->reqs_mutex);

       /* Perform the work. */
       req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
-      STAT_TOCK(q, qwait);

       /* Now call the post-work cleanup function.
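+         fn_post is the post_block supplied to ASYNC_NIF_DECL; once it
+         returns, the worker frees its private copy of the args, recycles the
+         request entry via async_nif_recycle_req(), and loops for more work.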
        */
       req->fn_post(req->args);
@@ -350,11 +417,14 @@
       req->fn_post = 0;
       enif_free(req->args);
       req->args = NULL;
-      enif_clear_env(req->env);
       async_nif_recycle_req(req, async_nif);
       req = NULL;
     }
   }
+  enif_mutex_lock(async_nif->we_mutex);
+  SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
+  enif_mutex_unlock(async_nif->we_mutex);
+  q->num_workers--;
   enif_thread_exit(0);
   return 0;
 }
@@ -366,41 +436,44 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
   unsigned int num_queues = async_nif->num_queues;
   struct async_nif_work_queue *q = NULL;
   struct async_nif_req_entry *req = NULL;
-  __UNUSED(env);
+  struct async_nif_worker_entry *we = NULL;
+  UNUSED(env);

-  STAT_PRINT(async_nif, qwait, "wterl");
-
-  /* Signal the worker threads, stop what you're doing and exit. To
-     ensure that we don't race with the enqueue() process we first
-     lock all the worker queues, then set shutdown to true, then
-     unlock. The enqueue function will take the queue mutex, then
-     test for shutdown condition, then enqueue only if not shutting
-     down. */
+  /* Signal the worker threads: stop what you're doing and exit. To ensure
+     that we don't race with the enqueue() process we first lock all the worker
+     queues, then set shutdown to true, then unlock. The enqueue function will
+     take the queue mutex, then test for the shutdown condition, then enqueue
+     only if not shutting down. */
   for (i = 0; i < num_queues; i++) {
     q = &async_nif->queues[i];
     enif_mutex_lock(q->reqs_mutex);
   }
+  /* Set the shutdown flag so that worker threads will not continue
+     executing requests. */
   async_nif->shutdown = 1;
   for (i = 0; i < num_queues; i++) {
     q = &async_nif->queues[i];
-    enif_cond_broadcast(q->reqs_cnd);
     enif_mutex_unlock(q->reqs_mutex);
   }

   /* Join the now-exiting worker threads. */
-  for (i = 0; i < async_nif->num_workers; ++i) {
-    void *exit_value = 0; /* We ignore the thread_join's exit value. */
-    enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
+  while(async_nif->we_active > 0) {
+    for (i = 0; i < num_queues; i++)
+      enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
+    enif_mutex_lock(async_nif->we_mutex);
+    we = SLIST_FIRST(&async_nif->we_joining);
+    while(we != NULL) {
+      struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
+      SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
+      void *exit_value = 0; /* We ignore the thread_join's exit value. */
+      enif_thread_join(we->tid, &exit_value);
+      enif_free(we);
+      async_nif->we_active--;
+      we = n;
+    }
+    enif_mutex_unlock(async_nif->we_mutex);
   }
-
-  /* Free req structres sitting on the recycle queue. */
-  enif_mutex_lock(async_nif->recycled_req_mutex);
-  req = NULL;
-  fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
-    enif_free_env(req->env);
-    enif_free(req);
-  });
-  fifo_q_free(reqs, async_nif->recycled_reqs);
+  enif_mutex_destroy(async_nif->we_mutex);

   /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
   for (i = 0; i < num_queues; i++) {
@@ -408,7 +481,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
     /* Worker threads are stopped, now toss anything left in the queue.
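+       Rather than dropping them on the floor, each request still queued is
+       answered, so the calling Erlang process's selective receive does not
+       block forever; the loop below sends it an error tuple (the atom
+       "error" paired with a shutdown reason) built in the request's env.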
*/ req = NULL; - fifo_q_foreach(reqs, q->reqs, req, { + req = STAILQ_FIRST(&q->reqs); + while(req != NULL) { + struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), @@ -417,12 +492,23 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) enif_free_env(req->env); enif_free(req->args); enif_free(req); - }); - fifo_q_free(reqs, q->reqs); + req = n; + } enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); } + /* Free any req structures sitting unused on the recycle queue. */ + enif_mutex_lock(async_nif->recycled_req_mutex); + req = NULL; + req = STAILQ_FIRST(&async_nif->recycled_reqs); + while(req != NULL) { + struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); + enif_free_env(req->env); + enif_free(req); + req = n; + } + enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); @@ -433,7 +519,7 @@ static void * async_nif_load() { static int has_init = 0; - unsigned int i, j, num_queues; + unsigned int i, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -463,57 +549,22 @@ async_nif_load() if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->num_workers = 2 * num_queues; + async_nif->we_active = 0; async_nif->next_q = 0; async_nif->shutdown = 0; - async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); + STAILQ_INIT(&async_nif->recycled_reqs); async_nif->recycled_req_mutex = enif_mutex_create(NULL); - STAT_INIT(async_nif, qwait); + async_nif->we_mutex = enif_mutex_create(NULL); + SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; - q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); + STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - STAT_INIT(q, qwait); - } - - /* Setup the thread pool management. */ - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - - /* Start the worker threads. */ - for (i = 0; i < async_nif->num_workers; i++) { - struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; - we->async_nif = async_nif; - we->worker_id = i; - we->q = &async_nif->queues[i % async_nif->num_queues]; - if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)we, NULL) != 0) { - async_nif->shutdown = 1; - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_cond_broadcast(q->reqs_cnd); - } - - while(i-- > 0) { - void *exit_value = 0; /* Ignore this. 
*/ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); - } - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); - } - - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_free(async_nif); - return NULL; - } } return async_nif; } @@ -521,7 +572,7 @@ async_nif_load() static void async_nif_upgrade(ErlNifEnv *env) { - __UNUSED(env); + UNUSED(env); // TODO: } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 789bcea..15608ef 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -11,9 +11,9 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w set -e WT_REPO=http://github.com/wiredtiger/wiredtiger.git -WT_BRANCH=develop -WT_VSN="" -WT_DIR=wiredtiger-$WT_BRANCH +WT_BRANCH= +WT_REF="tags/1.6.2" +WT_DIR=wiredtiger-`basename $WT_REF` SNAPPY_VSN="1.0.4" SNAPPY_DIR=snappy-$SNAPPY_VSN @@ -35,22 +35,21 @@ get_wt () if [ -d $BASEDIR/$WT_DIR/.git ]; then (cd $BASEDIR/$WT_DIR && git pull -u) || exit 1 else - if [ "X$WT_VSN" != "X" ]; then - git clone ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1) + if [ "X$WT_REF" != "X" ]; then + git clone ${WT_REPO} ${WT_DIR} && \ + (cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1) else - git clone -b ${WT_BRANCH} ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1) + git clone ${WT_REPO} ${WT_DIR} && \ + (cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1) fi - mv wiredtiger $WT_DIR || exit 1 fi [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) (cd $BASEDIR/$WT_DIR [ -e $BASEDIR/wiredtiger-build.patch ] && \ (patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 ) ./autogen.sh || exit 1 - cd ./build_posix || exit 1 - [ -e Makefile ] && $MAKE distclean + [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ + (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean) wt_configure; ) } @@ -109,7 +108,8 @@ build_snappy () case "$1" in clean) - [ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean) + [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ + (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean) rm -rf system $SNAPPY_DIR rm -f ${BASEDIR}/../priv/wt rm -f ${BASEDIR}/../priv/libwiredtiger-*.so diff --git a/c_src/cas.h b/c_src/cas.h new file mode 100644 index 0000000..2f35cb4 --- /dev/null +++ b/c_src/cas.h @@ -0,0 +1,159 @@ +/* + * wterl: an Erlang NIF for WiredTiger + * + * Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved. + * + * This file is provided to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ *
+ */
+
+/*
+ * Most of the following source code is copied directly from: "The Lock-Free
+ * Library" (http://www.cl.cam.ac.uk/research/srg/netos/lock-free/), reused and
+ * redistributed in accordance with their license:
+ *
+ * Copyright (c) 2002-2003 K A Fraser, All Rights Reserved.
+ *
+ * * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * * The name of the author may not be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __CAS_H_
+#define __CAS_H_
+
+#define CACHE_LINE_SIZE 64
+
+#define ATOMIC_INCR(_v,_newval) \
+do { \
+  __typeof(_v) __val = (_v); \
+  while ( (_newval = CASIO(&(_v),__val,__val+1)) != __val ) \
+    __val = _newval; \
+} while ( 0 )
+#define ATOMIC_ADD_TO(_v,_x) \
+do { \
+  __typeof(_v) __val = (_v), __newval; \
+  while ( (__newval = CASIO(&(_v),__val,__val+(_x))) != __val ) \
+    __val = __newval; \
+} while ( 0 )
+
+#define ATOMIC_SET_TO(_v,_x) \
+do { \
+  int __val = (_v), __newval; \
+  while ( (__newval = CASIO(&(_v),__val,__val=(_x))) != __val ) \
+    __val = __newval; \
+} while ( 0 )
+
+#define CACHE_ALIGNED_SIZEOF(_s) \
+  (((sizeof(_s)) + CACHE_LINE_SIZE*2 + \
+    CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))
+
+/*
+ * I. Compare-and-swap.
+ */
+
+/*
+ * This is a strong barrier! Reads cannot be delayed beyond a later store.
+ * Reads cannot be hoisted beyond a LOCK prefix. Stores always in-order.
+ */
+#define CAS(_a, _o, _n) \
+({ __typeof__(_o) __o = _o; \
+   __asm__ __volatile__( \
+       "lock cmpxchg %3,%1" \
+       : "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \
+       : "0" (__o), "r" (_n) ); \
+   __o; \
+})
+
+#define FAS(_a, _n) \
+({ __typeof__(_n) __o; \
+   __asm__ __volatile__( \
+       "lock xchg %0,%1" \
+       : "=r" (__o), "=m" (*(volatile unsigned int *)(_a)) \
+       : "0" (_n) ); \
+   __o; \
+})
+
+#define CAS64(_a, _o, _n) \
+({ __typeof__(_o) __o = _o; \
+   __asm__ __volatile__( \
+       "movl %3, %%ecx;" \
+       "movl %4, %%ebx;" \
+       "lock cmpxchg8b %1" \
+       : "=A" (__o), "=m" (*(volatile unsigned long long *)(_a)) \
+       : "0" (__o), "m" (_n >> 32), "m" (_n) \
+       : "ebx", "ecx" ); \
+   __o; \
+})
+
+/* Update Integer location, return Old value. */
+#define CASIO CAS
+#define FASIO FAS
+/* Update Pointer location, return Old value.
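+
+   A usage sketch (ours, not part of the original library header): CASIO
+   above, and the ATOMIC_* wrappers built on it, return the value seen at
+   the address just before the swap, retrying until the update lands:
+
+     int n = 0, old;
+     ATOMIC_ADD_TO(n, 5);    => n == 5 once no other thread races the add
+     old = CASIO(&n, 5, 9);  => old == 5 (the prior value), n becomes 9
+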
*/ +#define CASPO CAS +#define FASPO FAS +/* Update 32/64-bit location, return Old value. */ +#define CAS32O CAS +#define CAS64O CAS64 + +/* + * II. Memory barriers. + * WMB(): All preceding write operations must commit before any later writes. + * RMB(): All preceding read operations must commit before any later reads. + * MB(): All preceding memory accesses must commit before any later accesses. + * + * If the compiler does not observe these barriers (but any sane compiler + * will!), then VOLATILE should be defined as 'volatile'. + */ + +#define MB() __asm__ __volatile__ ("lock; addl $0,0(%%esp)" : : : "memory") +#define WMB() __asm__ __volatile__ ("" : : : "memory") +#define RMB() MB() +#define VOLATILE /*volatile*/ + +/* On Intel, CAS is a strong barrier, but not a compile barrier. */ +#define RMB_NEAR_CAS() WMB() +#define WMB_NEAR_CAS() WMB() +#define MB_NEAR_CAS() WMB() + + +/* + * III. Cycle counter access. + */ + +typedef unsigned long long tick_t; +#define RDTICK() \ + ({ tick_t __t; __asm__ __volatile__ ("rdtsc" : "=A" (__t)); __t; }) + +#endif /* __CAS_H_ */ diff --git a/c_src/common.h b/c_src/common.h new file mode 100644 index 0000000..df2f162 --- /dev/null +++ b/c_src/common.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. + * Author: Gregory Burd + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef __COMMON_H__ +#define __COMMON_H__ + +#if defined(__cplusplus) +extern "C" { +#endif + +#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__)) +# undef DEBUG +# define DEBUG 0 +# define DPRINTF (void) /* Vararg macros may be unsupported */ +#elif DEBUG +#include +#include +#define DPRINTF(fmt, ...) \ + do { \ + fprintf(stderr, "%s:%d " fmt "\n", __FILE__, __LINE__, __VA_ARGS__); \ + fflush(stderr); \ + } while(0) +#define DPUTS(arg) DPRINTF("%s", arg) +#else +#define DPRINTF(fmt, ...) ((void) 0) +#define DPUTS(arg) ((void) 0) +#endif + +#ifndef __UNUSED +#define __UNUSED(v) ((void)(v)) +#endif + +#ifndef COMPQUIET +#define COMPQUIET(n, v) do { \ + (n) = (v); \ + (n) = (n); \ +} while (0) +#endif + +#ifdef __APPLE__ +#define PRIuint64(x) (x) +#else +#define PRIuint64(x) (unsigned long long)(x) +#endif + +#if defined(__cplusplus) +} +#endif + +#endif // __COMMON_H__ diff --git a/c_src/duration.h b/c_src/duration.h deleted file mode 100644 index 635d0fd..0000000 --- a/c_src/duration.h +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (C) 2013, all rights reserved by Gregory Burd - * - * This Source Code Form is subject to the terms of the Mozilla Public License, - * version 2 (MPLv2). 
If a copy of the MPL was not distributed with this file, - * you can obtain one at: http://mozilla.org/MPL/2.0/ - * - * NOTES: - * - on some platforms this will require -lrt - */ -#include -#include -#include -#include - -typedef enum { ns = 0, mcs, ms, s } time_scale; -struct scale_time { - const char *abbreviation; - const char *name; - uint64_t mul, div, overhead, ticks_per; -}; -static const struct scale_time scale[] = { - { "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL }, - { "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL }, - { "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL }, - { "sec", "second", 1LL, 1000000000LL, 10, 2300LL } }; - -static uint64_t ts(time_scale unit) -{ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - return (((uint64_t)ts.tv_sec * scale[unit].mul) + - ((uint64_t)ts.tv_nsec / scale[unit].div)); -} - -#if 0 -//if defined(__i386__) || defined(__x86_64__) - -/** - * cpu_clock_ticks() - * - * A measure provided by Intel x86 CPUs which provides the number of cycles - * (aka "ticks") executed as a counter using the RDTSC instruction. - */ -static inline uint64_t cpu_clock_ticks() -{ - uint32_t lo, hi; - __asm__ __volatile__ ( - "xorl %%eax, %%eax\n" - "cpuid\n" - "rdtsc\n" - : "=a" (lo), "=d" (hi) - : - : "%ebx", "%ecx" ); - return (uint64_t)hi << 32 | lo; -} - -/** - * cpu_clock_ticks() - * - * An approximation of elapsed [ns, mcs, ms, s] from CPU clock ticks. - */ -static uint64_t elapsed_cpu_clock_ticks(uint64_t start, time_scale unit) -{ - return (cpu_clock_ticks() - start - scale[unit].overhead) * scale[unit].ticks_per; -} - -#endif - -typedef struct { - uint64_t then; - time_scale unit; -} duration_t; - -static inline uint64_t elapsed(duration_t *d) -{ - uint64_t now = ts(d->unit); - uint64_t elapsed = now - d->then; - d->then = now; - return elapsed; -} - -#define DURATION(name, resolution) duration_t name = \ - {ts(resolution), resolution} - -#define ELAPSED_DURING(result, resolution, block) \ - do { \ - DURATION(__x, resolution); \ - do block while(0); \ - *result = elapsed(&__x); \ - } while(0); - -#define CYCLES_DURING(result, block) \ - do { \ - uint64_t __begin = cpu_clock_ticks(); \ - do block while(0); \ - *result = cpu_clock_ticks() - __begin; \ - } while(0); diff --git a/c_src/fifo_q.h b/c_src/fifo_q.h deleted file mode 100644 index f37bf67..0000000 --- a/c_src/fifo_q.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * fifo_q: a macro-based implementation of a FIFO Queue - * - * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. - * Author: Gregory Burd - * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#ifndef __FIFO_Q_H__ -#define __FIFO_Q_H__ - -#if defined(__cplusplus) -extern "C" { -#endif - -#define FIFO_QUEUE_TYPE(name) \ - struct fifo_q__ ## name * -#define DECL_FIFO_QUEUE(name, type) \ - struct fifo_q__ ## name { \ - unsigned int h, t, s; \ - type *items[]; \ - }; \ - static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ - int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\ - struct fifo_q__ ## name *q = enif_alloc(sz); \ - if (!q) \ - return 0; \ - memset(q, 0, sz); \ - q->s = n + 1; \ - return q; \ - } \ - static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ - enif_free(q); \ - } \ - static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ - q->items[q->h] = n; \ - q->h = (q->h + 1) % q->s; \ - return n; \ - } \ - static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ - type *n = q->items[q->t]; \ - q->items[q->t] = 0; \ - q->t = (q->t + 1) % q->s; \ - return n; \ - } \ - static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ - return (q->h - q->t + q->s) % q->s; \ - } \ - static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ - return q->s - 1; \ - } \ - static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ - return (q->t == q->h); \ - } \ - static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \ - return ((q->h + 1) % q->s) == q->t; \ - } - -#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) -#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) -#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) -#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) -#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) -#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) -#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) -#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue) -#define fifo_q_foreach(name, queue, item, task) do { \ - while(!fifo_q_ ## name ## _empty(queue)) { \ - item = fifo_q_ ## name ## _get(queue); \ - do task while(0); \ - } \ - } while(0); - - -#if defined(__cplusplus) -} -#endif - -#endif // __FIFO_Q_H__ diff --git a/c_src/khash.h b/c_src/khash.h index ab157b1..69549dc 100644 --- a/c_src/khash.h +++ b/c_src/khash.h @@ -586,7 +586,7 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) @param name Name of the hash table [symbol] @param khval_t Type of values [type] */ -#ifdef __x86_64__ +#ifdef __x86_64__ #define KHASH_MAP_INIT_PTR(name, khval_t) \ KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal) #else diff --git a/c_src/queue.h b/c_src/queue.h new file mode 100644 index 0000000..4c6a153 --- /dev/null +++ b/c_src/queue.h @@ -0,0 +1,678 @@ +/* + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. 
Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 4. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + * $FreeBSD: src/sys/sys/queue.h,v 1.54 2002/08/05 05:18:43 alfred Exp $ + */ + +#ifndef _DB_QUEUE_H_ +#define _DB_QUEUE_H_ + +#ifndef __offsetof +#define __offsetof(st, m) \ + ((size_t) ( (char *)&((st *)0)->m - (char *)0 )) +#endif + +#ifndef __containerof +#define __containerof(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - __offsetof(type,member) );}) +#endif + +#if defined(__cplusplus) +extern "C" { +#endif + +/* + * This file defines four types of data structures: singly-linked lists, + * singly-linked tail queues, lists and tail queues. + * + * A singly-linked list is headed by a single forward pointer. The elements + * are singly linked for minimum space and pointer manipulation overhead at + * the expense of O(n) removal for arbitrary elements. New elements can be + * added to the list after an existing element or at the head of the list. + * Elements being removed from the head of the list should use the explicit + * macro for this purpose for optimum efficiency. A singly-linked list may + * only be traversed in the forward direction. Singly-linked lists are ideal + * for applications with large datasets and few or no removals or for + * implementing a LIFO queue. + * + * A singly-linked tail queue is headed by a pair of pointers, one to the + * head of the list and the other to the tail of the list. The elements are + * singly linked for minimum space and pointer manipulation overhead at the + * expense of O(n) removal for arbitrary elements. New elements can be added + * to the list after an existing element, at the head of the list, or at the + * end of the list. Elements being removed from the head of the tail queue + * should use the explicit macro for this purpose for optimum efficiency. + * A singly-linked tail queue may only be traversed in the forward direction. + * Singly-linked tail queues are ideal for applications with large datasets + * and few or no removals or for implementing a FIFO queue. + * + * A list is headed by a single forward pointer (or an array of forward + * pointers for a hash table header). The elements are doubly linked + * so that an arbitrary element can be removed without a need to + * traverse the list. 
New elements can be added to the list before + * or after an existing element or at the head of the list. A list + * may only be traversed in the forward direction. + * + * A tail queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or + * after an existing element, at the head of the list, or at the end of + * the list. A tail queue may be traversed in either direction. + * + * For details on the use of these macros, see the queue(3) manual page. + * + * + * SLIST LIST STAILQ TAILQ + * _HEAD + + + + + * _HEAD_INITIALIZER + + + + + * _ENTRY + + + + + * _INIT + + + + + * _EMPTY + + + + + * _FIRST + + + + + * _NEXT + + + + + * _PREV - - - + + * _LAST - - + + + * _FOREACH + + + + + * _FOREACH_REVERSE - - - + + * _INSERT_HEAD + + + + + * _INSERT_BEFORE - + - + + * _INSERT_AFTER + + + + + * _INSERT_TAIL - - + + + * _CONCAT - - + + + * _REMOVE_HEAD + - + - + * _REMOVE + + + + + * + */ + +/* + * XXX + * We #undef all of the macros because there are incompatible versions of this + * file and these macros on various systems. What makes the problem worse is + * they are included and/or defined by system include files which we may have + * already loaded into Berkeley DB before getting here. For example, FreeBSD's + * includes its system , and VxWorks UnixLib.h defines + * several of the LIST_XXX macros. Visual C.NET 7.0 also defines some of these + * same macros in Vc7\PlatformSDK\Include\WinNT.h. Make sure we use ours. + */ +#undef LIST_EMPTY +#undef LIST_ENTRY +#undef LIST_FIRST +#undef LIST_FOREACH +#undef LIST_HEAD +#undef LIST_HEAD_INITIALIZER +#undef LIST_INIT +#undef LIST_INSERT_AFTER +#undef LIST_INSERT_BEFORE +#undef LIST_INSERT_HEAD +#undef LIST_NEXT +#undef LIST_REMOVE +#undef QMD_TRACE_ELEM +#undef QMD_TRACE_HEAD +#undef QUEUE_MACRO_DEBUG +#undef SLIST_EMPTY +#undef SLIST_ENTRY +#undef SLIST_FIRST +#undef SLIST_FOREACH +#undef SLIST_FOREACH_PREVPTR +#undef SLIST_HEAD +#undef SLIST_HEAD_INITIALIZER +#undef SLIST_INIT +#undef SLIST_INSERT_AFTER +#undef SLIST_INSERT_HEAD +#undef SLIST_NEXT +#undef SLIST_REMOVE +#undef SLIST_REMOVE_HEAD +#undef STAILQ_CONCAT +#undef STAILQ_EMPTY +#undef STAILQ_ENTRY +#undef STAILQ_FIRST +#undef STAILQ_FOREACH +#undef STAILQ_HEAD +#undef STAILQ_HEAD_INITIALIZER +#undef STAILQ_INIT +#undef STAILQ_INSERT_AFTER +#undef STAILQ_INSERT_HEAD +#undef STAILQ_INSERT_TAIL +#undef STAILQ_LAST +#undef STAILQ_NEXT +#undef STAILQ_REMOVE +#undef STAILQ_REMOVE_HEAD +#undef STAILQ_REMOVE_HEAD_UNTIL +#undef TAILQ_CONCAT +#undef TAILQ_EMPTY +#undef TAILQ_ENTRY +#undef TAILQ_FIRST +#undef TAILQ_FOREACH +#undef TAILQ_FOREACH_REVERSE +#undef TAILQ_HEAD +#undef TAILQ_HEAD_INITIALIZER +#undef TAILQ_INIT +#undef TAILQ_INSERT_AFTER +#undef TAILQ_INSERT_BEFORE +#undef TAILQ_INSERT_HEAD +#undef TAILQ_INSERT_TAIL +#undef TAILQ_LAST +#undef TAILQ_NEXT +#undef TAILQ_PREV +#undef TAILQ_REMOVE +#undef TRACEBUF +#undef TRASHIT + +#define QUEUE_MACRO_DEBUG 0 +#if QUEUE_MACRO_DEBUG +/* Store the last 2 places the queue element or head was altered */ +struct qm_trace { + char * lastfile; + int lastline; + char * prevfile; + int prevline; +}; + +#define TRACEBUF struct qm_trace trace; +#define TRASHIT(x) do {(x) = (void *)-1;} while (0) + +#define QMD_TRACE_HEAD(head) do { \ + (head)->trace.prevline = (head)->trace.lastline; \ + (head)->trace.prevfile = (head)->trace.lastfile; \ 
+ (head)->trace.lastline = __LINE__; \ + (head)->trace.lastfile = __FILE__; \ +} while (0) + +#define QMD_TRACE_ELEM(elem) do { \ + (elem)->trace.prevline = (elem)->trace.lastline; \ + (elem)->trace.prevfile = (elem)->trace.lastfile; \ + (elem)->trace.lastline = __LINE__; \ + (elem)->trace.lastfile = __FILE__; \ +} while (0) + +#else +#define QMD_TRACE_ELEM(elem) +#define QMD_TRACE_HEAD(head) +#define TRACEBUF +#define TRASHIT(x) +#endif /* QUEUE_MACRO_DEBUG */ + +/* + * Singly-linked List declarations. + */ +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} + +#define SLIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define SLIST_ENTRY(type) \ +struct { \ + struct type *sle_next; /* next element */ \ +} + +/* + * Singly-linked List functions. + */ +#define SLIST_EMPTY(head) ((head)->slh_first == NULL) + +#define SLIST_FIRST(head) ((head)->slh_first) + +#define SLIST_FOREACH(var, head, field) \ + for ((var) = SLIST_FIRST((head)); \ + (var); \ + (var) = SLIST_NEXT((var), field)) + +#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \ + for ((varp) = &SLIST_FIRST((head)); \ + ((var) = *(varp)) != NULL; \ + (varp) = &SLIST_NEXT((var), field)) + +#define SLIST_INIT(head) do { \ + SLIST_FIRST((head)) = NULL; \ +} while (0) + +#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \ + SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \ + SLIST_NEXT((slistelm), field) = (elm); \ +} while (0) + +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \ + SLIST_FIRST((head)) = (elm); \ +} while (0) + +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) + +#define SLIST_REMOVE(head, elm, type, field) do { \ + if (SLIST_FIRST((head)) == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = SLIST_FIRST((head)); \ + while (SLIST_NEXT(curelm, field) != (elm)) \ + curelm = SLIST_NEXT(curelm, field); \ + SLIST_NEXT(curelm, field) = \ + SLIST_NEXT(SLIST_NEXT(curelm, field), field); \ + } \ +} while (0) + +#define SLIST_REMOVE_HEAD(head, field) do { \ + SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \ +} while (0) + +/* + * Singly-linked Tail queue declarations. + */ +#define STAILQ_HEAD(name, type) \ +struct name { \ + struct type *stqh_first;/* first element */ \ + struct type **stqh_last;/* addr of last next element */ \ +} + +#define STAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).stqh_first } + +#define STAILQ_ENTRY(type) \ +struct { \ + struct type *stqe_next; /* next element */ \ +} + +/* + * Singly-linked Tail queue functions. 
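+ *
+ * Usage sketch (the element and field names are borrowed from async_nif.h,
+ * the consumer of these macros in this patch, not part of queue.h itself):
+ *
+ *   struct async_nif_req_entry *req;
+ *   STAILQ_HEAD(reqs, async_nif_req_entry) q;
+ *   STAILQ_INIT(&q);
+ *   STAILQ_INSERT_TAIL(&q, req, entries);
+ *   req = STAILQ_FIRST(&q);
+ *   STAILQ_REMOVE(&q, req, async_nif_req_entry, entries);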
+ */ +#define STAILQ_CONCAT(head1, head2) do { \ + if (!STAILQ_EMPTY((head2))) { \ + *(head1)->stqh_last = (head2)->stqh_first; \ + (head1)->stqh_last = (head2)->stqh_last; \ + STAILQ_INIT((head2)); \ + } \ +} while (0) + +#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL) + +#define STAILQ_FIRST(head) ((head)->stqh_first) + +#define STAILQ_FOREACH(var, head, field) \ + for ((var) = STAILQ_FIRST((head)); \ + (var); \ + (var) = STAILQ_NEXT((var), field)) + +#define STAILQ_INIT(head) do { \ + STAILQ_FIRST((head)) = NULL; \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_NEXT((tqelm), field) = (elm); \ +} while (0) + +#define STAILQ_INSERT_HEAD(head, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_FIRST((head)) = (elm); \ +} while (0) + +#define STAILQ_INSERT_TAIL(head, elm, field) do { \ + STAILQ_NEXT((elm), field) = NULL; \ + *(head)->stqh_last = (elm); \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ +} while (0) + +#define STAILQ_LAST(head, type, field) \ + (STAILQ_EMPTY((head)) ? \ + NULL : \ + ((struct type *) \ + ((char *)((head)->stqh_last) - __offsetof(struct type, field)))) + +#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next) + +#define STAILQ_REMOVE(head, elm, type, field) do { \ + if (STAILQ_FIRST((head)) == (elm)) { \ + STAILQ_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = STAILQ_FIRST((head)); \ + while (STAILQ_NEXT(curelm, field) != (elm)) \ + curelm = STAILQ_NEXT(curelm, field); \ + if ((STAILQ_NEXT(curelm, field) = \ + STAILQ_NEXT(STAILQ_NEXT(curelm, field), field)) == NULL)\ + (head)->stqh_last = &STAILQ_NEXT((curelm), field);\ + } \ +} while (0) + +#define STAILQ_REMOVE_HEAD(head, field) do { \ + if ((STAILQ_FIRST((head)) = \ + STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_REMOVE_HEAD_UNTIL(head, elm, field) do { \ + if ((STAILQ_FIRST((head)) = STAILQ_NEXT((elm), field)) == NULL) \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +/* + * List declarations. + */ +#define LIST_HEAD(name, type) \ +struct name { \ + struct type *lh_first; /* first element */ \ +} + +#define LIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define LIST_ENTRY(type) \ +struct { \ + struct type *le_next; /* next element */ \ + struct type **le_prev; /* address of previous next element */ \ +} + +/* + * List functions. 
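+ *
+ * Usage sketch (hypothetical element type; the standard queue(3) pattern):
+ *
+ *   struct node { int v; LIST_ENTRY(node) link; };
+ *   LIST_HEAD(nodelist, node) head = LIST_HEAD_INITIALIZER(head);
+ *   struct node *n = ...;
+ *   LIST_INSERT_HEAD(&head, n, link);
+ *   LIST_REMOVE(n, link);    => O(1) removal via the stored back pointer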
+ */ + +#define LIST_EMPTY(head) ((head)->lh_first == NULL) + +#define LIST_FIRST(head) ((head)->lh_first) + +#define LIST_FOREACH(var, head, field) \ + for ((var) = LIST_FIRST((head)); \ + (var); \ + (var) = LIST_NEXT((var), field)) + +#define LIST_INIT(head) do { \ + LIST_FIRST((head)) = NULL; \ +} while (0) + +#define LIST_INSERT_AFTER(listelm, elm, field) do { \ + if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\ + LIST_NEXT((listelm), field)->field.le_prev = \ + &LIST_NEXT((elm), field); \ + LIST_NEXT((listelm), field) = (elm); \ + (elm)->field.le_prev = &LIST_NEXT((listelm), field); \ +} while (0) + +#define LIST_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.le_prev = (listelm)->field.le_prev; \ + LIST_NEXT((elm), field) = (listelm); \ + *(listelm)->field.le_prev = (elm); \ + (listelm)->field.le_prev = &LIST_NEXT((elm), field); \ +} while (0) + +#define LIST_INSERT_HEAD(head, elm, field) do { \ + if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \ + LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\ + LIST_FIRST((head)) = (elm); \ + (elm)->field.le_prev = &LIST_FIRST((head)); \ +} while (0) + +#define LIST_NEXT(elm, field) ((elm)->field.le_next) + +#define LIST_REMOVE(elm, field) do { \ + if (LIST_NEXT((elm), field) != NULL) \ + LIST_NEXT((elm), field)->field.le_prev = \ + (elm)->field.le_prev; \ + *(elm)->field.le_prev = LIST_NEXT((elm), field); \ +} while (0) + +/* + * Tail queue declarations. + */ +#define TAILQ_HEAD(name, type) \ +struct name { \ + struct type *tqh_first; /* first element */ \ + struct type **tqh_last; /* addr of last next element */ \ + TRACEBUF \ +} + +#define TAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).tqh_first } + +#define TAILQ_ENTRY(type) \ +struct { \ + struct type *tqe_next; /* next element */ \ + struct type **tqe_prev; /* address of previous next element */ \ + TRACEBUF \ +} + +/* + * Tail queue functions. 
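+ *
+ * Usage sketch (hypothetical element type; the standard queue(3) pattern):
+ *
+ *   struct job { TAILQ_ENTRY(job) link; };
+ *   TAILQ_HEAD(jobq, job) q = TAILQ_HEAD_INITIALIZER(q);
+ *   struct job *j = ...;
+ *   TAILQ_INSERT_TAIL(&q, j, link);
+ *   TAILQ_FOREACH_REVERSE(j, &q, jobq, link) { ... }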
+ */ +#define TAILQ_CONCAT(head1, head2, field) do { \ + if (!TAILQ_EMPTY(head2)) { \ + *(head1)->tqh_last = (head2)->tqh_first; \ + (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ + (head1)->tqh_last = (head2)->tqh_last; \ + TAILQ_INIT((head2)); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_HEAD(head2); \ + } \ +} while (0) + +#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL) + +#define TAILQ_FIRST(head) ((head)->tqh_first) + +#define TAILQ_FOREACH(var, head, field) \ + for ((var) = TAILQ_FIRST((head)); \ + (var); \ + (var) = TAILQ_NEXT((var), field)) + +#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \ + for ((var) = TAILQ_LAST((head), headname); \ + (var); \ + (var) = TAILQ_PREV((var), headname, field)) + +#define TAILQ_INIT(head) do { \ + TAILQ_FIRST((head)) = NULL; \ + (head)->tqh_last = &TAILQ_FIRST((head)); \ + QMD_TRACE_HEAD(head); \ +} while (0) + +#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\ + TAILQ_NEXT((elm), field)->field.tqe_prev = \ + &TAILQ_NEXT((elm), field); \ + else { \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_HEAD(head); \ + } \ + TAILQ_NEXT((listelm), field) = (elm); \ + (elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \ + QMD_TRACE_ELEM(&(elm)->field); \ + QMD_TRACE_ELEM(&listelm->field); \ +} while (0) + +#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.tqe_prev = (listelm)->field.tqe_prev; \ + TAILQ_NEXT((elm), field) = (listelm); \ + *(listelm)->field.tqe_prev = (elm); \ + (listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_ELEM(&(elm)->field); \ + QMD_TRACE_ELEM(&listelm->field); \ +} while (0) + +#define TAILQ_INSERT_HEAD(head, elm, field) do { \ + if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \ + TAILQ_FIRST((head))->field.tqe_prev = \ + &TAILQ_NEXT((elm), field); \ + else \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + TAILQ_FIRST((head)) = (elm); \ + (elm)->field.tqe_prev = &TAILQ_FIRST((head)); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_INSERT_TAIL(head, elm, field) do { \ + TAILQ_NEXT((elm), field) = NULL; \ + (elm)->field.tqe_prev = (head)->tqh_last; \ + *(head)->tqh_last = (elm); \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_LAST(head, headname) \ + (*(((struct headname *)((head)->tqh_last))->tqh_last)) + +#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) + +#define TAILQ_PREV(elm, headname, field) \ + (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last)) + +#define TAILQ_REMOVE(head, elm, field) do { \ + if ((TAILQ_NEXT((elm), field)) != NULL) \ + TAILQ_NEXT((elm), field)->field.tqe_prev = \ + (elm)->field.tqe_prev; \ + else { \ + (head)->tqh_last = (elm)->field.tqe_prev; \ + QMD_TRACE_HEAD(head); \ + } \ + *(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \ + TRASHIT((elm)->field.tqe_next); \ + TRASHIT((elm)->field.tqe_prev); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +/* + * Circular queue definitions. 
+ */ +#define CIRCLEQ_HEAD(name, type) \ +struct name { \ + struct type *cqh_first; /* first element */ \ + struct type *cqh_last; /* last element */ \ +} + +#define CIRCLEQ_HEAD_INITIALIZER(head) \ + { (void *)&head, (void *)&head } + +#define CIRCLEQ_ENTRY(type) \ +struct { \ + struct type *cqe_next; /* next element */ \ + struct type *cqe_prev; /* previous element */ \ +} + +/* + * Circular queue functions. + */ +#define CIRCLEQ_INIT(head) do { \ + (head)->cqh_first = (void *)(head); \ + (head)->cqh_last = (void *)(head); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + (elm)->field.cqe_next = (listelm)->field.cqe_next; \ + (elm)->field.cqe_prev = (listelm); \ + if ((listelm)->field.cqe_next == (void *)(head)) \ + (head)->cqh_last = (elm); \ + else \ + (listelm)->field.cqe_next->field.cqe_prev = (elm); \ + (listelm)->field.cqe_next = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \ + (elm)->field.cqe_next = (listelm); \ + (elm)->field.cqe_prev = (listelm)->field.cqe_prev; \ + if ((listelm)->field.cqe_prev == (void *)(head)) \ + (head)->cqh_first = (elm); \ + else \ + (listelm)->field.cqe_prev->field.cqe_next = (elm); \ + (listelm)->field.cqe_prev = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.cqe_next = (head)->cqh_first; \ + (elm)->field.cqe_prev = (void *)(head); \ + if ((head)->cqh_last == (void *)(head)) \ + (head)->cqh_last = (elm); \ + else \ + (head)->cqh_first->field.cqe_prev = (elm); \ + (head)->cqh_first = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.cqe_next = (void *)(head); \ + (elm)->field.cqe_prev = (head)->cqh_last; \ + if ((head)->cqh_first == (void *)(head)) \ + (head)->cqh_first = (elm); \ + else \ + (head)->cqh_last->field.cqe_next = (elm); \ + (head)->cqh_last = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_REMOVE(head, elm, field) do { \ + if ((elm)->field.cqe_next == (void *)(head)) \ + (head)->cqh_last = (elm)->field.cqe_prev; \ + else \ + (elm)->field.cqe_next->field.cqe_prev = \ + (elm)->field.cqe_prev; \ + if ((elm)->field.cqe_prev == (void *)(head)) \ + (head)->cqh_first = (elm)->field.cqe_next; \ + else \ + (elm)->field.cqe_prev->field.cqe_next = \ + (elm)->field.cqe_next; \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_FOREACH(var, head, field) \ + for ((var) = ((head)->cqh_first); \ + (var) != (const void *)(head); \ + (var) = ((var)->field.cqe_next)) + +#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \ + for ((var) = ((head)->cqh_last); \ + (var) != (const void *)(head); \ + (var) = ((var)->field.cqe_prev)) + +/* + * Circular queue access methods. + */ +#define CIRCLEQ_EMPTY(head) ((head)->cqh_first == (void *)(head)) +#define CIRCLEQ_FIRST(head) ((head)->cqh_first) +#define CIRCLEQ_LAST(head) ((head)->cqh_last) +#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next) +#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev) + +#define CIRCLEQ_LOOP_NEXT(head, elm, field) \ + (((elm)->field.cqe_next == (void *)(head)) \ + ? ((head)->cqh_first) \ + : (elm->field.cqe_next)) +#define CIRCLEQ_LOOP_PREV(head, elm, field) \ + (((elm)->field.cqe_prev == (void *)(head)) \ + ? 
((head)->cqh_last) \ + : (elm->field.cqe_prev)) + + +#if defined(__cplusplus) +} +#endif +#endif /* !_DB_QUEUE_H_ */ diff --git a/c_src/stats.h b/c_src/stats.h deleted file mode 100644 index 2f465be..0000000 --- a/c_src/stats.h +++ /dev/null @@ -1,214 +0,0 @@ -/* - * stats: - * - * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. - * Author: Gregory Burd - * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -#ifndef __STATS_H__ -#define __STATS_H__ - -#if defined(__cplusplus) -extern "C" { -#endif - -#include "duration.h" - -/** - * Calculate the log2 of 64bit unsigned integers. - */ -#ifdef __GCC__ -#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) -#else -static unsigned int __log2_64(uint64_t x) { - static const int tab64[64] = { - 63, 0, 58, 1, 59, 47, 53, 2, - 60, 39, 48, 27, 54, 33, 42, 3, - 61, 51, 37, 40, 49, 18, 28, 20, - 55, 30, 34, 11, 43, 14, 22, 4, - 62, 57, 46, 52, 38, 26, 32, 41, - 50, 36, 17, 19, 29, 10, 13, 21, - 56, 45, 25, 31, 35, 16, 9, 12, - 44, 24, 15, 8, 23, 7, 6, 5}; - if (x == 0) return 0; - uint64_t v = x; - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - v |= v >> 32; - return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; -} -#define LOG2(X) __log2_64(X) -#endif - -#define STAT_DEF(name) struct name ## _stat name ## _stat; - -#define STAT_DECL(name, nsamples) \ - struct name ## _stat { \ - duration_t d; \ - uint64_t histogram[64]; \ - uint32_t h, n; \ - uint64_t samples[nsamples]; \ - uint64_t min, max; \ - double mean; \ - }; \ - static inline double name ## _stat_mean(struct name ## _stat *s) { \ - uint32_t t = s->h; \ - uint32_t h = (s->h + 1) % nsamples; \ - double mean = 0; \ - while (h != t) { \ - mean += s->samples[h]; \ - h = (h + 1) % nsamples; \ - } \ - if (mean > 0) \ - mean /= (double)(s->n < nsamples ? 
s->n : nsamples); \ - return mean; \ - } \ - static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \ - uint32_t i; \ - double mean = 0; \ - for (i = 0; i < 64; i++) \ - mean += (s->histogram[i] * i); \ - if (mean > 0) \ - mean /= (double)s->n; \ - return mean; \ - } \ - static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \ - { \ - uint64_t t = ts(s->d.unit); \ - s->d.then = t; \ - return t; \ - } \ - static inline void name ## _stat_reset(struct name ## _stat *s) \ - { \ - s->min = ~0; \ - s->max = 0; \ - s->h = 0; \ - memset(&s->histogram, 0, sizeof(uint64_t) * 64); \ - memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \ - } \ - static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \ - { \ - uint64_t now = ts(s->d.unit); \ - uint64_t elapsed = now - s->d.then; \ - uint32_t i = s->h; \ - if (s->n == nsamples) { \ - s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \ - if (s->n >= 4294967295) \ - name ## _stat_reset(s); \ - } \ - s->h = (s->h + 1) % nsamples; \ - s->samples[i] = elapsed; \ - if (elapsed < s->min) \ - s->min = elapsed; \ - if (elapsed > s->max) \ - s->max = elapsed; \ - s->histogram[LOG2(elapsed)]++; \ - s->n++; \ - s->d.then = ts(s->d.unit); \ - return elapsed; \ - } \ - static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \ - { \ - uint8_t logs[64]; \ - uint8_t i, j, max_log = 0; \ - double m = (s->mean + name ## _stat_mean(s) / 2.0); \ - \ - fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); \ - for (i = 0; i < 64; i++) { \ - logs[i] = LOG2(s->histogram[i]); \ - if (logs[i] > max_log) \ - max_log = logs[i]; \ - } \ - for (i = max_log; i > 0; i--) { \ - if (!(i % 10)) \ - fprintf(stderr, "2^%2d ", i); \ - else \ - fprintf(stderr, " "); \ - for(j = 0; j < 64; j++) \ - fprintf(stderr, logs[j] >= i ? 
"•" : " "); \ - fprintf(stderr, "\n"); \ - } \ - if (max_log == 0) { \ - fprintf(stderr, "[empty]\n"); \ - } else { \ - fprintf(stderr, " ns μs ms s ks\n"); \ - fprintf(stderr, "min: "); \ - if (s->min < 1000) \ - fprintf(stderr, "%lu (ns)", s->min); \ - else if (s->min < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ - else if (s->min < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \ - else if (s->min < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ - fprintf(stderr, " max: "); \ - if (s->max < 1000) \ - fprintf(stderr, "%lu (ns)", s->max); \ - else if (s->max < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ - else if (s->max < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \ - else if (s->max < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \ - fprintf(stderr, " mean: "); \ - if (m < 1000) \ - fprintf(stderr, "%.2f (ns)", m); \ - else if (m < 1000000) \ - fprintf(stderr, "%.2f (μs)", m / 1000.0); \ - else if (m < 1000000000) \ - fprintf(stderr, "%.2f (ms)", m / 1000000.0); \ - else if (m < 1000000000000) \ - fprintf(stderr, "%.2f (s)", m / 1000000000.0); \ - fprintf(stderr, "\n"); \ - } \ - fflush(stderr); \ - } - - -#define STAT_INIT(var, name) \ - var->name ## _stat.min = ~0; \ - var->name ## _stat.max = 0; \ - var->name ## _stat.mean = 0.0; \ - var->name ## _stat.h = 0; \ - var->name ## _stat.d.then = 0; \ - var->name ## _stat.d.unit = ns; - -#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat) - -#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat) - -#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat) - -#define STAT_MEAN_LOG2_SAMPLE(var, name) \ - name ## _stat_mean_lg2(&var->name ## _stat) - -#define STAT_MEAN_SAMPLE(var, name) \ - name ## _stat_mean(&var->name ## _stat) - -#define STAT_PRINT(var, name, mod) \ - name ## _stat_print_histogram(&var->name ## _stat, mod) - - -#if defined(__cplusplus) -} -#endif - -#endif // __STATS_H__ diff --git a/c_src/wiredtiger-build.patch b/c_src/wiredtiger-build.patch index 8b0c1ac..cb619ff 100644 --- a/c_src/wiredtiger-build.patch +++ b/c_src/wiredtiger-build.patch @@ -10,3 +10,97 @@ index 6d78823..2122cf8 100644 +libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include +libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv libwiredtiger_snappy_la_LIBADD = -lsnappy +diff --git a/src/support/cksum.c b/src/support/cksum.c +index 7e9befe..b924db7 100644 +--- a/src/support/cksum.c ++++ b/src/support/cksum.c +@@ -27,6 +27,13 @@ + + #include "wt_internal.h" + ++#if defined(__amd64) || defined(__x86_64) ++#define USE_HARDWARE_CRC32 1 ++#else ++#undef USE_HARDWARE_CRC32 ++#endif ++ ++#ifdef USE_HARDWARE_CRC32 + static const uint32_t g_crc_slicing[8][256] = { + #ifdef WORDS_BIGENDIAN + /* +@@ -1078,6 +1085,7 @@ static const uint32_t g_crc_slicing[8][256] = { + } + #endif + }; ++#endif /* USE_HARDWARE_CRC32 */ + + /* + * __wt_cksum -- +@@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len) + /* Checksum one byte at a time to the first 4B boundary. 
*/ + for (p = chunk; + ((uintptr_t)p & (sizeof(uint32_t) - 1)) != 0 && +- len > 0; ++p, --len) ++ len > 0; ++p, --len) { ++#ifdef USE_HARDWARE_CRC32 ++ __asm__ __volatile__( ++ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1" ++ : "=S" (crc) ++ : "0" (crc), "c" (*p)); ++#else + #ifdef WORDS_BIGENDIAN + crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8); + #else + crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8); + #endif ++#endif ++ } + + /* Checksum in 8B chunks. */ + for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) { ++#ifdef USE_HARDWARE_CRC32 ++ __asm__ __volatile__ ( ++ ".byte 0xf2, 0x48, 0x0f, 0x38, 0xf0, 0xf1;" ++ : "=S"(crc) ++ : "S"(crc), "c"(*p)); ++#else + crc ^= *(uint32_t *)p; + p += sizeof(uint32_t); + next = *(uint32_t *)p; +@@ -1139,22 +1161,32 @@ __wt_cksum(const void *chunk, size_t len) + g_crc_slicing[1][(next >> 16) & 0xFF] ^ + g_crc_slicing[0][(next >> 24)]; + #endif ++#endif + } + + /* Checksum trailing bytes one byte at a time. */ ++ for (len &= 0x7; len > 0; ++p, len--) { ++#ifdef USE_HARDWARE_CRC32 ++ __asm__ __volatile__( ++ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1" ++ : "=S" (crc) ++ : "0" (crc), "c" (*p)); ++#else + #ifdef WORDS_BIGENDIAN +- for (len &= 0x7; len > 0; ++p, len--) + crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8); ++#else ++ crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8); ++#endif ++#endif ++ } + ++#ifdef WORDS_BIGENDIAN + /* Do final byte swap to produce a result identical to little endian */ + crc = + ((crc << 24) & 0xFF000000) | + ((crc << 8) & 0x00FF0000) | + ((crc >> 8) & 0x0000FF00) | + ((crc >> 24) & 0x000000FF); +-#else +- for (len &= 0x7; len > 0; ++p, len--) +- crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8); + #endif + return (~crc); + } diff --git a/c_src/wterl.c b/c_src/wterl.c index f68723e..8a04d57 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -16,76 +16,75 @@ * under the License. * */ + #include "erl_nif.h" #include "erl_driver.h" #include #include +#include +#include #include -#ifdef DEBUG -#include -#include -#define dprint(s, ...) do { \ - fprintf(stderr, s, ##__VA_ARGS__); \ - fprintf(stderr, "\r\n"); \ - fflush(stderr); \ - } while(0); -#else -# define dprint(s, ...) {} -#endif - -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) -#endif - #include "wiredtiger.h" -#include "async_nif.h" -#include "khash.h" -#ifdef WTERL_STATS -#include "stats.h" -#endif +#include "common.h" +#include "async_nif.h" +#include "queue.h" +#include "cas.h" + +#define MAX_CACHE_SIZE ASYNC_NIF_MAX_WORKERS static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; -/* Generators for 'cursors' a named, type-specific hash table functions. */ -KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); +typedef char Uri[128]; -/** - * We will have exactly one (1) WterlCtx for each async worker thread. As - * requests arrive we will reuse the same WterlConnHandle->contexts[worker_id] - * WterlCtx in the work block ensuring that each async worker thread a) has - * a separate WT_SESSION (because they are not thread safe) and b) when - * possible we avoid opening new cursors by first looking for one in the - * cursors hash table. In practice this means we could have (num_workers - * * num_tables) of cursors open which we need to account for when setting - * session_max in the configuration of WiredTiger so that it creates enough - * hazard pointers for this extreme case. 
- * - * Note: We don't protect access to this struct with a mutex because it will - * only be accessed by the same worker thread. - */ -typedef struct { +struct wterl_ctx { + STAILQ_ENTRY(wterl_ctx) entries; + uint64_t sig; + size_t sig_len; WT_SESSION *session; - khash_t(cursors) *cursors; -} WterlCtx; + uint32_t num_cursors; + const char *session_config; + struct cursor_info { + const char *uri; + const char *config; + WT_CURSOR *cursor; + } ci[]; // Note: must be last in struct +}; -typedef struct { +typedef struct wterl_conn { WT_CONNECTION *conn; const char *session_config; - ErlNifMutex *contexts_mutex; - WterlCtx contexts[ASYNC_NIF_MAX_WORKERS]; + STAILQ_HEAD(ctxs, wterl_ctx) cache; + ErlNifMutex *cache_mutex; + uint32_t cache_size; + struct wterl_ctx *mru_ctx[ASYNC_NIF_MAX_WORKERS]; } WterlConnHandle; typedef struct { - WT_CURSOR *cursor; WT_SESSION *session; + WT_CURSOR *cursor; } WterlCursorHandle; -/* WiredTiger object names*/ -typedef char Uri[128]; +struct wterl_event_handlers { + WT_EVENT_HANDLER handlers; + ErlNifEnv *msg_env_error; + ErlNifMutex *error_mutex; + ErlNifEnv *msg_env_message; + ErlNifMutex *message_mutex; + ErlNifEnv *msg_env_progress; + ErlNifMutex *progress_mutex; + ErlNifPid to_pid; +}; + +struct wterl_priv_data { + void *async_nif_priv; // Note: must be first element in struct + struct wterl_event_handlers eh; + char wterl_vsn[512]; + char wiredtiger_vsn[512]; +}; /* Atoms (initialized in on_load) */ static ERL_NIF_TERM ATOM_ERROR; @@ -99,32 +98,415 @@ static ERL_NIF_TERM ATOM_WTERL_VSN; static ERL_NIF_TERM ATOM_WIREDTIGER_VSN; static ERL_NIF_TERM ATOM_MSG_PID; -struct wterl_event_handlers { - WT_EVENT_HANDLER handlers; - ErlNifEnv *msg_env_error; - ErlNifMutex *error_mutex; - ErlNifEnv *msg_env_message; - ErlNifMutex *message_mutex; - ErlNifEnv *msg_env_progress; - ErlNifMutex *progress_mutex; - ErlNifPid to_pid; -}; - -/* Generators for 'conns' a named, type-specific hash table functions. */ -KHASH_MAP_INIT_PTR(conns, WterlConnHandle*); - -struct wterl_priv_data { - void *async_nif_priv; // Note: must be first element in struct - ErlNifMutex *conns_mutex; - khash_t(conns) *conns; - struct wterl_event_handlers eh; - char wterl_vsn[512]; - char wiredtiger_vsn[512]; -}; - /* Global init for async_nif. */ ASYNC_NIF_INIT(wterl); +static inline size_t +__strlen(const char *s) +{ + if (s) + return strlen(s); + else + return 0; +} + +/** + * A string hash function. + * + * A basic hash function for strings of characters used during the + * affinity association. + * + * s a NULL terminated set of bytes to be hashed + * -> an integer hash encoding of the bytes + */ +static inline uint32_t +__str_hash(uint32_t in, const char *p, size_t len) +{ + uint32_t h = in; + for (++p ; len > 0; ++p, --len) + h += (h << 5) + (h >> 27) + *p; + return h; +} + +#if defined(__amd64) || defined(__x86_64) +/* Note: we'll use this to lower the chances that we'll have a hash + collision until I can finish a nice trie and use that to be a bit + more precise. When that's done we can skip hash/crc32 and just + use the binary position in the trie as our "signature". */ +static inline uint32_t +__crc32(uint32_t crc, const char *bytes, size_t len) +{ + const uint8_t *p; + for (p = (const uint8_t*)bytes; len > 0; ++p, --len) { + __asm__ __volatile__( + ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1" + : "=S" (crc) + : "0" (crc), "c" (*p)); + } + return crc; +} +#else +#error unsupported platform +#endif + +/** + * Calculate the log2 of 64bit unsigned integers. 
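+ *
+ * For intuition, this computes floor(log2(x)) for x > 0, for example
+ * __log2(1) == 0, __log2(2) == 1, __log2(3) == 1, __log2(1000) == 9 and
+ * __log2(1024) == 10; __log2(0) is defined here as 0.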
+ */ +#ifdef __GCC__ +#define __log2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) +#else +static inline uint32_t __log2(uint64_t x) { + static const int tab64[64] = { + 63, 0, 58, 1, 59, 47, 53, 2, + 60, 39, 48, 27, 54, 33, 42, 3, + 61, 51, 37, 40, 49, 18, 28, 20, + 55, 30, 34, 11, 43, 14, 22, 4, + 62, 57, 46, 52, 38, 26, 32, 41, + 50, 36, 17, 19, 29, 10, 13, 21, + 56, 45, 25, 31, 35, 16, 9, 12, + 44, 24, 15, 8, 23, 7, 6, 5}; + if (x == 0) return 0; + uint64_t v = x; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; +} +#endif + +/** + * Evict items from the cache. + * + * Evict old contexts from the cache to make space for new, more frequently + * used contexts. + * + * -> number of items evicted + */ +static int +__ctx_cache_evict(WterlConnHandle *conn_handle) +{ + uint32_t mean, num_evicted; + struct wterl_ctx *c; + +#ifndef DEBUG + if (conn_handle->cache_size < MAX_CACHE_SIZE) + return 0; +#endif + + mean = conn_handle->cache_size / 2; + if (mean < 2) return 0; + + num_evicted = 0; + while (mean--) { + c = STAILQ_LAST(&conn_handle->cache, wterl_ctx, entries); + if (c) { + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + if (c->session) + c->session->close(c->session, NULL); + enif_free(c); + num_evicted++; + } + } + conn_handle->cache_size -= num_evicted; + return num_evicted; +} + +/** + * Find a matching item in the cache. + * + * See if there exists an item in the cache with a matching signature, if + * so remove it from the cache and return it for use by the callee. + * + * sig a 64-bit signature (hash) representing the combination of Uri and + * session+config/cursor+config pairs needed for this operation + */ +static struct wterl_ctx * +__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) +{ + struct wterl_ctx *c; + + enif_mutex_lock(conn_handle->cache_mutex); + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs + // cache hit: + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + conn_handle->cache_size -= 1; + break; + } + c = STAILQ_NEXT(c, entries); + } + enif_mutex_unlock(conn_handle->cache_mutex); + DPRINTF("cache_find: [%u] %s (%p)", conn_handle->cache_size, c ? "hit" : "miss", c); + return c; +} + +/** + * Add/Return an item to the cache. + * + * Return an item into the cache, reset the cursors it has open and put it at + * the front of the LRU. + */ +static void +__ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) +{ + enif_mutex_lock(conn_handle->cache_mutex); + __ctx_cache_evict(conn_handle); + STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries); + conn_handle->cache_size += 1; +#ifdef DEBUG + uint32_t sz = 0; + struct wterl_ctx *f; + STAILQ_FOREACH(f, &conn_handle->cache, entries) { + sz++; + } +#endif + enif_mutex_unlock(conn_handle->cache_mutex); + DPRINTF("cache_add: [%u:%u] (%p)", sz, conn_handle->cache_size, c); +} + +static inline char * +__copy_str_into(char **p, const char *s) +{ + char *a = *p; + size_t len = __strlen(s); + memcpy(*p, s, len); + (*p)[len] = '\0'; + *p += len + 1; + return a; +} + +/** + * Get a reusable cursor that was opened for a particular worker within its + * session. + */ +static int +__retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, + struct wterl_ctx **ctx, + int count, const char *session_config, ...) 
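+/* A 64-bit signature is computed over session_config and the `count`
+ * (uri, config) string pairs that follow it in the variadic tail: a
+ * CRC32 of the bytes in the high word and a shift-add string hash in
+ * the low word. That signature is the cache key used to find a matching
+ * session+cursor bundle. A sketch of a typical call, with a hypothetical
+ * table uri, mirroring the get/put/delete call sites further down:
+ *
+ *     struct wterl_ctx *ctx = NULL;
+ *     int rc = __retain_ctx(conn_handle, worker_id, &ctx, 1,
+ *                           conn_handle->session_config,
+ *                           "table:test", "overwrite,raw");
+ */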
+{ + int i = 0; + uint32_t hash = 0; + uint32_t crc = 0; + uint64_t sig = 0; + size_t l, sig_len = 0; + va_list ap; + const char *arg; + struct wterl_ctx *c; + + arg = session_config; + va_start(ap, session_config); + if (session_config) { + l = __strlen(session_config); + hash = __str_hash(hash, session_config, l); + crc = __crc32(crc, session_config, l); + sig_len += l + 1; + DPRINTF("sig/1: %s", session_config); + } else { + sig_len += 1; + } + for (i = 0; i < (2 * count); i++) { + arg = va_arg(ap, const char *); + if (arg) { + l = __strlen(arg); + DPRINTF("sig/args: %s", arg); + hash = __str_hash(hash, arg, l); + crc = __crc32(crc, arg, l); + sig_len += l + 1; + } else { + sig_len += 1; + } + } + sig = (uint64_t)crc << 32 | hash; + DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash); + va_end(ap); + + *ctx = NULL; + + c = conn_handle->mru_ctx[worker_id]; + if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { + if (c == 0) { + // mru miss: + DPRINTF("[%.4u] mru miss, empty", worker_id); + *ctx = NULL; + } else { + if (c->sig == sig) { + // mru hit: + DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); + *ctx = c; + } else { + // mru mismatch: + DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); + __ctx_cache_add(conn_handle, c); + *ctx = NULL; + } + } + } + + if (*ctx == NULL) { + // check the cache + (*ctx) = __ctx_cache_find(conn_handle, sig); + if ((*ctx) == NULL) { + // cache miss: + DPRINTF("[%.4u] cache miss: %llu [cache size: %d]", worker_id, PRIuint64(sig), conn_handle->cache_size); + WT_CONNECTION *conn = conn_handle->conn; + WT_SESSION *session = NULL; + int rc = conn->open_session(conn, NULL, session_config, &session); + if (rc != 0) { + return rc; + } + size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len; + *ctx = enif_alloc(s); // TODO: enif_alloc_resource() + if (*ctx == NULL) { + session->close(session, NULL); + return ENOMEM; + } + memset(*ctx, 0, s); + (*ctx)->sig = sig; + (*ctx)->session = session; + (*ctx)->sig_len = sig_len; + char *p = (char *)(*ctx) + (s - sig_len); + (*ctx)->session_config = __copy_str_into(&p, session_config); + (*ctx)->num_cursors = count; + session_config = arg; + va_start(ap, session_config); + for (i = 0; i < count; i++) { + const char *uri = va_arg(ap, const char *); + const char *config = va_arg(ap, const char *); + // TODO: what to do (if anything) when uri or config is NULL? + (*ctx)->ci[i].uri = __copy_str_into(&p, uri); + (*ctx)->ci[i].config = __copy_str_into(&p, config); + rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->ci[i].cursor); + if (rc != 0) { + enif_free(*ctx); + session->close(session, NULL); // this will free the cursors too + return rc; + } + } + va_end(ap); + } else { + // cache hit: + DPRINTF("[%.4u] cache hit: %llu [cache size: %d]", worker_id, PRIuint64(sig), conn_handle->cache_size); + } + } + return 0; +} + +/** + * Return a context to the cache for reuse. 
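+ *
+ * The context is first offered back to this worker's MRU slot with a
+ * single compare-and-swap, and only falls back to the shared,
+ * mutex-protected cache when another request touched the slot in the
+ * meantime; roughly:
+ *
+ *     c = conn_handle->mru_ctx[worker_id];
+ *     if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c)
+ *         __ctx_cache_add(conn_handle, ctx);   (CAS lost, park in cache)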
+ */ +static void +__release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx *ctx) +{ + uint32_t i; + WT_CURSOR *cursor; + struct wterl_ctx *c = NULL; + + for (i = 0; i < ctx->num_cursors; i++) { + cursor = ctx->ci[i].cursor; + cursor->reset(cursor); + } + + c = conn_handle->mru_ctx[worker_id]; + if (CASPO(&conn_handle->mru_ctx[worker_id], c, ctx) != c) { + __ctx_cache_add(conn_handle, ctx); + DPRINTF("[%.4u] reset %d cursors, returnd ctx to cache", worker_id, ctx->num_cursors); + } else { + if (c != NULL) { + __ctx_cache_add(conn_handle, c); + DPRINTF("[%.4u] reset %d cursors, returned ctx to cache", worker_id, ctx->num_cursors); + } else { + DPRINTF("[%.4u] reset %d cursors, returned ctx to mru", worker_id, ctx->num_cursors); + } + } +} + +/** + * Close all sessions and all cursors open on any objects. + * + * Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex) + */ +void +__close_all_sessions(WterlConnHandle *conn_handle) +{ + struct wterl_ctx *c, *n; + int worker_id; + + // clear out the mru + for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { + do { + c = conn_handle->mru_ctx[worker_id]; + } while(CASPO(&conn_handle->mru_ctx[worker_id], c, 0) != c); + + if (c != 0) { + c->session->close(c->session, NULL); + enif_free(c); + } + } + + // clear out the cache + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + n = STAILQ_NEXT(c, entries); + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + conn_handle->cache_size -= 1; + c->session->close(c->session, NULL); + enif_free(c); + c = n; + } +} + +/** + * Close cursors open on 'uri' object. + * + * Note: always call within enif_mutex_lock/unlock(conn_handle->cache_mutex) + */ +void +__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) +{ + struct wterl_ctx *c, *n; + int worker_id, idx, cnt; + + // walk the mru first, look for open cursors on matching uri + for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { + c = conn_handle->mru_ctx[worker_id]; + if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c && c != 0) { + cnt = c->num_cursors; + for(idx = 0; idx < cnt; idx++) { + if (!strcmp(c->ci[idx].uri, uri)) { + c->session->close(c->session, NULL); + enif_free(c); + break; + } else { + if (CASPO(&conn_handle->mru_ctx[worker_id], 0, c) != 0) { + __ctx_cache_add(conn_handle, c); + } + } + } + } + } + + // next we walk the cache, look for open cursors on matching uri + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + n = STAILQ_NEXT(c, entries); + cnt = c->num_cursors; + for(idx = 0; idx < cnt; idx++) { + if (!strcmp(c->ci[idx].uri, uri)) { + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + conn_handle->cache_size -= 1; + c->session->close(c->session, NULL); + enif_free(c); + break; + } + } + c = n; + } + return; +} /** * Callback to handle error messages. @@ -233,179 +615,14 @@ __wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint6 enif_make_int64(msg_env, counter))); enif_clear_env(msg_env); if (!enif_send(NULL, to_pid, msg_env, msg)) - fprintf(stderr, "[%ld] %s\n", counter, operation); + fprintf(stderr, "[%llu] %s\n", PRIuint64(counter), operation); } else { - rc = (printf("[%ld] %s\n", counter, operation) >= 0 ? 0 : EIO); + rc = (printf("[%llu] %s\n", PRIuint64(counter), operation) >= 0 ? 0 : EIO); } enif_mutex_unlock(eh->progress_mutex); return rc; } -/** - * Open a WT_SESSION for the thread context 'ctx' to use, also init the - * shared cursor hash table. 
- * - * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) - */ -static int -__init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx) -{ - /* Create a context for this worker thread to reuse. */ - WT_CONNECTION *conn = conn_handle->conn; - int rc = conn->open_session(conn, NULL, conn_handle->session_config, &ctx->session); - if (rc != 0) { - ctx->session = NULL; - return rc; - } - - ctx->cursors = kh_init(cursors); - if (!ctx->cursors) { - ctx->session->close(ctx->session, NULL); - ctx->session = NULL; - return ENOMEM; - } - - return 0; -} - -/** - * Get the per-worker reusable WT_SESSION for a worker_id. - */ -static int -__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session) -{ - WterlCtx *ctx = &conn_handle->contexts[worker_id]; - int rc = 0; - - if (ctx->session == NULL) { - enif_mutex_lock(conn_handle->contexts_mutex); - rc = __init_session_and_cursor_cache(conn_handle, ctx); - enif_mutex_unlock(conn_handle->contexts_mutex); - } - *session = ctx->session; - return rc; -} - -/** - * Close all sessions and all cursors open on any objects. - * - * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) - */ -void -__close_all_sessions(WterlConnHandle *conn_handle) -{ - int i; - - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - if (ctx->session != NULL) { - WT_SESSION *session = ctx->session; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr; - for (itr = kh_begin(h); itr != kh_end(h); ++itr) { - if (kh_exist(h, itr)) { - WT_CURSOR *cursor = kh_val(h, itr); - char *key = (char *)kh_key(h, itr); - cursor->close(cursor); - kh_del(cursors, h, itr); - enif_free(key); - kh_value(h, itr) = NULL; - } - } - kh_destroy(cursors, h); - session->close(session, NULL); - ctx->session = NULL; - } - } -} - -/** - * Close cursors open on 'uri' object. - * - * Note: always call within enif_mutex_lock/unlock(conn_handle->contexts_mutex) - */ -void -__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) -{ - int i; - - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - if (ctx->session != NULL) { - khash_t(cursors) *h = ctx->cursors; - khiter_t itr = kh_get(cursors, h, (char *)uri); - if (itr != kh_end(h)) { - WT_CURSOR *cursor = kh_value(h, itr); - char *key = (char *)kh_key(h, itr); - cursor->close(cursor); - kh_del(cursors, h, itr); - enif_free(key); - kh_value(h, itr) = NULL; - } - } - } -} - -/** - * Get a reusable cursor that was opened for a particular worker within its - * session. - */ -static int -__retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor) -{ - /* Check to see if we have a cursor open for this uri and if so reuse it. 
*/ - WterlCtx *ctx = &conn_handle->contexts[worker_id]; - khash_t(cursors) *h = NULL; - khiter_t itr; - int rc; - - if (ctx->session == NULL) { - enif_mutex_lock(conn_handle->contexts_mutex); - rc = __init_session_and_cursor_cache(conn_handle, ctx); - enif_mutex_unlock(conn_handle->contexts_mutex); - if (rc != 0) - return rc; - } - - h = ctx->cursors; - itr = kh_get(cursors, h, (char *)uri); - if (itr != kh_end(h)) { - // key exists in hash table, retrieve it - *cursor = (WT_CURSOR*)kh_value(h, itr); - } else { - // key does not exist in hash table, create and insert one - enif_mutex_lock(conn_handle->contexts_mutex); - WT_SESSION *session = conn_handle->contexts[worker_id].session; - rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); - if (rc != 0) { - enif_mutex_unlock(conn_handle->contexts_mutex); - return rc; - } - - char *key = enif_alloc(sizeof(Uri)); - if (!key) { - session->close(session, NULL); - enif_mutex_unlock(conn_handle->contexts_mutex); - return ENOMEM; - } - memcpy(key, uri, 128); - int itr_status; - itr = kh_put(cursors, h, key, &itr_status); - kh_value(h, itr) = *cursor; - enif_mutex_unlock(conn_handle->contexts_mutex); - } - return 0; -} - -static void -__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor) -{ - __UNUSED(conn_handle); - __UNUSED(worker_id); - __UNUSED(uri); - cursor->reset(cursor); -} - /** * Convenience function to generate {error, {errno, Reason}} or 'not_found' * Erlang terms to return to callers. @@ -424,10 +641,11 @@ __strerror_term(ErlNifEnv* env, int rc) and/or may be localized to any given language (i18n). Use the errno atom rather than the message when matching in Erlang. You've been warned. */ + DPRINTF("error: %s", erl_errno_id(rc)); return enif_make_tuple2(env, ATOM_ERROR, - enif_make_tuple2(env, - enif_make_atom(env, erl_errno_id(rc)), - enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1))); + enif_make_tuple2(env, + enif_make_atom(env, erl_errno_id(rc)), + enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1))); } } @@ -450,7 +668,7 @@ ASYNC_NIF_DECL( { // pre if (!(argc == 3 && - (enif_get_string(env, argv[0], args->homedir, sizeof args->homedir, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[0], args->homedir, sizeof(args->homedir), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[1]) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); @@ -477,10 +695,11 @@ ASYNC_NIF_DECL( int rc = wiredtiger_open(args->homedir, (WT_EVENT_HANDLER*)&args->priv->eh.handlers, - config.data[0] != 0 ? (const char*)config.data : NULL, + (config.size > 1) ? 
(const char *)config.data : NULL, &conn); if (rc == 0) { WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); + memset(conn_handle, 0, sizeof(WterlConnHandle)); if (!conn_handle) { ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM)); return; @@ -493,30 +712,21 @@ ASYNC_NIF_DECL( return; } memcpy(sc, session_config.data, session_config.size); - conn_handle->session_config = (const char *)sc; } else { conn_handle->session_config = NULL; } - conn_handle->contexts_mutex = enif_mutex_create(NULL); - enif_mutex_lock(conn_handle->contexts_mutex); + conn_handle->cache_mutex = enif_mutex_create(NULL); + enif_mutex_lock(conn_handle->cache_mutex); conn_handle->conn = conn; - memset(conn_handle->contexts, 0, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); ERL_NIF_TERM result = enif_make_resource(env, conn_handle); - /* Keep track of open connections so as to free when unload/reload/etc. - are called. */ - khash_t(conns) *h; - enif_mutex_lock(args->priv->conns_mutex); - h = args->priv->conns; - int itr_status = 0; - khiter_t itr = kh_put(conns, h, conn, &itr_status); - if (itr_status != 0) // 0 indicates the key exists already - kh_value(h, itr) = conn_handle; - enif_mutex_unlock(args->priv->conns_mutex); + /* Init list for cache of reuseable contexts */ + STAILQ_INIT(&conn_handle->cache); + conn_handle->cache_size = 0; enif_release_resource(conn_handle); - enif_mutex_unlock(conn_handle->contexts_mutex); + enif_mutex_unlock(conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); } else @@ -553,7 +763,7 @@ ASYNC_NIF_DECL( { // work /* Free up the shared sessions and cursors. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_all_sessions(args->conn_handle); if (args->conn_handle->session_config) { enif_free((char *)args->conn_handle->session_config); @@ -561,21 +771,8 @@ ASYNC_NIF_DECL( } WT_CONNECTION* conn = args->conn_handle->conn; int rc = conn->close(conn, NULL); - - /* Connection is closed, remove it so we don't free on unload/reload/etc. */ - khash_t(conns) *h; - enif_mutex_lock(args->priv->conns_mutex); - h = args->priv->conns; - khiter_t itr; - itr = kh_get(conns, h, conn); - if (itr != kh_end(h)) { - /* key exists in table (as expected) delete it */ - kh_del(conns, h, itr); - kh_value(h, itr) = NULL; - } - enif_mutex_unlock(args->priv->conns_mutex); - enif_mutex_unlock(args->conn_handle->contexts_mutex); - enif_mutex_destroy(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); + enif_mutex_destroy(args->conn_handle->cache_mutex); memset(args->conn_handle, 0, sizeof(WterlConnHandle)); ASYNC_NIF_REPLY(rc == 0 ? 
ATOM_OK : __strerror_term(env, rc)); @@ -607,7 +804,7 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -661,7 +858,7 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -671,12 +868,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -688,7 +885,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -698,7 +895,7 @@ ASYNC_NIF_DECL( this will result in EBUSY(16) "Device or resource busy". */ rc = session->drop(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -727,8 +924,8 @@ ASYNC_NIF_DECL( if (!(argc == 4 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->oldname, sizeof args->oldname, ERL_NIF_LATIN1) > 0) && - (enif_get_string(env, argv[2], args->newname, sizeof args->newname, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->oldname, sizeof(args->oldname), ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[2], args->newname, sizeof(args->newname), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[3]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -738,12 +935,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->oldname); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -755,7 +952,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -766,7 +963,7 @@ ASYNC_NIF_DECL( this will result in EBUSY(16) "Device or resource busy". 
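 * From Erlang a caller observes that case as {error, {ebusy, _}}, the
 * shape produced by __strerror_term().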
*/ rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -796,7 +993,7 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -806,12 +1003,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -823,14 +1020,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->salvage(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -843,8 +1040,7 @@ ASYNC_NIF_DECL( * of objects specified. * * argv[0] WterlConnHandle resource - * argv[1] object name URI string - * argv[2] config string as an Erlang binary + * argv[1] config string as an Erlang binary */ ASYNC_NIF_DECL( wterl_checkpoint, @@ -870,13 +1066,15 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } + WT_CONNECTION *conn = args->conn_handle->conn; WT_SESSION *session = NULL; - int rc = __session_for(args->conn_handle, worker_id, &session); + int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->checkpoint(session, (const char*)config.data); + (void)session->close(session, NULL); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -909,7 +1107,7 @@ ASYNC_NIF_DECL( if (!(argc == 5 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[4]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -933,17 +1131,16 @@ ASYNC_NIF_DECL( } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[4]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; }, { // work /* This call requires that there be no open cursors referencing the object. 
*/ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -956,7 +1153,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -971,7 +1168,7 @@ ASYNC_NIF_DECL( mess. */ if (!args->from_first) { if (!enif_inspect_binary(env, args->start, &start_key)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -979,7 +1176,7 @@ ASYNC_NIF_DECL( rc = session->open_cursor(session, args->uri, NULL, "raw", &start); if (rc != 0) { session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -989,7 +1186,7 @@ ASYNC_NIF_DECL( if (rc != 0) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1004,7 +1201,7 @@ ASYNC_NIF_DECL( if (!enif_inspect_binary(env, args->stop, &stop_key)) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1013,7 +1210,7 @@ ASYNC_NIF_DECL( if (rc != 0) { start->close(start); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1024,7 +1221,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -1042,7 +1239,7 @@ ASYNC_NIF_DECL( start->close(start); stop->close(stop); session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1069,7 +1266,7 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -1079,12 +1276,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. 
*/ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_cursors_on(args->conn_handle, args->uri); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1096,14 +1293,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->upgrade(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1131,7 +1328,7 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } @@ -1141,12 +1338,12 @@ ASYNC_NIF_DECL( { // work /* This call requires that there be no open cursors referencing the object. */ - enif_mutex_lock(args->conn_handle->contexts_mutex); + enif_mutex_lock(args->conn_handle->cache_mutex); __close_all_sessions(args->conn_handle); ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } @@ -1158,14 +1355,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->verify(session, args->uri, (const char*)config.data); (void)session->close(session, NULL); - enif_mutex_unlock(args->conn_handle->contexts_mutex); + enif_mutex_unlock(args->conn_handle->cache_mutex); ASYNC_NIF_REPLY(rc == 0 ? 
ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1192,13 +1389,13 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + //affinity = __str_hash(0, args->uri, __strlen(args->uri)); }, { // work @@ -1208,12 +1405,16 @@ ASYNC_NIF_DECL( return; } + struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); + int rc = __retain_ctx(args->conn_handle, worker_id, &ctx, 1, + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + cursor = ctx->ci[0].cursor; WT_ITEM item_key; item_key.data = key.data; @@ -1221,7 +1422,7 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->remove(cursor); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - __release_cursor(args->conn_handle, worker_id, args->uri, cursor); + __release_ctx(args->conn_handle, worker_id, ctx); }, { // post @@ -1247,13 +1448,13 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + //affinity = __str_hash(0, args->uri, __strlen(args->uri)); }, { // work @@ -1263,12 +1464,16 @@ ASYNC_NIF_DECL( return; } + struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); + int rc = __retain_ctx(args->conn_handle, worker_id, &ctx, 1, + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + cursor = ctx->ci[0].cursor; WT_ITEM item_key; WT_ITEM item_value; @@ -1277,20 +1482,23 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->search(cursor); if (rc != 0) { + __release_ctx(args->conn_handle, worker_id, ctx); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = cursor->get_value(cursor, &item_value); if (rc != 0) { + __release_ctx(args->conn_handle, worker_id, ctx); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + ERL_NIF_TERM value; unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); memcpy(bin, item_value.data, item_value.size); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value)); - __release_cursor(args->conn_handle, worker_id, args->uri, cursor); + __release_ctx(args->conn_handle, worker_id, ctx); }, { // post @@ -1318,7 +1526,7 @@ ASYNC_NIF_DECL( if (!(argc == 4 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]) && enif_is_binary(env, argv[3]))) { ASYNC_NIF_RETURN_BADARG(); @@ 
-1326,7 +1534,7 @@ ASYNC_NIF_DECL( args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; + //affinity = __str_hash(0, args->uri, __strlen(args->uri)); }, { // work @@ -1341,12 +1549,16 @@ ASYNC_NIF_DECL( return; } + struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); + int rc = __retain_ctx(args->conn_handle, worker_id, &ctx, 1, + args->conn_handle->session_config, + args->uri, "overwrite,raw"); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + cursor = ctx->ci[0].cursor; WT_ITEM item_key; WT_ITEM item_value; @@ -1357,7 +1569,7 @@ ASYNC_NIF_DECL( item_value.size = value.size; cursor->set_value(cursor, &item_value); rc = cursor->insert(cursor); - __release_cursor(args->conn_handle, worker_id, args->uri, cursor); + __release_ctx(args->conn_handle, worker_id, ctx); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1384,13 +1596,12 @@ ASYNC_NIF_DECL( if (!(argc == 3 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && - (enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) > 0) && + (enif_get_string(env, argv[1], args->uri, sizeof(args->uri), ERL_NIF_LATIN1) > 0) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = args->uri; }, { // work @@ -1424,6 +1635,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM)); return; } + memset(cursor_handle, 0, sizeof(WterlCursorHandle)); cursor_handle->session = session; cursor_handle->cursor = cursor; ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); @@ -1600,6 +1812,7 @@ ASYNC_NIF_DECL( WT_CURSOR* cursor = args->cursor_handle->cursor; ASYNC_NIF_REPLY(__cursor_value_ret(env, cursor, cursor->next(cursor))); + DPRINTF("env: %p cursor: %p", env, cursor); }, { // post @@ -2054,6 +2267,26 @@ wterl_set_event_handler_pid(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) } +/** + * Called when a connection is free'd, our opportunity to clean up + * allocated resources. + */ +static void __wterl_conn_dtor(ErlNifEnv* env, void* obj) +{ + UNUSED(env); + WterlConnHandle *conn_handle = (WterlConnHandle *)obj; + + if (conn_handle->cache_mutex) { + DPRINTF("conn_handle dtor free'ing (%p)", obj); + enif_mutex_lock(conn_handle->cache_mutex); + __close_all_sessions(conn_handle); + conn_handle->conn->close(conn_handle->conn, NULL); + enif_mutex_unlock(conn_handle->cache_mutex); + enif_mutex_destroy(conn_handle->cache_mutex); + } +} + + /** * Called as this driver is loaded by the Erlang BEAM runtime triggered by the * module's on_load directive. 
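+ * on_load must return 0; any other value causes erlang:load_nif/2 to
+ * fail. The resource types are opened with ERL_NIF_RT_CREATE |
+ * ERL_NIF_RT_TAKEOVER so their definitions survive hot code upgrades,
+ * and the connection resource now registers __wterl_conn_dtor above, so
+ * a connection that is garbage collected without an explicit
+ * connection_close/1 still closes its WT_CONNECTION cleanly.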
@@ -2073,7 +2306,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) const ERL_NIF_TERM* option; ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", - NULL, flags, NULL); + __wterl_conn_dtor, flags, NULL); wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", NULL, flags, NULL); @@ -2093,14 +2326,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) return ENOMEM; memset(priv, 0, sizeof(struct wterl_priv_data)); - priv->conns_mutex = enif_mutex_create(NULL); - priv->conns = kh_init(conns); - if (!priv->conns) { - enif_mutex_destroy(priv->conns_mutex); - enif_free(priv); - return ENOMEM; - } - struct wterl_event_handlers *eh = &priv->eh; eh->error_mutex = enif_mutex_create(NULL); eh->message_mutex = enif_mutex_create(NULL); @@ -2126,8 +2351,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) pointer to the async_nif's private data which we set here. */ ASYNC_NIF_LOAD(wterl, priv->async_nif_priv); if (!priv->async_nif_priv) { - kh_destroy(conns, priv->conns); - enif_mutex_destroy(priv->conns_mutex); + memset(priv, 0, sizeof(struct wterl_priv_data)); enif_free(priv); return ENOMEM; } @@ -2142,72 +2366,23 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) static int on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { - __UNUSED(env); - __UNUSED(priv_data); - __UNUSED(load_info); + UNUSED(env); + UNUSED(priv_data); + UNUSED(load_info); return 0; // TODO: implement } static void on_unload(ErlNifEnv *env, void *priv_data) { - unsigned int i; struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data; - khash_t(conns) *h; - khiter_t itr_conns; - WterlConnHandle *conn_handle; - enif_mutex_lock(priv->conns_mutex); - h = priv->conns; + if (priv_data == NULL) + return; - for (itr_conns = kh_begin(h); itr_conns != kh_end(h); ++itr_conns) { - if (kh_exist(h, itr_conns)) { - conn_handle = kh_val(h, itr_conns); - if (conn_handle) { - enif_mutex_lock(conn_handle->contexts_mutex); - enif_free((void*)conn_handle->session_config); - for (i = 0; i < ASYNC_NIF_MAX_WORKERS; i++) { - WterlCtx *ctx = &conn_handle->contexts[i]; - if (ctx->session != NULL) { - WT_SESSION *session = ctx->session; - khash_t(cursors) *h = ctx->cursors; - khiter_t itr_cursors; - for (itr_cursors = kh_begin(h); itr_cursors != kh_end(h); ++itr_cursors) { - if (kh_exist(h, itr_cursors)) { - WT_CURSOR *cursor = kh_val(h, itr_cursors); - char *key = (char *)kh_key(h, itr_cursors); - cursor->close(cursor); - kh_del(cursors, h, itr_cursors); - enif_free(key); - kh_value(h, itr_cursors) = NULL; - } - } - kh_destroy(cursors, h); - session->close(session, NULL); - } - } - } - - /* This would have closed all cursors and sessions for us - but we do that explicitly above. */ - conn_handle->conn->close(conn_handle->conn, NULL); - } - } - - /* Continue to hold the context mutex while unloading the async_nif - to prevent new work from coming in while shutting down. 
*/ + DPRINTF("unloading wterl NIF (%p)", priv); ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv); - for (itr_conns = kh_begin(h); itr_conns != kh_end(h); ++itr_conns) { - if (kh_exist(h, itr_conns)) { - conn_handle = kh_val(h, itr_conns); - if (conn_handle) { - enif_mutex_unlock(conn_handle->contexts_mutex); - enif_mutex_destroy(conn_handle->contexts_mutex); - } - } - } - /* At this point all WiredTiger state and threads are free'd/stopped so there is no chance that the event handler functions will be called so we can be sure that there won't be a race on eh.msg_env in the callback functions. */ @@ -2222,18 +2397,18 @@ on_unload(ErlNifEnv *env, void *priv_data) if (eh->msg_env_progress) enif_free_env(eh->msg_env_progress); - kh_destroy(conns, h); - enif_mutex_unlock(priv->conns_mutex); - enif_mutex_destroy(priv->conns_mutex); + memset(priv, 0, sizeof(struct wterl_priv_data)); enif_free(priv); + + priv_data = NULL; } static int on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) { - __UNUSED(priv_data); - __UNUSED(old_priv_data); - __UNUSED(load_info); + UNUSED(priv_data); + UNUSED(old_priv_data); + UNUSED(load_info); ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement return 0; } diff --git a/rebar.config b/rebar.config index 36d44a6..46f0af2 100644 --- a/rebar.config +++ b/rebar.config @@ -7,24 +7,24 @@ {eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}. -{erl_opts, [%{d,'DEBUG',true}, - debug_info, - fail_on_warning, - warn_unused_vars, - warn_export_all, - warn_shadow_vars, - warn_unused_import, - warn_unused_function, +{erl_opts, [ + {parse_transform, lager_transform}, + debug_info, %{d,'DEBUG',true}, + %strict_validation, + %fail_on_warning, + %warn_missing_spec, warn_bif_clash, - warn_unused_record, warn_deprecated_function, - warn_obsolete_guard, + warn_export_all, warn_export_vars, warn_exported_vars, + warn_obsolete_guard, + warn_shadow_vars, warn_untyped_record, - {parse_transform, lager_transform} - %warn_missing_spec, - %strict_validation + warn_unused_function, + %warn_unused_import, + warn_unused_record, + warn_unused_vars ]}. {xref_checks, [undefined_function_calls, deprecated_function_calls]}. diff --git a/src/async_nif.hrl b/src/async_nif.hrl index 9d0f215..9034d8a 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -26,9 +26,6 @@ async_nif_enqueue(R, F, A) -> case erlang:apply(F, [R|A]) of {ok, enqueued} -> receive - {R, {error, eagain}} -> - %% Work unit was not queued, try again. - async_nif_enqueue(R, F, A); {R, {error, shutdown}=Error} -> %% Work unit was queued, but not executed. Error; @@ -38,6 +35,11 @@ async_nif_enqueue(R, F, A) -> {R, Reply} -> Reply end; + {error, eagain} -> + %% Work unit was not queued, try again. + async_nif_enqueue(R, F, A); + %{error, enomem} -> + %{error, shutdown} -> Other -> Other end. diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 4d0448d..94cf8bb 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -52,9 +52,7 @@ -record(state, {table :: string(), type :: string(), - connection :: wterl:connection(), - is_empty_cursor :: wterl:cursor(), - status_cursor :: wterl:cursor()}). + connection :: wterl:connection()}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. 
@@ -122,7 +120,7 @@ start(Partition, Config) ->
                      [{internal_page_max, "128K"},
                       {leaf_page_max, "128K"},
                       {lsm_chunk_size, "100MB"},
-                      {lsm_merge_threads, "2"},
+                      {lsm_merge_threads, 2},
                       {prefix_compression, false},
                       {lsm_bloom_newest, true},
                       {lsm_bloom_oldest, true} ,
@@ -135,15 +133,8 @@ start(Partition, Config) ->
                 end,
             case wterl:create(Connection, Table, TableOpts) of
                 ok ->
-                    case establish_utility_cursors(Connection, Table) of
-                        {ok, IsEmptyCursor, StatusCursor} ->
-                            {ok, #state{table=Table, type=Type,
-                                        connection=Connection,
-                                        is_empty_cursor=IsEmptyCursor,
-                                        status_cursor=StatusCursor}};
-                        {error, Reason2} ->
-                            {error, Reason2}
-                    end;
+                    {ok, #state{table=Table, type=Type,
+                                connection=Connection}};
                 {error, Reason3} ->
                     {error, Reason3}
             end
@@ -329,25 +320,42 @@ drop(#state{connection=Connection, table=Table}=State) ->
 %% @doc Returns true if this wterl backend contains any
 %% non-tombstone values; otherwise returns false.
 -spec is_empty(state()) -> boolean().
-is_empty(#state{is_empty_cursor=Cursor}) ->
-    wterl:cursor_reset(Cursor),
-    case wterl:cursor_next(Cursor) of
-        not_found -> true;
-        {error, {eperm, _}} -> false; % TODO: review/fix this logic
-        _ -> false
+is_empty(#state{connection=Connection, table=Table}) ->
+    case wterl:cursor_open(Connection, Table) of
+        {ok, Cursor} ->
+            IsEmpty =
+                case wterl:cursor_next(Cursor) of
+                    not_found ->
+                        true;
+                    {error, {eperm, _}} ->
+                        false; % TODO: review/fix this logic
+                    _ ->
+                        false
+                end,
+            wterl:cursor_close(Cursor),
+            IsEmpty;
+        {error, Reason2} ->
+            {error, Reason2}
     end.
 
 %% @doc Get the status information for this wterl backend
 -spec status(state()) -> [{atom(), term()}].
-status(#state{status_cursor=Cursor}) ->
-    wterl:cursor_reset(Cursor),
-    case fetch_status(Cursor) of
-        {ok, Stats} ->
-            Stats;
-        {error, {eperm, _}} -> % TODO: review/fix this logic
-            {ok, []};
-        _ ->
-            {ok, []}
+status(#state{connection=Connection, table=Table}) ->
+    case wterl:cursor_open(Connection, Table) of
+        {ok, Cursor} ->
+            TheStats =
+                case fetch_status(Cursor) of
+                    {ok, Stats} ->
+                        Stats;
+                    {error, {eperm, _}} -> % TODO: review/fix this logic
+                        {ok, []};
+                    _ ->
+                        {ok, []}
+                end,
+            wterl:cursor_close(Cursor),
+            TheStats;
+        {error, Reason2} ->
+            {error, Reason2}
     end.
 
 %% @doc Register an asynchronous callback
@@ -373,20 +381,6 @@ max_sessions(Config) ->
         false -> Est
     end.
 
-%% @private
-establish_utility_cursors(Connection, Table) ->
-    case wterl:cursor_open(Connection, Table) of
-        {ok, IsEmptyCursor} ->
-            case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
-                {ok, StatusCursor} ->
-                    {ok, IsEmptyCursor, StatusCursor};
-                {error, Reason1} ->
-                    {error, Reason1}
-            end;
-        {error, Reason2} ->
-            {error, Reason2}
-    end.
-
 %% @private
 establish_connection(Config, Type) ->
     %% Get the data root directory
diff --git a/src/wterl.erl b/src/wterl.erl
index 45afae5..9045be2 100644
--- a/src/wterl.erl
+++ b/src/wterl.erl
@@ -96,7 +96,8 @@ nif_stub_error(Line) ->
 -spec init() -> ok | {error, any()}.
 init() ->
     erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
-                    [{wterl_vsn, "a1459ce"}, {wiredtiger_vsn, "1.5.2-2-g8f2685b"}]).
+                    [{wterl_vsn, "53307e8"},
+                     {wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).
 
 -spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
 -spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
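
A caveat on the `is_empty/1` and `status/1` rewrites above: opening the cursor per call is what lets the cached utility cursors (and `establish_utility_cursors/2`) disappear, but `wterl:cursor_close/1` now sits on the normal path only, so an exception out of `wterl:cursor_next/1` or `fetch_status/1` would leak the cursor until the connection closes. A sketch of how the pattern could be factored with try/after (`with_cursor/3` is hypothetical, not part of this module):

    %% Hypothetical helper: open a cursor on Table, apply Fun to it, and
    %% close the cursor even if Fun throws.
    with_cursor(Connection, Table, Fun) ->
        case wterl:cursor_open(Connection, Table) of
            {ok, Cursor} ->
                try
                    Fun(Cursor)
                after
                    wterl:cursor_close(Cursor)
                end;
            {error, _}=Error ->
                Error
        end.

    %% is_empty/1 expressed with the helper:
    is_empty(#state{connection=Connection, table=Table}) ->
        with_cursor(Connection, Table,
                    fun(Cursor) ->
                            case wterl:cursor_next(Cursor) of
                                not_found -> true;
                                _ -> false
                            end
                    end).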
@@ -454,6 +455,7 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
         [{block_compressor, {string, quoted}},
          {cache_size, string},
          {checkpoint, config},
+         {checksum, string},
          {create, bool},
          {direct_io, list},
          {drop, list},
@@ -522,7 +524,7 @@ set_event_handler_pid(Pid)
 -define(TEST_DATA_DIR, "test/wterl.basic").
 
 open_test_conn(DataDir) ->
-    open_test_conn(DataDir, [{create,true},{cache_size,"100MB"},{session_max, 8192}]).
+    open_test_conn(DataDir, [{create,true},{cache_size,"1GB"},{session_max, 8192}]).
 open_test_conn(DataDir, OpenConfig) ->
     {ok, CWD} = file:get_cwd(),
     rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
@@ -584,6 +586,68 @@ insert_delete_test() ->
     ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)),
     ok = connection_close(ConnRef).
 
+%% cursor_fold_keys_test() ->
+%%     ConnRef = open_test_conn(?TEST_DATA_DIR),
+%%     ConnRef = open_test_table(ConnRef),
+%%     [wterl:put(ConnRef, "table:test-fold", crypto:sha(<<X>>),
+%%                crypto:rand_bytes(crypto:rand_uniform(128, 4096)))
+%%      || X <- lists:seq(1, 2000)],
+%%     Cursor = wterl:cursor_open(ConnRef, "table:test-fold"),
+%%     try
+%%         {Result, _} = wterl:fold_keys(Cursor, fun(Key, Acc) -> [Key | Acc] end, [])
+%%     catch
+%%         _:_ -> wterl:cursor_close(Cursor)
+%%     after
+%%         ok = connection_close(ConnRef)
+%%     end.
+%%     ?assertMatch(lists:sort(Result),
+%%                  lists:sort([crypto:sha(<<X>>) || X <- lists:seq(1, 2000)])).
+
+many_open_tables_test_() ->
+    {timeout, 120,
+     fun() ->
+             ConnOpts = [{create,true},{cache_size,"1GB"},{session_max, 8192}],
+             DataDir = ?TEST_DATA_DIR,
+             KeyGen =
+                 fun(X) ->
+                         crypto:sha(<<X>>)
+                 end,
+             ValGen =
+                 fun() ->
+                         crypto:rand_bytes(crypto:rand_uniform(128, 4096))
+                 end,
+             TableNameGen =
+                 fun(X) ->
+                         "lsm:" ++ integer_to_list(X)
+                 end,
+             NumTables = 16, N = 100,
+             ConnRef = open_test_conn(DataDir, ConnOpts),
+             Parent = self(),
+             [ok = wterl:create(ConnRef, TableNameGen(X), [{checksum, "uncompressed"}]) || X <- lists:seq(0, NumTables)],
+             [spawn(fun() ->
+                            TableName = TableNameGen(X),
+                            [case wterl:put(ConnRef, TableName, KeyGen(P), ValGen()) of
+                                 ok -> ok;
+                                 {error, {enoent, _}} -> io:format("put failed, table missing ~p~n", [TableName])
+                             end || P <- lists:seq(1, N)],
+                            [case wterl:get(ConnRef, TableName, KeyGen(P)) of
+                                 {ok, _} -> ok;
+                                 {error, {enoent, _}} -> io:format("get failed, table missing ~p~n", [TableName])
+                             end || P <- lists:seq(1, N)],
+                            [case wterl:delete(ConnRef, TableName, KeyGen(P)) of
+                                 ok -> ok;
+                                 {error, {enoent, _}} -> io:format("delete failed, table missing ~p~n", [TableName])
+                             end || P <- lists:seq(1, N)],
+                            Parent ! done
+                    end) || X <- lists:seq(0, NumTables)],
+             [receive done -> ok end || _ <- lists:seq(0, NumTables)],
+             [case wterl:drop(ConnRef, TableNameGen(X)) of
+                  ok -> ok;
+                  {error, {enoent, _}} -> io:format("drop failed, table missing ~p~n", [TableNameGen(X)])
+              end || X <- lists:seq(0, NumTables)],
+             ok = wterl:connection_close(ConnRef)
+     end}.
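
The `{checksum, string}` entry added to `config_to_bin/2` above is what `many_open_tables_test_/0` exercises via `[{checksum, "uncompressed"}]`; it passes straight through as WiredTiger's `checksum=` table-creation setting (`on`, `off`, or `uncompressed`, the last skipping checksums on blocks a compressor already verifies, as WiredTiger documents it). A usage sketch outside the test harness (the path and table name are illustrative):

    %% In an Erlang shell, assuming a scratch directory: create an LSM
    %% table that only checksums uncompressed blocks, then clean up.
    1> {ok, ConnRef} = wterl:connection_open("/tmp/wterl.example", [{create, true}]).
    2> ok = wterl:create(ConnRef, "lsm:example", [{checksum, "uncompressed"}]).
    3> ok = wterl:connection_close(ConnRef).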
+
 init_test_table() ->
     ConnRef = open_test_conn(?TEST_DATA_DIR),
     ConnRef = open_test_table(ConnRef),
@@ -616,7 +680,7 @@ various_online_test_() ->
       end},
      {"truncate entire table",
       fun() ->
-              ?assertMatch(ok, truncate(ConnRef, "table:test")),
+              ?assertMatch(ok, truncate(ConnRef, "table:test")),
               ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
       end},
      %% {"truncate range [<<"b">>..last], ensure value outside range is found after",
@@ -664,7 +728,7 @@ various_maintenance_test_() ->
      fun () ->
              {ok, CWD} = file:get_cwd(),
              ?assertMatch(ok, filelib:ensure_dir(filename:join([?TEST_DATA_DIR, "x"]))),
-             {ok, ConnRef} = connection_open(filename:join([CWD, ?TEST_DATA_DIR]), []),
+             {ok, ConnRef} = connection_open(filename:join([CWD, ?TEST_DATA_DIR]), [{create,true}]),
              ConnRef
      end,
      fun (ConnRef) ->
@@ -861,7 +925,7 @@ prop_put_delete() ->
     DataDir = "test/wterl.putdelete.qc",
     Table = "table:eqc",
     {ok, CWD} = file:get_cwd(),
-    rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
+    rmdir:path(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
     ok = filelib:ensure_dir(filename:join([DataDir, "x"])),
     {ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]),
     try
diff --git a/tools/basho_bench_driver_wterl.erl b/tools/basho_bench_driver_wterl.erl
index bdc6eb3..ae4dd93 100644
--- a/tools/basho_bench_driver_wterl.erl
+++ b/tools/basho_bench_driver_wterl.erl
@@ -26,12 +26,12 @@ new(1) ->
 new(Id) ->
     setup(Id).
 
-setup(_Id) ->
+setup(Id) ->
     %% Get the target directory
     Dir = basho_bench_config:get(wterl_dir, "/tmp"),
     Config = basho_bench_config:get(wterl, []),
     Uri = config_value(table_uri, Config, "lsm:test"),
-    ConnectionOpts = config_value(connection, Config, [{create, true}]),
+    ConnectionOpts = config_value(connection, Config, [{create,true},{session_max, 8192}]),
     SessionOpts = config_value(session, Config, []),
     TableOpts = config_value(table, Config, []),
 
@@ -43,7 +43,7 @@ setup(_Id) ->
                 {ok, Conn} ->
                     Conn;
                 {error, Reason0} ->
-                    ?FAIL_MSG("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason0])
+                    ?FAIL_MSG("Failed to establish a WiredTiger connection for ~p, wterl backend unable to start: ~p\n", [Id, Reason0])
             end;
         true ->
             {ok, Conn} = wterl_conn:get(),
diff --git a/update-version.sh b/update-version.sh
index 459fc0c..f457d06 100755
--- a/update-version.sh
+++ b/update-version.sh
@@ -2,10 +2,9 @@
 
 # Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
 
-wterl=`git log -n 1 --pretty=format:"%H"`
-wiredtiger0=`(cd c_src/wiredtiger && git log -n 1 --pretty=format:"%H")`
+wterl=`git describe --always --long --tags`
+wiredtiger0=`(cd c_src/wiredtiger-[0-9.]* && git describe --always --long --tags)`
 wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`
 
 echo $wterl
 echo $wiredtiger
-