Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9

Merged
gburd merged 30 commits from gsb-ctx-cache into master 2013-07-03 12:31:15 +00:00
4 changed files with 1202 additions and 130 deletions
Showing only changes of commit b002294c4e - Show all commits

View file

@ -1,12 +1,11 @@
/*- /*-
* Copyright 1997-1999, 2001, John-Mark Gurney. * Copyright 1997-1999, 2001, John-Mark Gurney.
* 2008, Attractive Chaos <attractivechaos@aol.co.uk> * 2008-2009, Attractive Chaos <attractor@live.co.uk>
*
* All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions * modification, are permitted provided that the following conditions
* are met: * are met:
*
* 1. Redistributions of source code must retain the above copyright * 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer. * notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright * 2. Redistributions in binary form must reproduce the above copyright
@ -26,9 +25,6 @@
* SUCH DAMAGE. * SUCH DAMAGE.
*/ */
/* Reference: http://attractivechaos.awardspace.com/kbtree.h
http://attractivechaos.awardspace.com/kbtree.h.html */
#ifndef __AC_KBTREE_H #ifndef __AC_KBTREE_H
#define __AC_KBTREE_H #define __AC_KBTREE_H
@ -56,14 +52,14 @@ typedef struct {
{ \ { \
kbtree_##name##_t *b; \ kbtree_##name##_t *b; \
b = (kbtree_##name##_t*)calloc(1, sizeof(kbtree_##name##_t)); \ b = (kbtree_##name##_t*)calloc(1, sizeof(kbtree_##name##_t)); \
b->t = ((size - 4 - sizeof(void*)) / (sizeof(void*) + sizeof(key_t)) + 1) / 2; \ b->t = ((size - 4 - sizeof(void*)) / (sizeof(void*) + sizeof(key_t)) + 1) >> 1; \
if (b->t < 2) { \ if (b->t < 2) { \
free(b); return 0; \ free(b); return 0; \
} \ } \
b->n = 2 * b->t - 1; \ b->n = 2 * b->t - 1; \
b->off_ptr = 4 + b->n * sizeof(key_t); \ b->off_ptr = 4 + b->n * sizeof(key_t); \
b->ilen = (4 + sizeof(void*) + b->n * (sizeof(void*) + sizeof(key_t)) + 3) / 4 * 4; \ b->ilen = (4 + sizeof(void*) + b->n * (sizeof(void*) + sizeof(key_t)) + 3) >> 2 << 2; \
b->elen = (b->off_ptr + 3) / 4 * 4; \ b->elen = (b->off_ptr + 3) >> 2 << 2; \
b->root = (kbnode_t*)calloc(1, b->ilen); \ b->root = (kbnode_t*)calloc(1, b->ilen); \
++b->n_nodes; \ ++b->n_nodes; \
return b; \ return b; \
@ -71,7 +67,7 @@ typedef struct {
#define __kb_destroy(b) do { \ #define __kb_destroy(b) do { \
int i, max = 8; \ int i, max = 8; \
kbnode_t *x, **top, **stack; \ kbnode_t *x, **top, **stack = 0; \
if (b) { \ if (b) { \
top = stack = (kbnode_t**)calloc(max, sizeof(kbnode_t*)); \ top = stack = (kbnode_t**)calloc(max, sizeof(kbnode_t*)); \
*top++ = (b)->root; \ *top++ = (b)->root; \
@ -93,10 +89,17 @@ typedef struct {
free(b); free(stack); \ free(b); free(stack); \
} while (0) } while (0)
#define __kb_get_first(key_t, b, ret) do { \
kbnode_t *__x = (b)->root; \
while (__KB_PTR(b, __x)[0] != 0) \
__x = __KB_PTR(b, __x)[0]; \
(ret) = __KB_KEY(key_t, __x)[0]; \
} while (0)
#define __KB_GET_AUX0(name, key_t, __cmp) \ #define __KB_GET_AUX0(name, key_t, __cmp) \
static inline int __kb_get_aux_##name(const kbnode_t * __restrict x, const key_t * __restrict k, int *r) \ static inline int __kb_get_aux_##name(const kbnode_t * __restrict x, const key_t * __restrict k, int *r) \
{ \ { \
int tr, *rr, begin, end, n = x->n / 2; \ int tr, *rr, begin, end, n = x->n >> 1; \
if (x->n == 0) return -1; \ if (x->n == 0) return -1; \
if (__cmp(*k, __KB_KEY(key_t, x)[n]) < 0) { \ if (__cmp(*k, __KB_KEY(key_t, x)[n]) < 0) { \
begin = 0; end = n; \ begin = 0; end = n; \
@ -114,7 +117,7 @@ typedef struct {
if (x->n == 0) return -1; \ if (x->n == 0) return -1; \
rr = r? r : &tr; \ rr = r? r : &tr; \
while (begin < end) { \ while (begin < end) { \
int mid = (begin + end) / 2; \ int mid = (begin + end) >> 1; \
if (__cmp(__KB_KEY(key_t, x)[mid], *k) < 0) begin = mid + 1; \ if (__cmp(__KB_KEY(key_t, x)[mid], *k) < 0) begin = mid + 1; \
else end = mid; \ else end = mid; \
} \ } \
@ -189,7 +192,7 @@ typedef struct {
i = __kb_getp_aux_##name(x, k, 0); \ i = __kb_getp_aux_##name(x, k, 0); \
if (i != x->n - 1) \ if (i != x->n - 1) \
memmove(__KB_KEY(key_t, x) + i + 2, __KB_KEY(key_t, x) + i + 1, (x->n - i - 1) * sizeof(key_t)); \ memmove(__KB_KEY(key_t, x) + i + 2, __KB_KEY(key_t, x) + i + 1, (x->n - i - 1) * sizeof(key_t)); \
__KB_KEY(key_t, x)[i + 1] = (key_t)*k; \ __KB_KEY(key_t, x)[i + 1] = *k; \
++x->n; \ ++x->n; \
} else { \ } else { \
i = __kb_getp_aux_##name(x, k, 0) + 1; \ i = __kb_getp_aux_##name(x, k, 0) + 1; \
@ -227,7 +230,7 @@ typedef struct {
int yn, zn, i, r = 0; \ int yn, zn, i, r = 0; \
kbnode_t *xp, *y, *z; \ kbnode_t *xp, *y, *z; \
key_t kp; \ key_t kp; \
if (x == 0) return (key_t)*k; \ if (x == 0) return *k; \
if (s) { /* s can only be 0, 1 or 2 */ \ if (s) { /* s can only be 0, 1 or 2 */ \
r = x->is_internal == 0? 0 : s == 1? 1 : -1; \ r = x->is_internal == 0? 0 : s == 1? 1 : -1; \
i = s == 1? x->n - 1 : -1; \ i = s == 1? x->n - 1 : -1; \
@ -252,7 +255,7 @@ typedef struct {
return kp; \ return kp; \
} else if (yn == b->t - 1 && zn == b->t - 1) { \ } else if (yn == b->t - 1 && zn == b->t - 1) { \
y = __KB_PTR(b, x)[i]; z = __KB_PTR(b, x)[i + 1]; \ y = __KB_PTR(b, x)[i]; z = __KB_PTR(b, x)[i + 1]; \
__KB_KEY(key_t, y)[y->n++] = (key_t)*k; \ __KB_KEY(key_t, y)[y->n++] = *k; \
memmove(__KB_KEY(key_t, y) + y->n, __KB_KEY(key_t, z), z->n * sizeof(key_t)); \ memmove(__KB_KEY(key_t, y) + y->n, __KB_KEY(key_t, z), z->n * sizeof(key_t)); \
if (y->is_internal) memmove(__KB_PTR(b, y) + y->n, __KB_PTR(b, z), (z->n + 1) * sizeof(void*)); \ if (y->is_internal) memmove(__KB_PTR(b, y) + y->n, __KB_PTR(b, z), (z->n + 1) * sizeof(void*)); \
y->n += z->n; \ y->n += z->n; \
@ -375,7 +378,130 @@ typedef struct {
#define kb_size(b) ((b)->n_keys) #define kb_size(b) ((b)->n_keys)
#define kb_generic_cmp(a, b) (((a) > (b)) - ((a) < (b))) #define kb_generic_cmp(a, b) (((b) < (a)) - ((a) < (b)))
#define kb_str_cmp(a, b) strcmp(a, b) #define kb_str_cmp(a, b) strcmp(a, b)
#endif #endif
#ifdef TEST
#include <stdio.h>
#include <assert.h>
#include <time.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
typedef const char *str_t;
#include "kbtree.h"
typedef struct {
unsigned key;
char *value;
} intmap_t;
#define __intcmp(a, b) (((a).key > (b).key) - ((a).key < (b).key))
KBTREE_INIT(int, uint32_t, kb_generic_cmp)
KBTREE_INIT(str, str_t, kb_str_cmp)
KBTREE_INIT(intmap, intmap_t, __intcmp);
static int data_size = 5000000;
static unsigned *int_data;
static char **str_data;
static intmap_t *intmap_data;
void kb_init_data()
{
int i;
char buf[256];
printf("--- generating data... ");
srand48(11);
int_data = (unsigned*)calloc(data_size, sizeof(unsigned));
str_data = (char**)calloc(data_size, sizeof(char*));
intmap_data = (intmap_t*)calloc(data_size, sizeof(intmap_t));
for (i = 0; i < data_size; ++i) {
int_data[i] = (unsigned)(data_size * drand48() / 4) * 271828183u;
sprintf(buf, "%x", int_data[i]);
str_data[i] = strdup(buf);
intmap_data[i].key = i;
intmap_data[i].value = str_data[i];
}
printf("done!\n");
}
void kb_destroy_data()
{
int i;
for (i = 0; i < data_size; ++i) free(str_data[i]);
free(str_data); free(int_data);
}
void kb_tree_intmap()
{
int i;
intmap_t *data = intmap_data;
kbtree_t(intmap) *h;
h = kb_init(intmap, KB_DEFAULT_SIZE);
for (i = 0; i < data_size; ++i) {
if (kb_get(intmap, h, data[i]) == 0) kb_put(intmap, h, data[i]);
else kb_del(intmap, h, data[i]);
}
printf("[kb_tree_intmap] size: %d\n", kb_size(h));
__kb_destroy(h);
}
void kb_tree_int()
{
int i;
unsigned *data = int_data;
uint32_t *l, *u;
kbtree_t(int) *h;
h = kb_init(int, KB_DEFAULT_SIZE);
for (i = 0; i < data_size; ++i) {
if (kb_get(int, h, data[i]) == 0) kb_put(int, h, data[i]);
else kb_del(int, h, data[i]);
}
printf("[kb_tree_int] size: %d\n", kb_size(h));
if (1) {
int cnt = 0;
uint32_t x, y;
kb_interval(int, h, 2174625464u, &l, &u);
printf("interval for 2174625464: (%u, %u)\n", l? *l : 0, u? *u : 0);
#define traverse_f(p) { if (cnt == 0) y = *p; ++cnt; }
__kb_traverse(uint32_t, h, traverse_f);
__kb_get_first(uint32_t, h, x);
printf("# of elements from traversal: %d\n", cnt);
printf("first element: %d == %d\n", x, y);
}
__kb_destroy(h);
}
void kb_tree_str()
{
int i;
char **data = str_data;
kbtree_t(str) *h;
h = kb_init(str, KB_DEFAULT_SIZE);
for (i = 0; i < data_size; ++i) {
if (kb_get(str, h, data[i]) == 0) kb_put(str, h, data[i]);
else kb_del(str, h, data[i]);
}
printf("[kb_tree_int] size: %d\n", kb_size(h));
__kb_destroy(h);
}
void kb_timing(void (*f)(void))
{
clock_t t = clock();
(*f)();
printf("[kb_timing] %.3lf sec\n", (double)(clock() - t) / CLOCKS_PER_SEC);
}
int main(int argc, char *argv[])
{
if (argc > 1) data_size = atoi(argv[1]);
kb_init_data();
kb_timing(kb_tree_int);
kb_timing(kb_tree_str);
kb_timing(kb_tree_intmap);
kb_destroy_data();
return 0;
}
#endif

View file

@ -31,40 +31,37 @@
#include "async_nif.h" #include "async_nif.h"
#include "kbtree.h" #include "kbtree.h"
#include "queue.h" #include "queue.h"
#include "fifo_q.h"
static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_ctx_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE;
/* WiredTiger object names*/ /* WiredTiger object names*/
typedef char Uri[128]; typedef char Uri[128];
struct wterl_ctx { struct wterl_ctx {
WT_SESSION *session; // open session SLIST_ENTRY(wterl_ctx) entries;
WT_CURSOR **cursors; // open cursors, all reset ready to reuse
uint64_t sig; uint64_t sig;
uint64_t tstamp;
WT_SESSION *session;
WT_CURSOR *cursors[]; // Note: must be last in struct
}; };
struct cache_entry { struct cache_entry {
struct wterl_ctx *ctx;
uint64_t sig; uint64_t sig;
uint64_t tstamp; SLIST_HEAD(ctxs, wterl_ctx) contexts;
}; };
#define __ctx_sig_cmp(a, b) ((((a)->sig) > ((b)->sig)) - (((a)->sig) < ((b)->sig))) #define __ctx_sig_cmp(a, b) (((a).sig > (b).sig) - ((a).sig < (b).sig))
KBTREE_INIT(cache_entries, struct cache_entry*, __ctx_sig_cmp); KBTREE_INIT(cache_entries, struct cache_entry, __ctx_sig_cmp);
DECL_FIFO_QUEUE(cache_entries, struct cache_entry);
typedef struct wterl_conn { typedef struct wterl_conn {
WT_CONNECTION *conn; WT_CONNECTION *conn;
const char *session_config; const char *session_config;
ErlNifMutex *cache_mutex; ErlNifMutex *cache_mutex;
kbtree_t(cache_entries) *cache; kbtree_t(cache_entries) *cache;
fifo_t(cache_entries) recycled_cache_entries;
SLIST_ENTRY(wterl_conn) conns; SLIST_ENTRY(wterl_conn) conns;
uint64_t histogram[64]; uint64_t histogram[64];
uint64_t hits, misses; uint64_t histogram_count;
} WterlConnHandle; } WterlConnHandle;
typedef struct { typedef struct {
@ -158,9 +155,9 @@ static inline uint32_t __log2(uint64_t x) {
* -> 0 = no/false, anything else is true * -> 0 = no/false, anything else is true
*/ */
static int static int
__ctx_cache_full(WterlConnHandle *conn) __ctx_cache_full(WterlConnHandle *conn_handle)
{ {
return fifo_q_full(cache_entries, conn->recycled_cache_entries); return kb_size(conn_handle->cache) == ASYNC_NIF_MAX_WORKERS; // TODO:
} }
/** /**
@ -174,38 +171,72 @@ __ctx_cache_full(WterlConnHandle *conn)
static int static int
__ctx_cache_evict(WterlConnHandle *conn_handle) __ctx_cache_evict(WterlConnHandle *conn_handle)
{ {
uint32_t i; uint32_t num_evicted, i;
uint64_t mean, now = cpu_clock_ticks(); uint64_t mean, now;
struct wterl_ctx *to_free[ASYNC_NIF_MAX_WORKERS];
now = cpu_clock_ticks();
kbtree_t(cache_entries) *t = conn_handle->cache; kbtree_t(cache_entries) *t = conn_handle->cache;
// Find the mean of the recorded times that items stayed in cache. // Find the mean of the recorded times that items stayed in cache.
mean = 0;
for (i = 0; i < 64; i++) for (i = 0; i < 64; i++)
mean += (conn_handle->histogram[i] * i); mean += (conn_handle->histogram[i] * i);
if (mean > 0) if (mean > 0)
mean /= conn_handle->hits; mean /= conn_handle->histogram_count;
// Clear out the histogram and hit/misses // Clear out the histogram and hit/misses
memset(conn_handle->histogram, 0, sizeof(uint64_t) * 64); memset(conn_handle->histogram, 0, sizeof(uint64_t) * 64);
conn_handle->hits = 0; conn_handle->histogram_count = 0;
conn_handle->misses = 0;
// Evict anything older than the mean time in queue. /*
i = 0; * Evict anything older than the mean time in queue by removing those
* items from the lists at the leaf nodes of the tree.
*/
num_evicted = 0;
#define traverse_f(p) \ #define traverse_f(p) \
{ \ { \
struct cache_entry *e = *p; \ struct cache_entry *e; \
uint64_t elapsed = e->tstamp - now; \ struct wterl_ctx *c; \
if (__log2(elapsed) > mean) { \ e = (struct cache_entry *)p; \
kb_del(cache_entries, t, e); \ SLIST_FOREACH(c, &e->contexts, entries) { \
e->ctx->session->close(e->ctx->session, NULL); \ uint64_t elapsed = c->tstamp - now; \
enif_free(e->ctx); \ uint32_t log = __log2(elapsed); \
fifo_q_put(cache_entries, conn_handle->recycled_cache_entries, e); \ if (log > mean) { \
i++; \ SLIST_REMOVE(&e->contexts, c, wterl_ctx, entries); \
c->session->close(c->session, NULL); \
to_free[num_evicted] = c; \
num_evicted++; \
} \
} \ } \
} }
__kb_traverse(struct cache_entry *, t, traverse_f); __kb_traverse(struct cache_entry, t, traverse_f);
#undef traverse_f #undef traverse_f
return i;
/*
* Free up the wterl_ctx we've removed after finishing the loop.
*/
for (i = 0; i < num_evicted; i++) {
enif_free(to_free[i]);
}
/*
* Walk the tree again looking for empty lists to prune from the
* tree.
*/
#define traverse_f(p) \
{ \
struct cache_entry *e, query; \
e = p; \
query.sig = e->sig; \
if (SLIST_EMPTY(&e->contexts)) { \
kb_del(cache_entries, t, query); \
} \
}
__kb_traverse(struct cache_entry, t, traverse_f);
#undef traverse_f
return num_evicted;
} }
/** /**
@ -220,24 +251,23 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
static struct wterl_ctx * static struct wterl_ctx *
__ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig) __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
{ {
struct wterl_ctx *p = NULL; struct wterl_ctx *c = NULL;
struct cache_entry key, *e; struct cache_entry query, *result;
key.sig = sig; query.sig = sig;
e = *kb_get(cache_entries, conn_handle->cache, &key); result = kb_get(cache_entries, conn_handle->cache, query);
if (e) { if (result && !SLIST_EMPTY(&result->contexts)) {
// cache hit, remove it from the tree /*
uint64_t elapsed = cpu_clock_ticks() - e->tstamp; * cache hit:
kb_del(cache_entries, conn_handle->cache, &key); * remove a context from the list in the tree node
p = e->ctx; */
memset(e, 0, sizeof(struct cache_entry)); c = SLIST_FIRST(&result->contexts);
fifo_q_put(cache_entries, conn_handle->recycled_cache_entries, e); SLIST_REMOVE_HEAD(&result->contexts, entries);
conn_handle->hits++; uint64_t elapsed = cpu_clock_ticks() - c->tstamp;
conn_handle->histogram[__log2(elapsed)]++; conn_handle->histogram[__log2(elapsed)]++;
} else { conn_handle->histogram_count++;
conn_handle->misses++; } // else { cache miss
} return c;
return p;
} }
/** /**
@ -246,20 +276,29 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
* Return an item into the cache, reset the cursors it has open and put it at * Return an item into the cache, reset the cursors it has open and put it at
* the front of the LRU. * the front of the LRU.
*/ */
static int static void
__ctx_cache_add(WterlConnHandle *conn, struct wterl_ctx *c) __ctx_cache_add(WterlConnHandle *conn, struct wterl_ctx *c)
{ {
struct cache_entry *e; struct cache_entry query, *result;
/*
* Check to see if the cache is full and if so trigger eviction which will
* remove the least-recently-used half of the items from the cache.
*/
if (__ctx_cache_full(conn)) if (__ctx_cache_full(conn))
__ctx_cache_evict(conn); __ctx_cache_evict(conn);
e = fifo_q_get(cache_entries, conn->recycled_cache_entries); c->tstamp = cpu_clock_ticks();
e->ctx = c;
e->sig = c->sig; query.sig = c->sig;
e->tstamp = cpu_clock_ticks(); result = kb_get(cache_entries, conn->cache, query);
kb_put(cache_entries, conn->cache, e); if (result == NULL) {
return 0; SLIST_INIT(&query.contexts); // TODO: should this be on the heap?
SLIST_INSERT_HEAD(&query.contexts, c, entries);
kb_put(cache_entries, conn->cache, query);
} else {
SLIST_INSERT_HEAD(&result->contexts, c, entries);
}
} }
/** /**
@ -322,42 +361,25 @@ __zi(uint32_t p, uint32_t q)
* cursor config pair * cursor config pair
* -> number of variable arguments processed * -> number of variable arguments processed
*/ */
static int static uint64_t
__ctx_cache_sig_(const char *c, va_list ap, uint64_t *h) __ctx_cache_sig(const char *c, va_list ap, int count)
{ {
int i = 0; int i = 0;
if (NULL == c)
return 0;
*h = __str_hash(c);
while (*c) {
*h = __zi((uint32_t)(*h & 0xFFFFFFFF), __str_hash(va_arg(ap, const char *)));
*h <<= 1;
i++;
}
return i;
}
#if 0
static uint64_t
__ctx_cache_sig(const char *c, ...)
{
int i;
va_list ap;
uint64_t h; uint64_t h;
const char *arg;
if (NULL == c) if (c)
return 0; h = __str_hash(c);
else
h = 0;
va_start(ap, c); for (i = 0; i < (2 * count); i++) {
i = __ctx_cache_sig_(c, ap, &h); arg = va_arg(ap, const char *);
va_end (ap); if (arg) h = __zi((uint32_t)(h & 0xFFFFFFFF), __str_hash(arg));
else h = __zi((uint32_t)(h & 0xFFFFFFFF), 0);
return i; }
return h;
} }
#endif
/** /**
* Get a reusable cursor that was opened for a particular worker within its * Get a reusable cursor that was opened for a particular worker within its
@ -365,16 +387,16 @@ __ctx_cache_sig(const char *c, ...)
*/ */
static int static int
__retain_ctx(WterlConnHandle *conn_handle, struct wterl_ctx **ctx, __retain_ctx(WterlConnHandle *conn_handle, struct wterl_ctx **ctx,
const char *session_config, ...) int count, const char *session_config, ...)
{ {
int i, count; int i = 0;
va_list ap; va_list ap;
uint64_t sig; uint64_t sig;
const char *c; const char *arg;
c = session_config; arg = session_config;
va_start(ap, session_config); va_start(ap, session_config);
count = __ctx_cache_sig_(session_config, ap, &sig); sig = __ctx_cache_sig(session_config, ap, count);
va_end(ap); va_end(ap);
enif_mutex_lock(conn_handle->cache_mutex); enif_mutex_lock(conn_handle->cache_mutex);
@ -386,9 +408,8 @@ __retain_ctx(WterlConnHandle *conn_handle, struct wterl_ctx **ctx,
int rc = conn->open_session(conn, NULL, session_config, &session); int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0) if (rc != 0)
return rc; return rc;
size_t s = sizeof(struct wterl_ctx) + ((count / 2) * sizeof(WT_CURSOR*)); size_t s = sizeof(struct wterl_ctx) + (count * sizeof(WT_CURSOR*));
// TODO: *ctx = enif_alloc_resource(wterl_ctx_RESOURCE, s); *ctx = enif_alloc(s); // TODO: enif_alloc_resource()
*ctx = enif_alloc(s);
if (*ctx == NULL) { if (*ctx == NULL) {
session->close(session, NULL); session->close(session, NULL);
return ENOMEM; return ENOMEM;
@ -396,14 +417,13 @@ __retain_ctx(WterlConnHandle *conn_handle, struct wterl_ctx **ctx,
memset(*ctx, 0, s); memset(*ctx, 0, s);
(*ctx)->sig = sig; (*ctx)->sig = sig;
(*ctx)->session = session; (*ctx)->session = session;
WT_CURSOR **cursors = (*ctx)->cursors; session_config = arg;
session_config = c;
va_start(ap, session_config); va_start(ap, session_config);
for (i = 0; i < (count / 2); i++) { for (i = 0; i < count; i++) {
const char *uri = va_arg(ap, const char *); const char *uri = va_arg(ap, const char *);
const char *config = va_arg(ap, const char *); const char *config = va_arg(ap, const char *);
// TODO: error when uri or config is NULL // TODO: error when uri or config is NULL
rc = session->open_cursor(session, uri, NULL, config, &cursors[i]); rc = session->open_cursor(session, uri, NULL, config, &(*ctx)->cursors[i]);
if (rc != 0) { if (rc != 0) {
session->close(session, NULL); // this will free the cursors too session->close(session, NULL); // this will free the cursors too
return rc; return rc;
@ -424,7 +444,7 @@ __release_ctx(WterlConnHandle *conn_handle, struct wterl_ctx *ctx)
int i, c; int i, c;
WT_CURSOR *cursor; WT_CURSOR *cursor;
c = sizeof(ctx->cursors) / sizeof(ctx->cursors[0]); c = sizeof((WT_CURSOR**)ctx->cursors) / sizeof(ctx->cursors[0]);
for (i = 0; i < c; i++) { for (i = 0; i < c; i++) {
cursor = ctx->cursors[i]; cursor = ctx->cursors[i];
cursor->reset(cursor); cursor->reset(cursor);
@ -442,14 +462,46 @@ __release_ctx(WterlConnHandle *conn_handle, struct wterl_ctx *ctx)
void void
__close_all_sessions(WterlConnHandle *conn_handle) __close_all_sessions(WterlConnHandle *conn_handle)
{ {
int i, num_closed = 0;
struct wterl_ctx *to_free[ASYNC_NIF_MAX_WORKERS];
kbtree_t(cache_entries) *t = conn_handle->cache; kbtree_t(cache_entries) *t = conn_handle->cache;
#define traverse_f(p) { \ #define traverse_f(p) \
kb_del(cache_entries, t, *p); \ { \
enif_free(p); \ struct cache_entry *e; \
struct wterl_ctx *c; \
e = (struct cache_entry *)p; \
SLIST_FOREACH(c, &e->contexts, entries) { \
SLIST_REMOVE(&e->contexts, c, wterl_ctx, entries); \
c->session->close(c->session, NULL); \
to_free[num_closed] = c; \
num_closed++; \
} \
} }
__kb_traverse(struct cache_entry *, t, traverse_f); __kb_traverse(struct cache_entry *, t, traverse_f);
#undef traverse_f #undef traverse_f
/*
* Free up the wterl_ctx we've removed after finishing the loop.
*/
for (i = 0; i < num_closed; i++) {
enif_free(to_free[i]);
}
/*
* Walk the tree again to prune all the empty lists from the tree.
*/
#define traverse_f(p) \
{ \
struct cache_entry *e, query; \
e = (struct cache_entry *)p; \
query.sig = e->sig; \
if (SLIST_EMPTY(&e->contexts)) { \
kb_del(cache_entries, t, query); \
} \
}
__kb_traverse(struct cache_entry, t, traverse_f);
#undef traverse_f
} }
/** /**
@ -678,7 +730,6 @@ ASYNC_NIF_DECL(
/* Init tree which manages the cache of session/cursor(s) */ /* Init tree which manages the cache of session/cursor(s) */
conn_handle->cache = kb_init(cache_entries, ASYNC_NIF_MAX_WORKERS); // TODO: size conn_handle->cache = kb_init(cache_entries, ASYNC_NIF_MAX_WORKERS); // TODO: size
conn_handle->recycled_cache_entries = fifo_q_new(cache_entries, ASYNC_NIF_MAX_WORKERS);
/* Keep track of open connections so as to free when unload/reload/etc. /* Keep track of open connections so as to free when unload/reload/etc.
are called. */ are called. */
@ -1374,7 +1425,7 @@ ASYNC_NIF_DECL(
struct wterl_ctx *ctx = NULL; struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
int rc = __retain_ctx(args->conn_handle, &ctx, int rc = __retain_ctx(args->conn_handle, &ctx, 1,
args->conn_handle->session_config, args->conn_handle->session_config,
args->uri, "overwrite,raw"); args->uri, "overwrite,raw");
if (rc != 0) { if (rc != 0) {
@ -1433,7 +1484,7 @@ ASYNC_NIF_DECL(
struct wterl_ctx *ctx = NULL; struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
int rc = __retain_ctx(args->conn_handle, &ctx, int rc = __retain_ctx(args->conn_handle, &ctx, 1,
args->conn_handle->session_config, args->conn_handle->session_config,
args->uri, "overwrite,raw"); args->uri, "overwrite,raw");
if (rc != 0) { if (rc != 0) {
@ -1515,7 +1566,7 @@ ASYNC_NIF_DECL(
struct wterl_ctx *ctx = NULL; struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
int rc = __retain_ctx(args->conn_handle, &ctx, int rc = __retain_ctx(args->conn_handle, &ctx, 1,
args->conn_handle->session_config, args->conn_handle->session_config,
args->uri, "overwrite,raw"); args->uri, "overwrite,raw");
if (rc != 0) { if (rc != 0) {
@ -2252,8 +2303,6 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
NULL, flags, NULL); NULL, flags, NULL);
wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource",
NULL, flags, NULL); NULL, flags, NULL);
wterl_ctx_RESOURCE = enif_open_resource_type(env, NULL, "wterl_ctx_resource",
NULL, flags, NULL);
ATOM_ERROR = enif_make_atom(env, "error"); ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok"); ATOM_OK = enif_make_atom(env, "ok");
@ -2325,7 +2374,6 @@ on_unload(ErlNifEnv *env, void *priv_data)
{ {
struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data; struct wterl_priv_data *priv = (struct wterl_priv_data *)priv_data;
WterlConnHandle *conn_handle; WterlConnHandle *conn_handle;
struct cache_entry *e;
enif_mutex_lock(priv->conns_mutex); enif_mutex_lock(priv->conns_mutex);
@ -2338,12 +2386,10 @@ on_unload(ErlNifEnv *env, void *priv_data)
ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv); ASYNC_NIF_UNLOAD(wterl, env, priv->async_nif_priv);
SLIST_FOREACH(conn_handle, &priv->conns, conns) { SLIST_FOREACH(conn_handle, &priv->conns, conns) {
fifo_q_foreach(cache_entries, conn_handle->recycled_cache_entries, e, { __close_all_sessions(conn_handle);
struct wterl_ctx *ctx = e->ctx; conn_handle->conn->close(conn_handle->conn, NULL);
ctx->session->close(ctx->session, NULL);
});
fifo_q_free(cache_entries, conn_handle->recycled_cache_entries);
kb_destroy(cache_entries, conn_handle->cache); kb_destroy(cache_entries, conn_handle->cache);
enif_free((void*)conn_handle->session_config);
enif_mutex_unlock(conn_handle->cache_mutex); enif_mutex_unlock(conn_handle->cache_mutex);
enif_mutex_destroy(conn_handle->cache_mutex); enif_mutex_destroy(conn_handle->cache_mutex);
} }

View file

@ -0,0 +1,613 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_wterl_backend: WiredTiger Driver for Riak
%%
%% Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
%% KV Backend API
-export([api_version/0,
capabilities/1,
capabilities/2,
start/2,
stop/1,
get/3,
put/5,
delete/4,
drop/1,
fold_buckets/4,
fold_keys/4,
fold_objects/4,
is_empty/1,
status/1,
callback/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compiel(export_all).
-endif.
-define(API_VERSION, 1).
%% TODO: for when this backend supports 2i
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(),
type :: string(),
connection :: wterl:connection(),
is_empty_cursor :: wterl:cursor(),
status_cursor :: wterl:cursor()}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
%% ===================================================================
%% Public API
%% ===================================================================
%% @doc Return the major version of the
%% current API.
-spec api_version() -> {ok, integer()}.
api_version() ->
{ok, ?API_VERSION}.
%% @doc Return the capabilities of the backend.
-spec capabilities(state()) -> {ok, [atom()]}.
capabilities(_) ->
{ok, ?CAPABILITIES}.
%% @doc Return the capabilities of the backend.
-spec capabilities(riak_object:bucket(), state()) -> {ok, [atom()]}.
capabilities(_, _) ->
{ok, ?CAPABILITIES}.
%% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) ->
AppStart =
case application:start(wterl) of
ok ->
ok;
{error, {already_started, _}} ->
ok;
{error, Reason1} ->
lager:error("Failed to start wterl: ~p", [Reason1]),
{error, Reason1}
end,
case AppStart of
ok ->
Type =
case wterl:config_value(type, Config, "lsm") of
{type, "lsm"} -> "lsm";
{type, "table"} -> "table";
{type, "btree"} -> "table";
{type, BadType} ->
lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]),
"lsm";
_ ->
lager:info("wterl:start ignoring mistaken setting defaulting to lsm"),
"lsm"
end,
{ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition),
Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C];
{block_compressor, "none"} -> [];
{block_compressor, none} -> [];
{block_compressor, _} -> [{block_compressor, "snappy"}];
_ -> [{block_compressor, "snappy"}]
end,
TableOpts =
case Type of
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "100MB"},
{lsm_merge_threads, 2},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
] ++ Compressor;
"table" ->
Compressor
end,
case wterl:create(Connection, Table, TableOpts) of
ok ->
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->
{ok, #state{table=Table, type=Type,
connection=Connection,
is_empty_cursor=IsEmptyCursor,
status_cursor=StatusCursor}};
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason3} ->
{error, Reason3}
end
end.
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
stop(_State) ->
ok. %% The connection is closed by wterl_conn:stop()
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, #state{connection=Connection, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key),
case wterl:get(Connection, Table, WTKey) of
{ok, Value} ->
{ok, Value, State};
not_found ->
{error, not_found, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Insert an object into the wterl backend.
%% NOTE: The wterl backend does not currently support
%% secondary indexing and the_IndexSpecs parameter
%% is ignored.
-type index_spec() :: {add, Index, SecondaryKey} | {remove, Index, SecondaryKey}.
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) ->
case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Delete an object from the wterl backend
%% NOTE: The wterl backend does not currently support
%% secondary indexing and the_IndexSpecs parameter
%% is ignored.
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) ->
case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
%% @doc Fold over all the buckets
-spec fold_buckets(riak_kv_backend:fold_buckets_fun(),
any(),
[],
state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder =
fun() ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
{FoldResult, _} =
wterl:fold_keys(Cursor, FoldFun, {Acc, []}),
FoldResult
catch
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
end
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, BucketFolder};
false ->
{ok, BucketFolder()}
end.
%% @doc Fold over all the keys for one or all buckets.
-spec fold_keys(riak_kv_backend:fold_keys_fun(),
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts),
Index = lists:keyfind(index, 1, Opts),
%% Multiple limiters may exist. Take the most specific limiter.
Limiter =
if Index /= false -> Index;
Bucket /= false -> Bucket;
true -> undefined
end,
%% Set up the fold...
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder =
fun() ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
wterl:fold_keys(Cursor, FoldFun, Acc)
catch
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
end
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, KeyFolder};
false ->
{ok, KeyFolder()}
end.
%% @doc Fold over all the objects for one or all buckets.
-spec fold_objects(riak_kv_backend:fold_objects_fun(),
any(),
[{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
wterl:fold(Cursor, FoldFun, Acc)
catch
{break, AccFinal} ->
AccFinal
after
case wterl:cursor_close(Cursor) of
ok ->
ok;
{error, {eperm, _}} -> %% TODO: review/fix
ok;
{error, _}=E ->
E
end
end
end
end,
case lists:member(async_fold, Opts) of
true ->
{async, ObjectFolder};
false ->
{ok, ObjectFolder()}
end.
%% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{connection=Connection, table=Table}=State) ->
case wterl:drop(Connection, Table) of
ok ->
{ok, State};
{error, {ebusy, _}} -> %% TODO: review/fix
{ok, State};
Error ->
{error, Error, State}
end.
%% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{is_empty_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case wterl:cursor_next(Cursor) of
not_found -> true;
{error, {eperm, _}} -> false; % TODO: review/fix this logic
_ -> false
end.
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{status_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
callback(_Ref, _Msg, State) ->
{ok, State}.
%% ===================================================================
%% Internal functions
%% ===================================================================
%% @private
max_sessions(Config) ->
RingSize =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
Size -> Size
end,
Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
true -> 1000000000;
false -> Est
end.
%% @private
establish_utility_cursors(Connection, Table) ->
case wterl:cursor_open(Connection, Table) of
{ok, IsEmptyCursor} ->
case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
{ok, StatusCursor} ->
{ok, IsEmptyCursor, StatusCursor};
{error, Reason1} ->
{error, Reason1}
end;
{error, Reason2} ->
{error, Reason2}
end.
%% @private
establish_connection(Config, Type) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset};
DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
%% NOTE: LSM auto-checkpoints, so we don't have too.
CheckpointSetting =
case Type =:= "lsm" of
true ->
[];
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
wterl:config_value(logging, Config, true),
wterl:config_value(transactional, Config, true),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 300}]), % sec
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
{ok, Connection} ->
{ok, Connection};
{error, Reason2} ->
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
{error, Reason2}
end
end.
%% @private
%% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) ->
fun(BK, {Acc, LastBucket}) ->
case from_object_key(BK) of
{LastBucket, _} ->
{Acc, LastBucket};
{Bucket, _} ->
{FoldBucketsFun(Bucket, Acc), Bucket};
_ ->
throw({break, Acc})
end
end.
%% @private
%% Return a function to fold over keys on this backend
fold_keys_fun(FoldKeysFun, undefined) ->
%% Fold across everything...
fun(StorageKey, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket}) ->
%% Fold across a specific bucket...
fun(StorageKey, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} when Bucket == FilterBucket ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, <<"$bucket">>, _}}) ->
%% 2I exact match query on special $bucket field...
fold_keys_fun(FoldKeysFun, {bucket, FilterBucket});
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {eq, FilterField, FilterTerm}}) ->
%% Rewrite 2I exact match query as a range...
NewQuery = {range, FilterField, FilterTerm, FilterTerm},
fold_keys_fun(FoldKeysFun, {index, FilterBucket, NewQuery});
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, <<"$key">>, StartKey, EndKey}}) ->
%% 2I range query on special $key field...
fun(StorageKey, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} when FilterBucket == Bucket,
StartKey =< Key,
EndKey >= Key ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(FoldKeysFun, {index, FilterBucket, {range, FilterField, StartTerm, EndTerm}}) ->
%% 2I range query...
fun(StorageKey, Acc) ->
case from_index_key(StorageKey) of
{Bucket, Key, Field, Term} when FilterBucket == Bucket,
FilterField == Field,
StartTerm =< Term,
EndTerm >= Term ->
FoldKeysFun(Bucket, Key, Acc);
_ ->
throw({break, Acc})
end
end;
fold_keys_fun(_FoldKeysFun, Other) ->
throw({unknown_limiter, Other}).
%% @private
%% Return a function to fold over the objects on this backend
fold_objects_fun(FoldObjectsFun, FilterBucket) ->
%% 2I does not support fold objects at this time, so this is much
%% simpler than fold_keys_fun.
fun({StorageKey, Value}, Acc) ->
case from_object_key(StorageKey) of
{Bucket, Key} when FilterBucket == undefined;
Bucket == FilterBucket ->
FoldObjectsFun(Bucket, Key, Value, Acc);
_ ->
throw({break, Acc})
end
end.
to_object_key(Bucket, Key) ->
sext:encode({o, Bucket, Key}).
from_object_key(LKey) ->
case sext:decode(LKey) of
{o, Bucket, Key} ->
{Bucket, Key};
_ ->
undefined
end.
from_index_key(LKey) ->
case sext:decode(LKey) of
{i, Bucket, Field, Term, Key} ->
{Bucket, Key, Field, Term};
_ ->
undefined
end.
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =
case RequestedSize of
undefined ->
RunningApps = application:which_applications(),
FinalGuess =
case proplists:is_defined(sasl, RunningApps) andalso
proplists:is_defined(os_mon, RunningApps) of
true ->
Memory = memsup:get_system_memory_data(),
TotalRAM = proplists:get_value(system_total_memory, Memory),
FreeRAM = proplists:get_value(free_memory, Memory),
UsedByBeam = proplists:get_value(total, erlang:memory()),
Target = ((TotalRAM - UsedByBeam) div 4),
FirstGuess = (Target - (Target rem (1024 * 1024))),
SecondGuess =
case FirstGuess > FreeRAM of
true -> FreeRAM - (FreeRAM rem (1024 * 1024));
_ -> FirstGuess
end,
case SecondGuess < 1073741824 of %% < 1GB?
true -> "1GB";
false ->
ThirdGuess = SecondGuess div (1024 * 1024),
integer_to_list(ThirdGuess) ++ "MB"
end;
false ->
"1GB"
end,
application:set_env(wterl, cache_size, FinalGuess),
FinalGuess;
Value when is_list(Value) ->
Value;
Value when is_number(Value) ->
integer_to_list(Value)
end,
Size.
%% ===================================================================
%% EUnit tests
%% ===================================================================
-ifdef(TEST).
simple_test_() ->
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() ->
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).
-endif.

View file

@ -0,0 +1,287 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_backend: Riak backend behaviour
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
%%%
%%% This is a temporary copy of riak_kv_backend, just here to keep
%%% wterl development private for now. When riak_kv_wterl_backend is
%%% moved to riak_kv, delete this file.
%%%
%%% NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE NOTE
-module(temp_riak_kv_backend).
-export([behaviour_info/1]).
-export([callback_after/3]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-export([standard_test/2]).
-endif.
-type fold_buckets_fun() :: fun((binary(), any()) -> any() | no_return()).
-type fold_keys_fun() :: fun((binary(), binary(), any()) -> any() |
no_return()).
-type fold_objects_fun() :: fun((binary(), binary(), term(), any()) ->
any() |
no_return()).
-export_type([fold_buckets_fun/0,
fold_keys_fun/0,
fold_objects_fun/0]).
-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
behaviour_info(callbacks) ->
[
{api_version,0},
{capabilities, 1}, % (State)
{capabilities, 2}, % (Bucket, State)
{start,2}, % (Partition, Config)
{stop,1}, % (State)
{get,3}, % (Bucket, Key, State)
{put,5}, % (Bucket, Key, IndexSpecs, Val, State)
{delete,4}, % (Bucket, Key, IndexSpecs, State)
{drop,1}, % (State)
{fold_buckets,4}, % (FoldBucketsFun, Acc, Opts, State),
% FoldBucketsFun(Bucket, Acc)
{fold_keys,4}, % (FoldKeysFun, Acc, Opts, State),
% FoldKeysFun(Bucket, Key, Acc)
{fold_objects,4}, % (FoldObjectsFun, Acc, Opts, State),
% FoldObjectsFun(Bucket, Key, Object, Acc)
{is_empty,1}, % (State)
{status,1}, % (State)
{callback,3}]; % (Ref, Msg, State) ->
behaviour_info(_Other) ->
undefined.
%% Queue a callback for the backend after Time ms.
-spec callback_after(integer(), reference(), term()) -> reference().
callback_after(Time, Ref, Msg) when is_integer(Time), is_reference(Ref) ->
riak_core_vnode:send_command_after(Time, {backend_callback, Ref, Msg}).
-ifdef(TEST).
standard_test(BackendMod, Config) ->
{spawn,
[
{setup,
fun() -> ?MODULE:setup({BackendMod, Config}) end,
fun ?MODULE:cleanup/1,
fun(X) ->
[?MODULE:basic_store_and_fetch(X),
?MODULE:fold_buckets(X),
?MODULE:fold_keys(X),
?MODULE:delete_object(X),
?MODULE:fold_objects(X),
?MODULE:empty_check(X)
]
end
}]}.
basic_store_and_fetch({Backend, State}) ->
{"basic store and fetch test",
fun() ->
[
?_assertMatch({ok, _},
Backend:put(<<"b1">>, <<"k1">>, [], <<"v1">>, State)),
?_assertMatch({ok, _},
Backend:put(<<"b2">>, <<"k2">>, [], <<"v2">>, State)),
?_assertMatch({ok,<<"v2">>, _},
Backend:get(<<"b2">>, <<"k2">>, State)),
?_assertMatch({error, not_found, _},
Backend:get(<<"b1">>, <<"k3">>, State))
]
end
}.
fold_buckets({Backend, State}) ->
{"bucket folding test",
fun() ->
FoldBucketsFun =
fun(Bucket, Acc) ->
[Bucket | Acc]
end,
?_assertEqual([<<"b1">>, <<"b2">>],
begin
{ok, Buckets1} =
Backend:fold_buckets(FoldBucketsFun,
[],
[],
State),
lists:sort(Buckets1)
end)
end
}.
fold_keys({Backend, State}) ->
{"key folding test",
fun() ->
FoldKeysFun =
fun(Bucket, Key, Acc) ->
[{Bucket, Key} | Acc]
end,
FoldKeysFun1 =
fun(_Bucket, Key, Acc) ->
[Key | Acc]
end,
FoldKeysFun2 =
fun(Bucket, Key, Acc) ->
case Bucket =:= <<"b1">> of
true ->
[Key | Acc];
false ->
Acc
end
end,
FoldKeysFun3 =
fun(Bucket, Key, Acc) ->
case Bucket =:= <<"b1">> of
true ->
Acc;
false ->
[Key | Acc]
end
end,
[
?_assertEqual([{<<"b1">>, <<"k1">>}, {<<"b2">>, <<"k2">>}],
begin
{ok, Keys1} =
Backend:fold_keys(FoldKeysFun,
[],
[],
State),
lists:sort(Keys1)
end),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun1,
[],
[{bucket, <<"b1">>}],
State)),
?_assertEqual([<<"k2">>],
Backend:fold_keys(FoldKeysFun1,
[],
[{bucket, <<"b2">>}],
State)),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun2, [], [], State)),
?_assertEqual({ok, [<<"k1">>]},
Backend:fold_keys(FoldKeysFun2,
[],
[{bucket, <<"b1">>}],
State)),
?_assertEqual({ok, [<<"k2">>]},
Backend:fold_keys(FoldKeysFun3, [], [], State)),
?_assertEqual({ok, []},
Backend:fold_keys(FoldKeysFun3,
[],
[{bucket, <<"b1">>}],
State))
]
end
}.
delete_object({Backend, State}) ->
{"object deletion test",
fun() ->
[
?_assertMatch({ok, _}, Backend:delete(<<"b2">>, <<"k2">>, State)),
?_assertMatch({error, not_found, _},
Backend:get(<<"b2">>, <<"k2">>, State))
]
end
}.
fold_objects({Backend, State}) ->
{"object folding test",
fun() ->
FoldKeysFun =
fun(Bucket, Key, Acc) ->
[{Bucket, Key} | Acc]
end,
FoldObjectsFun =
fun(Bucket, Key, Value, Acc) ->
[{{Bucket, Key}, Value} | Acc]
end,
[
?_assertEqual([{<<"b1">>, <<"k1">>}],
begin
{ok, Keys} =
Backend:fold_keys(FoldKeysFun,
[],
[],
State),
lists:sort(Keys)
end),
?_assertEqual([{{<<"b1">>,<<"k1">>}, <<"v1">>}],
begin
{ok, Objects1} =
Backend:fold_objects(FoldObjectsFun,
[],
[],
State),
lists:sort(Objects1)
end),
?_assertMatch({ok, _},
Backend:put(<<"b3">>, <<"k3">>, [], <<"v3">>, State)),
?_assertEqual([{{<<"b1">>,<<"k1">>},<<"v1">>},
{{<<"b3">>,<<"k3">>},<<"v3">>}],
begin
{ok, Objects} =
Backend:fold_objects(FoldObjectsFun,
[],
[],
State),
lists:sort(Objects)
end)
]
end
}.
empty_check({Backend, State}) ->
{"is_empty test",
fun() ->
[
?_assertEqual(false, Backend:is_empty(State)),
?_assertMatch({ok, _}, Backend:delete(<<"b1">>,<<"k1">>, State)),
?_assertMatch({ok, _}, Backend:delete(<<"b3">>,<<"k3">>, State)),
?_assertEqual(true, Backend:is_empty(State))
]
end
}.
setup({BackendMod, Config}) ->
application:start(lager),
application:start(sasl),
application:start(os_mon),
{ok, S} = BackendMod:start(42, Config),
{BackendMod, S}.
cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S),
application:stop(lager),
application:stop(sasl),
application:stop(os_mon).
-endif. % TEST