Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9

Merged
gburd merged 30 commits from gsb-ctx-cache into master 2013-07-03 12:31:15 +00:00
5 changed files with 140 additions and 102 deletions
Showing only changes of commit ff7d1d6e20 - Show all commits

View file

@ -241,7 +241,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Identify the most appropriate worker for this request. */
unsigned int qid = 0;
struct async_nif_work_queue *q = NULL;
unsigned int n = async_nif->num_queues;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
@ -254,6 +253,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
async_nif->next_q = qid;
}
#if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix.
unsigned int n = async_nif->num_queues;
/* Now we inspect and interate across the set of queues trying to select one
that isn't too full or too slow. */
do {
@ -281,6 +283,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
// TODO: at some point add in work sheading/stealing
} while(n-- > 0);
#endif
/* We hold the queue's lock, and we've seletect a reasonable queue for this
new request so add the request. */
STAT_TICK(q, qwait);
@ -297,7 +301,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
}
/**
* TODO:
* Worker threads execute this function. Here each worker pulls requests of
* their respective queues, executes that work and continues doing that until
* they see the shutdown flag is set at which point they exit.
*/
static void *
async_nif_worker_fn(void *arg)

View file

@ -43,7 +43,7 @@ struct scale_time {
};
static const struct scale_time scale[] = {
{ "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL },
{ "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL },
{ "μs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL },
{ "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL },
{ "sec", "second", 1LL, 1000000000LL, 10, 2300LL } };

View file

@ -586,7 +586,7 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key)
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#ifdef __x86_64__
#ifdef __x86_64__
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal)
#else

View file

@ -184,27 +184,27 @@ static unsigned int __log2_64(uint64_t x) {
#define STAT_INIT(var, name) \
var->name ## _stat.min = ~0; \
var->name ## _stat.max = 0; \
var->name ## _stat.mean = 0.0; \
var->name ## _stat.h = 0; \
var->name ## _stat.d.then = 0; \
var->name ## _stat.d.unit = ns;
(var)->name ## _stat.min = ~0; \
(var)->name ## _stat.max = 0; \
(var)->name ## _stat.mean = 0.0; \
(var)->name ## _stat.h = 0; \
(var)->name ## _stat.d.then = 0; \
(var)->name ## _stat.d.unit = ns;
#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat)
#define STAT_TICK(var, name) name ## _stat_tick(&(var)->name ## _stat)
#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat)
#define STAT_TOCK(var, name) name ## _stat_tock(&(var)->name ## _stat)
#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat)
#define STAT_RESET(var, name) name ## _stat_reset(&(var)->name ## _stat)
#define STAT_MEAN_LOG2_SAMPLE(var, name) \
name ## _stat_mean_lg2(&var->name ## _stat)
name ## _stat_mean_lg2(&(var)->name ## _stat)
#define STAT_MEAN_SAMPLE(var, name) \
name ## _stat_mean(&var->name ## _stat)
name ## _stat_mean(&(var)->name ## _stat)
#define STAT_PRINT(var, name, mod) \
name ## _stat_print_histogram(&var->name ## _stat, mod)
name ## _stat_print_histogram(&(var)->name ## _stat, mod)
#if defined(__cplusplus)

View file

@ -42,10 +42,16 @@ typedef char Uri[128];
struct wterl_ctx {
STAILQ_ENTRY(wterl_ctx) entries;
uint64_t sig;
uint64_t tstamp;
uint64_t sig;
size_t sig_len;
WT_SESSION *session;
WT_CURSOR *cursors[]; // Note: must be last in struct
const char *session_config;
struct cursor_info {
const char *uri;
const char *config;
WT_CURSOR *cursor;
} ci[]; // Note: must be last in struct
};
typedef struct wterl_conn {
@ -110,13 +116,35 @@ ASYNC_NIF_INIT(wterl);
* -> an integer hash encoding of the bytes
*/
static inline uint32_t
__str_hash(const char *s)
__str_hash(uint32_t in, const char *p, size_t len)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
uint32_t h = in;
for (++p ; len > 0; ++p, --len)
h = (h << 5) - h + (uint32_t)*p;
return h;
}
#if defined(__amd64) || defined(__x86_64)
/* Note: we'll use this to lower the chances that we'll have a hash
collision until I can finish a nice trie and use that to be a bit
more precise. When that's done we can skip hash/crc32 and just
use the binary position in the trie as our "signature". */
static inline uint32_t
__crc32(uint32_t crc, const char *bytes, size_t len)
{
const uint8_t *p;
for (p = (const uint8_t*)bytes; len > 0; ++p, --len) {
__asm__ __volatile__(
".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
: "=S" (crc)
: "0" (crc), "c" (*p));
}
return crc;
}
#else
#error unsupported platform
#endif
/**
* Calculate the log2 of 64bit unsigned integers.
*/
@ -189,7 +217,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
if (log > mean) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
c->session->close(c->session, NULL);
memset(c, 0, sizeof(struct wterl_ctx));
memset(c, 0, sizeof(*c));
enif_free(c);
num_evicted++;
}
@ -205,8 +233,8 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
* See if there exists an item in the cache with a matching signature, if
* so remove it from the cache and return it for use by the callee.
*
* sig a 64-bit signature (hash) representing the session/cursor* needed
* for the operation
* sig a 64-bit signature (hash) representing the combination of Uri and
* session+config/cursor+config pairs needed for this operation
*/
static struct wterl_ctx *
__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
@ -217,7 +245,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
c = STAILQ_FIRST(&conn_handle->cache);
while (c != NULL) {
n = STAILQ_NEXT(c, entries);
if (c->sig == sig) {
if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs
// cache hit:
STAILQ_REMOVE_HEAD(&conn_handle->cache, entries);
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
@ -249,60 +277,13 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c)
enif_mutex_unlock(conn_handle->cache_mutex);
}
/**
* Produce the "Z-Index" or "Morton Number" from 2 32-bit unsigned integers.
* e.g. p = 0101 1011 0100 0011
* q = 1011 1100 0001 0011
* z = 0110 0111 1101 1010 0010 0001 0000 1111
*/
static inline uint64_t
__zi(uint32_t p, uint32_t q)
{
static const uint32_t B[] = {0x55555555, 0x33333333, 0x0F0F0F0F, 0x00FF00FF};
static const uint32_t S[] = {1, 2, 4, 8};
uint32_t x, y;
uint64_t z;
x = p & 0x0000FFFF; // Interleave lower 16 bits of p as x and q as y, so the
y = q & 0x0000FFFF; // bits of x are in the even positions and bits from y
z = 0; // in the odd; the first 32 bits of 'z' is the result.
x = (x | (x << S[3])) & B[3];
x = (x | (x << S[2])) & B[2];
x = (x | (x << S[1])) & B[1];
x = (x | (x << S[0])) & B[0];
y = (y | (y << S[3])) & B[3];
y = (y | (y << S[2])) & B[2];
y = (y | (y << S[1])) & B[1];
y = (y | (y << S[0])) & B[0];
z = x | (y << 1);
x = (p >> 16) & 0x0000FFFF; // Interleave the upper 16 bits of p as x and q as y
y = (q >> 16) & 0x0000FFFF; // just as before.
x = (x | (x << S[3])) & B[3];
x = (x | (x << S[2])) & B[2];
x = (x | (x << S[1])) & B[1];
x = (x | (x << S[0])) & B[0];
y = (y | (y << S[3])) & B[3];
y = (y | (y << S[2])) & B[2];
y = (y | (y << S[1])) & B[1];
y = (y | (y << S[0])) & B[0];
z = (z << 16) | (x | (y << 1)); // the resulting 64-bit Morton Number.
return z;
}
/**
* Create a signature for the operation we're about to perform.
*
* Create a 64bit signature for this a combination of session configuration
* some number of cursors open on tables each potentially with a different
* configuration. "session_config, [{table_name, cursor_config}, ...]"
* Create a 64-bit hash signature for this a combination of session
* configuration some number of cursors open on tables each potentially with a
* different configuration. "session_config, [{table_name, cursor_config},
* ...]"
*
* session_config the string used to configure the WT_SESSION
* ... each pair of items in the varargs array is a table name,
@ -310,23 +291,35 @@ __zi(uint32_t p, uint32_t q)
* -> number of variable arguments processed
*/
static uint64_t
__ctx_cache_sig(const char *c, va_list ap, int count)
__ctx_cache_sig(const char *c, va_list ap, int count, size_t *len)
{
int i = 0;
uint64_t h;
uint32_t hash = 0;
uint32_t crc = 0;
uint64_t sig = 0;
const char *arg;
size_t l = 0;
if (c)
h = __str_hash(c);
else
h = 0;
if (c) {
l = strlen(c);
hash = __str_hash(hash, c, l);
crc = __crc32(crc, c, l);
*len += l + 1;
}
for (i = 0; i < (2 * count); i++) {
arg = va_arg(ap, const char *);
if (arg) h = __zi((uint32_t)(h & 0xFFFFFFFF), __str_hash(arg));
else h = __zi((uint32_t)(h & 0xFFFFFFFF), 0);
if (arg) {
l = strlen(c);
hash = __str_hash(hash, arg, l);
crc = __crc32(crc, arg, strlen(arg));
*len += l + 1;
}
}
return h;
sig = crc;
sig = sig << 32;
sig &= hash;
return sig;
}
/**
@ -339,6 +332,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
int count, const char *session_config, ...)
{
int i = 3;
size_t sig_len = 0;
va_list ap;
uint64_t sig;
const char *arg;
@ -346,7 +340,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
arg = session_config;
va_start(ap, session_config);
sig = __ctx_cache_sig(session_config, ap, count);
sig = __ctx_cache_sig(session_config, ap, count, &sig_len);
va_end(ap);
*ctx = NULL;
@ -364,8 +358,8 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
*ctx = c;
break;
} else {
// mru missmatch:
DPRINTF("[%.4u] mru missmatch: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig));
// mru mismatch:
DPRINTF("[%.4u] mru mismatch: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig));
__ctx_cache_add(conn_handle, c);
*ctx = NULL;
}
@ -386,7 +380,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
if (rc != 0) {
return rc;
}
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*));
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len;
*ctx = enif_alloc(s); // TODO: enif_alloc_resource()
if (*ctx == NULL) {
session->close(session, NULL);
@ -395,14 +389,23 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
memset(*ctx, 0, s);
(*ctx)->sig = sig;
(*ctx)->session = session;
(*ctx)->sig_len = sig_len;
char *p = (char *)(*ctx) + (s - sig_len);
(*ctx)->session_config = p;
memcpy(p, session_config, strlen(session_config)); p++;
session_config = arg;
va_start(ap, session_config);
for (i = 0; i < count; i++) {
const char *uri = va_arg(ap, const char *);
const char *config = va_arg(ap, const char *);
// TODO: error when uri or config is NULL
rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->cursors[i]);
// TODO: what to do (if anything) when uri or config is NULL?
(*ctx)->ci[i].uri = p;
memcpy(p, uri, strlen(uri)); p++;
(*ctx)->ci[i].config = p;
memcpy(p, config, strlen(config)); p++;
rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->ci[i].cursor);
if (rc != 0) {
enif_free(*ctx);
session->close(session, NULL); // this will free the cursors too
return rc;
}
@ -426,9 +429,9 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
WT_CURSOR *cursor;
struct wterl_ctx *c = NULL;
n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]);
n = sizeof((WT_CURSOR**)ctx->ci) / sizeof(ctx->ci[0]);
for (i = 0; i < n; i++) {
cursor = ctx->cursors[i];
cursor = ctx->ci[i].cursor;
cursor->reset(cursor);
}
@ -461,7 +464,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
if (c != NULL) {
c->session->close(c->session, NULL);
memset(c, 0, sizeof(struct wterl_ctx));
memset(c, 0, sizeof(*c));
enif_free(c);
}
}
@ -487,8 +490,37 @@ __close_all_sessions(WterlConnHandle *conn_handle)
void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
{
UNUSED(uri);
__close_all_sessions(conn_handle);
struct wterl_ctx *c, *n;
int worker_id, cnt;
// TODO: improve this... but for now it's easiest to just toss everything
// from the mru into the cache as a first step.
for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) {
do {
c = conn_handle->mru_ctx[worker_id];
} while(CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) != c);
if (c != NULL)
__ctx_cache_add(conn_handle, c);
}
// walk the cache, look for open cursors on matching uri
c = STAILQ_FIRST(&conn_handle->cache);
while (c != NULL) {
n = STAILQ_NEXT(c, entries);
cnt = sizeof((WT_CURSOR**)c->ci) / sizeof(c->ci[0]);
for(;cnt > 0; cnt--) {
if (!strcmp(c->ci[cnt].uri, uri)) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
memset(c, 0, sizeof(*c));
enif_free(c);
break;
}
}
c = n;
}
return;
}
@ -1395,7 +1427,7 @@ ASYNC_NIF_DECL(
}
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
enif_keep_resource((void*)args->conn_handle);
affinity = __str_hash(args->uri);
affinity = __str_hash(0, args->uri, strlen(args->uri));
},
{ // work
@ -1414,7 +1446,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
cursor = ctx->cursors[0];
cursor = ctx->ci[0].cursor;
WT_ITEM item_key;
item_key.data = key.data;
@ -1454,7 +1486,7 @@ ASYNC_NIF_DECL(
}
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
enif_keep_resource((void*)args->conn_handle);
affinity = __str_hash(args->uri);
affinity = __str_hash(0, args->uri, strlen(args->uri));
},
{ // work
@ -1473,7 +1505,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
cursor = ctx->cursors[0];
cursor = ctx->ci[0].cursor;
WT_ITEM item_key;
WT_ITEM item_value;
@ -1534,7 +1566,7 @@ ASYNC_NIF_DECL(
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]);
enif_keep_resource((void*)args->conn_handle);
affinity = __str_hash(args->uri);
affinity = __str_hash(0, args->uri, strlen(args->uri));
},
{ // work
@ -1558,7 +1590,7 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
}
cursor = ctx->cursors[0];
cursor = ctx->ci[0].cursor;
WT_ITEM item_key;
WT_ITEM item_value;