WIP: devising a better way to cache/reuse session/cursor pairs.

This commit is contained in:
Gregory Burd 2013-05-28 16:14:19 -04:00
parent 013251e6d9
commit a2cd1d562c
5 changed files with 87 additions and 92 deletions

View file

@ -30,11 +30,11 @@ extern "C" {
#include "fifo_q.h"
#include "stats.h"
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
@ -80,11 +80,11 @@ struct async_nif_state {
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
__UNUSED(worker_id); \
UNUSED(worker_id); \
do work_block while(0); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \
__UNUSED(args); \
UNUSED(args); \
do post_block while(0); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
@ -92,7 +92,7 @@ struct async_nif_state {
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
const char *affinity = NULL; \
const unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
@ -122,7 +122,7 @@ struct async_nif_state {
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
h = affinity % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
@ -218,23 +218,6 @@ async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *a
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
/**
* A string hash function.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/
static inline unsigned int
async_nif_str_hash_func(const char *s)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* Enqueue a request for processing by a worker thread.
*
@ -366,7 +349,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
__UNUSED(env);
UNUSED(env);
STAT_PRINT(async_nif, qwait, "wterl");
@ -521,7 +504,7 @@ async_nif_load()
static void
async_nif_upgrade(ErlNifEnv *env)
{
__UNUSED(env);
UNUSED(env);
// TODO:
}

View file

@ -39,8 +39,8 @@ get_wt ()
git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
else
git clone -b ${WT_BRANCH} ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1)
git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi
mv wiredtiger $WT_DIR || exit 1
fi
@ -49,8 +49,8 @@ get_wt ()
[ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1
cd ./build_posix || exit 1
[ -e Makefile ] && $MAKE distclean
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
wt_configure;
)
}
@ -109,7 +109,8 @@ build_snappy ()
case "$1" in
clean)
[ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so

View file

@ -33,8 +33,7 @@ static uint64_t ts(time_scale unit)
((uint64_t)ts.tv_nsec / scale[unit].div));
}
#if 0
//if defined(__i386__) || defined(__x86_64__)
if defined(__i386__) || defined(__x86_64__)
/**
* cpu_clock_ticks()
@ -55,6 +54,10 @@ static inline uint64_t cpu_clock_ticks()
return (uint64_t)hi << 32 | lo;
}
#endif
#if 0
/**
* cpu_clock_ticks()
*

View file

@ -35,8 +35,8 @@
# define dprint(s, ...) {}
#endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif
#include "wiredtiger.h"
@ -47,36 +47,37 @@
#include "stats.h"
#endif
#if (ASYNC_NIF_MAX_WORKERS > 32768)
#error "WterlCtx cache won't work properly with > 32,768 workers."
#endif
static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE;
/* Generators for 'cursors' a named, type-specific hash table functions. */
KHASH_MAP_INIT_STR(cursors, WT_CURSOR*);
/* Generators for named, type-specific hash table functions. */
KHASH_MAP_INIT_STR(uri, unsigned int); // URI -> number of cursors(URI)
union {
unsigned int hash[];
struct {
unsigned int:02 nest; // cuckoo's nest choosen on hash collision
unsigned int:15 off; // bitpop((bmp & (1 << off) - 1) & bmp)
unsigned int:10 depth;
} nests;
} cuckoo;
/**
* We will have exactly one (1) WterlCtx for each async worker thread. As
* requests arrive we will reuse the same WterlConnHandle->contexts[worker_id]
* WterlCtx in the work block ensuring that each async worker thread a) has
* a separate WT_SESSION (because they are not thread safe) and b) when
* possible we avoid opening new cursors by first looking for one in the
* cursors hash table. In practice this means we could have (num_workers
* * num_tables) of cursors open which we need to account for when setting
* session_max in the configuration of WiredTiger so that it creates enough
* hazard pointers for this extreme case.
*
* Note: We don't protect access to this struct with a mutex because it will
* only be accessed by the same worker thread.
*/
typedef struct {
WT_SESSION *session;
khash_t(cursors) *cursors;
WT_CURSOR *cursor;
} WterlCtx;
typedef struct {
WT_CONNECTION *conn;
const char *session_config;
ErlNifMutex *contexts_mutex;
WterlCtx contexts[ASYNC_NIF_MAX_WORKERS];
unsigned int num_contexts;
WterlCtx **contexts; // TODO: free this
} WterlConnHandle;
typedef struct {
@ -268,24 +269,6 @@ __init_session_and_cursor_cache(WterlConnHandle *conn_handle, WterlCtx *ctx)
return 0;
}
/**
* Get the per-worker reusable WT_SESSION for a worker_id.
*/
static int
__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session)
{
WterlCtx *ctx = &conn_handle->contexts[worker_id];
int rc = 0;
if (ctx->session == NULL) {
enif_mutex_lock(conn_handle->contexts_mutex);
rc = __init_session_and_cursor_cache(conn_handle, ctx);
enif_mutex_unlock(conn_handle->contexts_mutex);
}
*session = ctx->session;
return rc;
}
/**
* Close all sessions and all cursors open on any objects.
*
@ -346,18 +329,36 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
}
}
/**
* A string hash function.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/
static inline unsigned int
__str_hash_func(const char *s)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* Get a reusable cursor that was opened for a particular worker within its
* session.
*/
static int
__retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor)
__retain_ctx(WterlConnHandle *conn_handle, const char *uri, WterlCtx **ctx)
{
/* Check to see if we have a cursor open for this uri and if so reuse it. */
WterlCtx *ctx = &conn_handle->contexts[worker_id];
khash_t(cursors) *h = NULL;
khiter_t itr;
int rc;
unsigned int h = __str_hash_func(uri); // TODO: add config at some point
if (ctx->session == NULL) {
enif_mutex_lock(conn_handle->contexts_mutex);
@ -398,11 +399,11 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
}
static void
__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor)
__release_ctx(WterlConnHandle *conn_handle, const char *uri, WterlCtx *ctx)
{
__UNUSED(conn_handle);
__UNUSED(worker_id);
__UNUSED(uri);
UNUSED(conn_handle);
UNUSED(worker_id);
UNUSED(uri);
cursor->reset(cursor);
}
@ -843,8 +844,7 @@ ASYNC_NIF_DECL(
* of objects specified.
*
* argv[0] WterlConnHandle resource
* argv[1] object name URI string
* argv[2] config string as an Erlang binary
* argv[1] config string as an Erlang binary
*/
ASYNC_NIF_DECL(
wterl_checkpoint,
@ -870,13 +870,15 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
WT_CONNECTION *conn = args->conn_handle->conn;
WT_SESSION *session = NULL;
int rc = __session_for(args->conn_handle, worker_id, &session);
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
rc = session->checkpoint(session, (const char*)config.data);
(void)session->close(session, NULL);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -1208,12 +1210,14 @@ ASYNC_NIF_DECL(
return;
}
WterlCtx *ctx = NULL;
WT_CURSOR *cursor = NULL;
int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
int rc = __retain_ctx(args->conn_handle, args->uri, &ctx);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
cursor = ctx->cursor;
WT_ITEM item_key;
item_key.data = key.data;
@ -1221,7 +1225,7 @@ ASYNC_NIF_DECL(
cursor->set_key(cursor, &item_key);
rc = cursor->remove(cursor);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
__release_ctx(args->conn_handle, args->uri, cursor);
},
{ // post
@ -1263,12 +1267,14 @@ ASYNC_NIF_DECL(
return;
}
WterlCtx *ctx = NULL
WT_CURSOR *cursor = NULL;
int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
int rc = __retain_ctx(args->conn_handle, args->uri, &ctx);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
cursor = ctx->cursor;
WT_ITEM item_key;
WT_ITEM item_value;
@ -1290,7 +1296,7 @@ ASYNC_NIF_DECL(
unsigned char *bin = enif_make_new_binary(env, item_value.size, &value);
memcpy(bin, item_value.data, item_value.size);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value));
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
__release_ctx(args->conn_handle, args->uri, ctx);
},
{ // post
@ -1341,12 +1347,14 @@ ASYNC_NIF_DECL(
return;
}
WterlCtx *ctx = NULL;
WT_CURSOR *cursor = NULL;
int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
int rc = __retain_ctx(args->conn_handle, args->uri, &ctx);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
cursor = ctx->cursors;
WT_ITEM item_key;
WT_ITEM item_value;
@ -1357,7 +1365,7 @@ ASYNC_NIF_DECL(
item_value.size = value.size;
cursor->set_value(cursor, &item_value);
rc = cursor->insert(cursor);
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
__release_ctx(args->conn_handle, args->uri, ctx);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
},
{ // post
@ -2142,9 +2150,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
static int
on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{
__UNUSED(env);
__UNUSED(priv_data);
__UNUSED(load_info);
UNUSED(env);
UNUSED(priv_data);
UNUSED(load_info);
return 0; // TODO: implement
}
@ -2231,9 +2239,9 @@ on_unload(ErlNifEnv *env, void *priv_data)
static int
on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info)
{
__UNUSED(priv_data);
__UNUSED(old_priv_data);
__UNUSED(load_info);
UNUSED(priv_data);
UNUSED(old_priv_data);
UNUSED(load_info);
ASYNC_NIF_UPGRADE(wterl, env); // TODO: implement
return 0;
}

View file

@ -122,7 +122,7 @@ start(Partition, Config) ->
[{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "100MB"},
{lsm_merge_threads, "2"},
{lsm_merge_threads, 2},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,