diff --git a/c_src/async_nif.h b/c_src/async_nif.h index e34748e..3236f4c 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -241,7 +241,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Identify the most appropriate worker for this request. */ unsigned int qid = 0; struct async_nif_work_queue *q = NULL; - unsigned int n = async_nif->num_queues; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -254,6 +253,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en async_nif->next_q = qid; } +#if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix. + unsigned int n = async_nif->num_queues; + /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ do { @@ -281,6 +283,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en // TODO: at some point add in work sheading/stealing } while(n-- > 0); +#endif + /* We hold the queue's lock, and we've seletect a reasonable queue for this new request so add the request. */ STAT_TICK(q, qwait); @@ -297,7 +301,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en } /** - * TODO: + * Worker threads execute this function. Here each worker pulls requests of + * their respective queues, executes that work and continues doing that until + * they see the shutdown flag is set at which point they exit. */ static void * async_nif_worker_fn(void *arg) diff --git a/c_src/duration.h b/c_src/duration.h index 6c05df0..1404f41 100644 --- a/c_src/duration.h +++ b/c_src/duration.h @@ -43,7 +43,7 @@ struct scale_time { }; static const struct scale_time scale[] = { { "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL }, - { "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL }, + { "μs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL }, { "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL }, { "sec", "second", 1LL, 1000000000LL, 10, 2300LL } }; diff --git a/c_src/khash.h b/c_src/khash.h index ab157b1..69549dc 100644 --- a/c_src/khash.h +++ b/c_src/khash.h @@ -586,7 +586,7 @@ static kh_inline khint_t __ac_Wang_hash(khint_t key) @param name Name of the hash table [symbol] @param khval_t Type of values [type] */ -#ifdef __x86_64__ +#ifdef __x86_64__ #define KHASH_MAP_INIT_PTR(name, khval_t) \ KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal) #else diff --git a/c_src/stats.h b/c_src/stats.h index 12f5d21..35192ec 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -184,27 +184,27 @@ static unsigned int __log2_64(uint64_t x) { #define STAT_INIT(var, name) \ - var->name ## _stat.min = ~0; \ - var->name ## _stat.max = 0; \ - var->name ## _stat.mean = 0.0; \ - var->name ## _stat.h = 0; \ - var->name ## _stat.d.then = 0; \ - var->name ## _stat.d.unit = ns; + (var)->name ## _stat.min = ~0; \ + (var)->name ## _stat.max = 0; \ + (var)->name ## _stat.mean = 0.0; \ + (var)->name ## _stat.h = 0; \ + (var)->name ## _stat.d.then = 0; \ + (var)->name ## _stat.d.unit = ns; -#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat) +#define STAT_TICK(var, name) name ## _stat_tick(&(var)->name ## _stat) -#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat) +#define STAT_TOCK(var, name) name ## _stat_tock(&(var)->name ## _stat) -#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat) +#define STAT_RESET(var, name) name ## _stat_reset(&(var)->name ## _stat) #define STAT_MEAN_LOG2_SAMPLE(var, name) \ - name ## _stat_mean_lg2(&var->name ## _stat) + name ## _stat_mean_lg2(&(var)->name ## _stat) #define STAT_MEAN_SAMPLE(var, name) \ - name ## _stat_mean(&var->name ## _stat) + name ## _stat_mean(&(var)->name ## _stat) #define STAT_PRINT(var, name, mod) \ - name ## _stat_print_histogram(&var->name ## _stat, mod) + name ## _stat_print_histogram(&(var)->name ## _stat, mod) #if defined(__cplusplus) diff --git a/c_src/wterl.c b/c_src/wterl.c index eda611a..d1790b4 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -42,10 +42,16 @@ typedef char Uri[128]; struct wterl_ctx { STAILQ_ENTRY(wterl_ctx) entries; - uint64_t sig; uint64_t tstamp; + uint64_t sig; + size_t sig_len; WT_SESSION *session; - WT_CURSOR *cursors[]; // Note: must be last in struct + const char *session_config; + struct cursor_info { + const char *uri; + const char *config; + WT_CURSOR *cursor; + } ci[]; // Note: must be last in struct }; typedef struct wterl_conn { @@ -110,13 +116,35 @@ ASYNC_NIF_INIT(wterl); * -> an integer hash encoding of the bytes */ static inline uint32_t -__str_hash(const char *s) +__str_hash(uint32_t in, const char *p, size_t len) { - unsigned int h = (unsigned int)*s; - if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; + uint32_t h = in; + for (++p ; len > 0; ++p, --len) + h = (h << 5) - h + (uint32_t)*p; return h; } +#if defined(__amd64) || defined(__x86_64) +/* Note: we'll use this to lower the chances that we'll have a hash + collision until I can finish a nice trie and use that to be a bit + more precise. When that's done we can skip hash/crc32 and just + use the binary position in the trie as our "signature". */ +static inline uint32_t +__crc32(uint32_t crc, const char *bytes, size_t len) +{ + const uint8_t *p; + for (p = (const uint8_t*)bytes; len > 0; ++p, --len) { + __asm__ __volatile__( + ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1" + : "=S" (crc) + : "0" (crc), "c" (*p)); + } + return crc; +} +#else +#error unsupported platform +#endif + /** * Calculate the log2 of 64bit unsigned integers. */ @@ -189,7 +217,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) if (log > mean) { STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); c->session->close(c->session, NULL); - memset(c, 0, sizeof(struct wterl_ctx)); + memset(c, 0, sizeof(*c)); enif_free(c); num_evicted++; } @@ -205,8 +233,8 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) * See if there exists an item in the cache with a matching signature, if * so remove it from the cache and return it for use by the callee. * - * sig a 64-bit signature (hash) representing the session/cursor* needed - * for the operation + * sig a 64-bit signature (hash) representing the combination of Uri and + * session+config/cursor+config pairs needed for this operation */ static struct wterl_ctx * __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) @@ -217,7 +245,7 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) c = STAILQ_FIRST(&conn_handle->cache); while (c != NULL) { n = STAILQ_NEXT(c, entries); - if (c->sig == sig) { + if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs // cache hit: STAILQ_REMOVE_HEAD(&conn_handle->cache, entries); conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++; @@ -249,60 +277,13 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) enif_mutex_unlock(conn_handle->cache_mutex); } -/** - * Produce the "Z-Index" or "Morton Number" from 2 32-bit unsigned integers. - * e.g. p = 0101 1011 0100 0011 - * q = 1011 1100 0001 0011 - * z = 0110 0111 1101 1010 0010 0001 0000 1111 - */ -static inline uint64_t -__zi(uint32_t p, uint32_t q) -{ - static const uint32_t B[] = {0x55555555, 0x33333333, 0x0F0F0F0F, 0x00FF00FF}; - static const uint32_t S[] = {1, 2, 4, 8}; - uint32_t x, y; - uint64_t z; - - x = p & 0x0000FFFF; // Interleave lower 16 bits of p as x and q as y, so the - y = q & 0x0000FFFF; // bits of x are in the even positions and bits from y - z = 0; // in the odd; the first 32 bits of 'z' is the result. - - x = (x | (x << S[3])) & B[3]; - x = (x | (x << S[2])) & B[2]; - x = (x | (x << S[1])) & B[1]; - x = (x | (x << S[0])) & B[0]; - - y = (y | (y << S[3])) & B[3]; - y = (y | (y << S[2])) & B[2]; - y = (y | (y << S[1])) & B[1]; - y = (y | (y << S[0])) & B[0]; - - z = x | (y << 1); - - x = (p >> 16) & 0x0000FFFF; // Interleave the upper 16 bits of p as x and q as y - y = (q >> 16) & 0x0000FFFF; // just as before. - - x = (x | (x << S[3])) & B[3]; - x = (x | (x << S[2])) & B[2]; - x = (x | (x << S[1])) & B[1]; - x = (x | (x << S[0])) & B[0]; - - y = (y | (y << S[3])) & B[3]; - y = (y | (y << S[2])) & B[2]; - y = (y | (y << S[1])) & B[1]; - y = (y | (y << S[0])) & B[0]; - - z = (z << 16) | (x | (y << 1)); // the resulting 64-bit Morton Number. - - return z; -} - /** * Create a signature for the operation we're about to perform. * - * Create a 64bit signature for this a combination of session configuration - * some number of cursors open on tables each potentially with a different - * configuration. "session_config, [{table_name, cursor_config}, ...]" + * Create a 64-bit hash signature for this a combination of session + * configuration some number of cursors open on tables each potentially with a + * different configuration. "session_config, [{table_name, cursor_config}, + * ...]" * * session_config the string used to configure the WT_SESSION * ... each pair of items in the varargs array is a table name, @@ -310,23 +291,35 @@ __zi(uint32_t p, uint32_t q) * -> number of variable arguments processed */ static uint64_t -__ctx_cache_sig(const char *c, va_list ap, int count) +__ctx_cache_sig(const char *c, va_list ap, int count, size_t *len) { int i = 0; - uint64_t h; + uint32_t hash = 0; + uint32_t crc = 0; + uint64_t sig = 0; const char *arg; + size_t l = 0; - if (c) - h = __str_hash(c); - else - h = 0; + if (c) { + l = strlen(c); + hash = __str_hash(hash, c, l); + crc = __crc32(crc, c, l); + *len += l + 1; + } for (i = 0; i < (2 * count); i++) { arg = va_arg(ap, const char *); - if (arg) h = __zi((uint32_t)(h & 0xFFFFFFFF), __str_hash(arg)); - else h = __zi((uint32_t)(h & 0xFFFFFFFF), 0); + if (arg) { + l = strlen(c); + hash = __str_hash(hash, arg, l); + crc = __crc32(crc, arg, strlen(arg)); + *len += l + 1; + } } - return h; + sig = crc; + sig = sig << 32; + sig &= hash; + return sig; } /** @@ -339,6 +332,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, int count, const char *session_config, ...) { int i = 3; + size_t sig_len = 0; va_list ap; uint64_t sig; const char *arg; @@ -346,7 +340,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, arg = session_config; va_start(ap, session_config); - sig = __ctx_cache_sig(session_config, ap, count); + sig = __ctx_cache_sig(session_config, ap, count, &sig_len); va_end(ap); *ctx = NULL; @@ -364,8 +358,8 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, *ctx = c; break; } else { - // mru missmatch: - DPRINTF("[%.4u] mru missmatch: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); + // mru mismatch: + DPRINTF("[%.4u] mru mismatch: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); __ctx_cache_add(conn_handle, c); *ctx = NULL; } @@ -386,7 +380,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, if (rc != 0) { return rc; } - size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*)); + size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len; *ctx = enif_alloc(s); // TODO: enif_alloc_resource() if (*ctx == NULL) { session->close(session, NULL); @@ -395,14 +389,23 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, memset(*ctx, 0, s); (*ctx)->sig = sig; (*ctx)->session = session; + (*ctx)->sig_len = sig_len; + char *p = (char *)(*ctx) + (s - sig_len); + (*ctx)->session_config = p; + memcpy(p, session_config, strlen(session_config)); p++; session_config = arg; va_start(ap, session_config); for (i = 0; i < count; i++) { const char *uri = va_arg(ap, const char *); const char *config = va_arg(ap, const char *); - // TODO: error when uri or config is NULL - rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->cursors[i]); + // TODO: what to do (if anything) when uri or config is NULL? + (*ctx)->ci[i].uri = p; + memcpy(p, uri, strlen(uri)); p++; + (*ctx)->ci[i].config = p; + memcpy(p, config, strlen(config)); p++; + rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->ci[i].cursor); if (rc != 0) { + enif_free(*ctx); session->close(session, NULL); // this will free the cursors too return rc; } @@ -426,9 +429,9 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx WT_CURSOR *cursor; struct wterl_ctx *c = NULL; - n = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]); + n = sizeof((WT_CURSOR**)ctx->ci) / sizeof(ctx->ci[0]); for (i = 0; i < n; i++) { - cursor = ctx->cursors[i]; + cursor = ctx->ci[i].cursor; cursor->reset(cursor); } @@ -461,7 +464,7 @@ __close_all_sessions(WterlConnHandle *conn_handle) if (c != NULL) { c->session->close(c->session, NULL); - memset(c, 0, sizeof(struct wterl_ctx)); + memset(c, 0, sizeof(*c)); enif_free(c); } } @@ -487,8 +490,37 @@ __close_all_sessions(WterlConnHandle *conn_handle) void __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) { - UNUSED(uri); - __close_all_sessions(conn_handle); + struct wterl_ctx *c, *n; + int worker_id, cnt; + + // TODO: improve this... but for now it's easiest to just toss everything + // from the mru into the cache as a first step. + for (worker_id = 0; worker_id < ASYNC_NIF_MAX_WORKERS; worker_id++) { + do { + c = conn_handle->mru_ctx[worker_id]; + } while(CASPO(&conn_handle->mru_ctx[worker_id], c, NULL) != c); + + if (c != NULL) + __ctx_cache_add(conn_handle, c); + } + + // walk the cache, look for open cursors on matching uri + c = STAILQ_FIRST(&conn_handle->cache); + while (c != NULL) { + n = STAILQ_NEXT(c, entries); + cnt = sizeof((WT_CURSOR**)c->ci) / sizeof(c->ci[0]); + for(;cnt > 0; cnt--) { + if (!strcmp(c->ci[cnt].uri, uri)) { + STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); + conn_handle->cache_size -= 1; + c->session->close(c->session, NULL); + memset(c, 0, sizeof(*c)); + enif_free(c); + break; + } + } + c = n; + } return; } @@ -1395,7 +1427,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = __str_hash(args->uri); + affinity = __str_hash(0, args->uri, strlen(args->uri)); }, { // work @@ -1414,7 +1446,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - cursor = ctx->cursors[0]; + cursor = ctx->ci[0].cursor; WT_ITEM item_key; item_key.data = key.data; @@ -1454,7 +1486,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->conn_handle); - affinity = __str_hash(args->uri); + affinity = __str_hash(0, args->uri, strlen(args->uri)); }, { // work @@ -1473,7 +1505,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - cursor = ctx->cursors[0]; + cursor = ctx->ci[0].cursor; WT_ITEM item_key; WT_ITEM item_value; @@ -1534,7 +1566,7 @@ ASYNC_NIF_DECL( args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); enif_keep_resource((void*)args->conn_handle); - affinity = __str_hash(args->uri); + affinity = __str_hash(0, args->uri, strlen(args->uri)); }, { // work @@ -1558,7 +1590,7 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - cursor = ctx->cursors[0]; + cursor = ctx->ci[0].cursor; WT_ITEM item_key; WT_ITEM item_value;