diff --git a/c_src/async_nif.h b/c_src/async_nif.h index e7a9670..724b8d5 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -30,11 +30,11 @@ extern "C" { #include "fifo_q.h" #include "stats.h" -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) +#ifndef UNUSED +#define UNUSED(v) ((void)(v)) #endif -#define ASYNC_NIF_MAX_WORKERS 128 +#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS @@ -80,11 +80,11 @@ struct async_nif_state { #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ struct decl ## _args frame; \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ - __UNUSED(worker_id); \ + UNUSED(worker_id); \ do work_block while(0); \ } \ static void fn_post_ ## decl (struct decl ## _args *args) { \ - __UNUSED(args); \ + UNUSED(args); \ do post_block while(0); \ } \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ @@ -92,7 +92,7 @@ struct async_nif_state { struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ - const char *affinity = NULL; \ + const unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ @@ -122,7 +122,7 @@ struct async_nif_state { req->fn_post = (void (*)(void *))fn_post_ ## decl; \ int h = -1; \ if (affinity) \ - h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ + h = affinity % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ @@ -218,23 +218,6 @@ async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *a enif_mutex_unlock(async_nif->recycled_req_mutex); } -/** - * A string hash function. - * - * A basic hash function for strings of characters used during the - * affinity association. - * - * s a NULL terminated set of bytes to be hashed - * -> an integer hash encoding of the bytes - */ -static inline unsigned int -async_nif_str_hash_func(const char *s) -{ - unsigned int h = (unsigned int)*s; - if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; - return h; -} - /** * Enqueue a request for processing by a worker thread. * @@ -366,7 +349,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; struct async_nif_req_entry *req = NULL; - __UNUSED(env); + UNUSED(env); STAT_PRINT(async_nif, qwait, "wterl"); @@ -521,7 +504,7 @@ async_nif_load() static void async_nif_upgrade(ErlNifEnv *env) { - __UNUSED(env); + UNUSED(env); // TODO: } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 789bcea..1a64c10 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -39,8 +39,8 @@ get_wt () git clone ${WT_REPO} && \ (cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1) else - git clone -b ${WT_BRANCH} ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1) + git clone ${WT_REPO} && \ + (cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1) fi mv wiredtiger $WT_DIR || exit 1 fi @@ -49,8 +49,8 @@ get_wt () [ -e $BASEDIR/wiredtiger-build.patch ] && \ (patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 ) ./autogen.sh || exit 1 - cd ./build_posix || exit 1 - [ -e Makefile ] && $MAKE distclean + [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ + (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean) wt_configure; ) } @@ -109,7 +109,8 @@ build_snappy () case "$1" in clean) - [ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean) + [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ + (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean) rm -rf system $SNAPPY_DIR rm -f ${BASEDIR}/../priv/wt rm -f ${BASEDIR}/../priv/libwiredtiger-*.so diff --git a/c_src/duration.h b/c_src/duration.h index 635d0fd..fc31101 100644 --- a/c_src/duration.h +++ b/c_src/duration.h @@ -33,8 +33,7 @@ static uint64_t ts(time_scale unit) ((uint64_t)ts.tv_nsec / scale[unit].div)); } -#if 0 -//if defined(__i386__) || defined(__x86_64__) +if defined(__i386__) || defined(__x86_64__) /** * cpu_clock_ticks() @@ -55,6 +54,10 @@ static inline uint64_t cpu_clock_ticks() return (uint64_t)hi << 32 | lo; } +#endif + +#if 0 + /** * cpu_clock_ticks() * diff --git a/c_src/wterl.c b/c_src/wterl.c index f68723e..5f93268 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -35,8 +35,8 @@ # define dprint(s, ...) {} #endif -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) +#ifndef UNUSED +#define UNUSED(v) ((void)(v)) #endif #include "wiredtiger.h" @@ -47,36 +47,37 @@ #include "stats.h" #endif +#if (ASYNC_NIF_MAX_WORKERS > 32768) +#error "WterlCtx cache won't work properly with > 32,768 workers." +#endif + static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; -/* Generators for 'cursors' a named, type-specific hash table functions. */ -KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); +/* Generators for named, type-specific hash table functions. */ +KHASH_MAP_INIT_STR(uri, unsigned int); // URI -> number of cursors(URI) + + union { + unsigned int hash[]; + struct { + unsigned int:02 nest; // cuckoo's nest choosen on hash collision + unsigned int:15 off; // bitpop((bmp & (1 << off) - 1) & bmp) + unsigned int:10 depth; + } nests; + } cuckoo; + -/** - * We will have exactly one (1) WterlCtx for each async worker thread. As - * requests arrive we will reuse the same WterlConnHandle->contexts[worker_id] - * WterlCtx in the work block ensuring that each async worker thread a) has - * a separate WT_SESSION (because they are not thread safe) and b) when - * possible we avoid opening new cursors by first looking for one in the - * cursors hash table. In practice this means we could have (num_workers - * * num_tables) of cursors open which we need to account for when setting - * session_max in the configuration of WiredTiger so that it creates enough - * hazard pointers for this extreme case. - * - * Note: We don't protect access to this struct with a mutex because it will - * only be accessed by the same worker thread. - */ typedef struct { WT_SESSION *session; - khash_t(cursors) *cursors; + WT_CURSOR *cursor; } WterlCtx; typedef struct { WT_CONNECTION *conn; const char *session_config; ErlNifMutex *contexts_mutex; - WterlCtx contexts[ASYNC_NIF_MAX_WORKERS]; + unsigned int num_contexts; + WterlCtx **contexts; // TODO: free this } WterlConnHandle; typedef struct { @@ -268,24 +269,6 @@ __init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx) return 0; } -/** - * Get the per-worker reusable WT_SESSION for a worker_id. - */ -static int -__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session) -{ - WterlCtx *ctx = &conn_handle->contexts[worker_id]; - int rc = 0; - - if (ctx->session == NULL) { - enif_mutex_lock(conn_handle->contexts_mutex); - rc = __init_session_and_cursor_cache(conn_handle, ctx); - enif_mutex_unlock(conn_handle->contexts_mutex); - } - *session = ctx->session; - return rc; -} - /** * Close all sessions and all cursors open on any objects. * @@ -346,18 +329,36 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) } } +/** + * A string hash function. + * + * A basic hash function for strings of characters used during the + * affinity association. + * + * s a NULL terminated set of bytes to be hashed + * -> an integer hash encoding of the bytes + */ +static inline unsigned int +__str_hash_func(const char *s) +{ + unsigned int h = (unsigned int)*s; + if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; + return h; +} + /** * Get a reusable cursor that was opened for a particular worker within its * session. */ static int -__retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor) +__retain_ctx(WterlConnHandle *conn_handle, const char *uri, WterlCtx **ctx) { /* Check to see if we have a cursor open for this uri and if so reuse it. */ WterlCtx *ctx = &conn_handle->contexts[worker_id]; khash_t(cursors) *h = NULL; khiter_t itr; int rc; + unsigned int h = __str_hash_func(uri); // TODO: add config at some point if (ctx->session == NULL) { enif_mutex_lock(conn_handle->contexts_mutex); @@ -398,11 +399,11 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char } static void -__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor) +__release_ctx(WterlConnHandle *conn_handle, const char *uri, WterlCtx *ctx) { - __UNUSED(conn_handle); - __UNUSED(worker_id); - __UNUSED(uri); + UNUSED(conn_handle); + UNUSED(worker_id); + UNUSED(uri); cursor->reset(cursor); } @@ -843,8 +844,7 @@ ASYNC_NIF_DECL( * of objects specified. * * argv[0] WterlConnHandle resource - * argv[1] object name URI string - * argv[2] config string as an Erlang binary + * argv[1] config string as an Erlang binary */ ASYNC_NIF_DECL( wterl_checkpoint, @@ -870,13 +870,15 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } + WT_CONNECTION *conn = args->conn_handle->conn; WT_SESSION *session = NULL; - int rc = __session_for(args->conn_handle, worker_id, &session); + int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } rc = session->checkpoint(session, (const char*)config.data); + (void)session->close(session, NULL); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1208,12 +1210,14 @@ ASYNC_NIF_DECL( return; } + WterlCtx *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); + int rc = __retain_ctx(args->conn_handle, args->uri, &ctx); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + cursor = ctx->cursor; WT_ITEM item_key; item_key.data = key.data; @@ -1221,7 +1225,7 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->remove(cursor); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - __release_cursor(args->conn_handle, worker_id, args->uri, cursor); + __release_ctx(args->conn_handle, args->uri, cursor); }, { // post @@ -1263,12 +1267,14 @@ ASYNC_NIF_DECL( return; } + WterlCtx *ctx = NULL WT_CURSOR *cursor = NULL; - int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); + int rc = __retain_ctx(args->conn_handle, args->uri, &ctx); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + cursor = ctx->cursor; WT_ITEM item_key; WT_ITEM item_value; @@ -1290,7 +1296,7 @@ ASYNC_NIF_DECL( unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); memcpy(bin, item_value.data, item_value.size); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value)); - __release_cursor(args->conn_handle, worker_id, args->uri, cursor); + __release_ctx(args->conn_handle, args->uri, ctx); }, { // post @@ -1341,12 +1347,14 @@ ASYNC_NIF_DECL( return; } + WterlCtx *ctx = NULL; WT_CURSOR *cursor = NULL; - int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); + int rc = __retain_ctx(args->conn_handle, args->uri, &ctx); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } + cursor = ctx->cursors; WT_ITEM item_key; WT_ITEM item_value; @@ -1357,7 +1365,7 @@ ASYNC_NIF_DECL( item_value.size = value.size; cursor->set_value(cursor, &item_value); rc = cursor->insert(cursor); - __release_cursor(args->conn_handle, worker_id, args->uri, cursor); + __release_ctx(args->conn_handle, args->uri, ctx); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -2142,9 +2150,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) static int on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { - __UNUSED(env); - __UNUSED(priv_data); - __UNUSED(load_info); + UNUSED(env); + UNUSED(priv_data); + UNUSED(load_info); return 0; // TODO: implement } @@ -2231,9 +2239,9 @@ on_unload(ErlNifEnv *env, void *priv_data) static int on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) { - __UNUSED(priv_data); - __UNUSED(old_priv_data); - __UNUSED(load_info); + UNUSED(priv_data); + UNUSED(old_priv_data); + UNUSED(load_info); ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement return 0; } diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 4d0448d..313da29 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -122,7 +122,7 @@ start(Partition, Config) -> [{internal_page_max, "128K"}, {leaf_page_max, "128K"}, {lsm_chunk_size, "100MB"}, - {lsm_merge_threads, "2"}, + {lsm_merge_threads, 2}, {prefix_compression, false}, {lsm_bloom_newest, true}, {lsm_bloom_oldest, true} ,