WIP-- Compiling, not yet tested/functional -- WIP

Changes required to iron out compiler errors, warnings, etc.  Code now
compiles with clang or gcc.
This commit is contained in:
Gregory Burd 2013-04-06 11:05:41 -04:00
parent 19268b7c77
commit b4f82a388d
2 changed files with 122 additions and 87 deletions

View file

@ -57,6 +57,12 @@ struct async_nif_state {
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
}; };
struct async_nif_worker_info {
struct async_nif_state *async_nif;
struct async_nif_worker_entry *worker;
unsigned int worker_id;
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \ struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \
@ -72,7 +78,7 @@ struct async_nif_state {
/* argv[0] is a ref used for selective recv */ \ /* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \ const ERL_NIF_TERM *argv = argv_in + 1; \
argc--; \ argc--; \
async_nif = (struct async_nif_state*)enif_priv_data(env); \ struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \
if (async_nif->shutdown) \ if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
@ -106,18 +112,13 @@ struct async_nif_state {
} }
#define ASYNC_NIF_LOAD() async_nif_load(); #define ASYNC_NIF_LOAD() async_nif_load();
#define ASYNC_NIF_UNLOAD() async_nif_unload(); #define ASYNC_NIF_UNLOAD(env) async_nif_unload(env);
//define ASYNC_NIF_RELOAD() #define ASYNC_NIF_UPGRADE(env) async_nif_unload(env);
#define ASYNC_NIF_UPGRADE() async_nif_unload();
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env); #define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env);
#define ASYNC_NIF_WORK_ENV new_env #define ASYNC_NIF_WORK_ENV new_env
#ifndef PULSE_FORCE_USING_PULSE_SEND_HERE
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#else
#define ASYNC_NIF_REPLY(msg) PULSE_SEND(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#endif
static ERL_NIF_TERM static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req)
@ -125,7 +126,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* If we're shutting down return an error term and ignore the request. */ /* If we're shutting down return an error term and ignore the request. */
if (async_nif->shutdown) { if (async_nif->shutdown) {
return enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), return enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown"))); enif_make_atom(req->env, "shutdown"));
} }
/* Otherwise, add the request to the work queue. */ /* Otherwise, add the request to the work queue. */
@ -135,21 +136,28 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
enif_mutex_unlock(async_nif->req_mutex); enif_mutex_unlock(async_nif->req_mutex);
enif_cond_broadcast(async_nif->cnd); enif_cond_broadcast(async_nif->cnd);
return enif_make_tuple2(env, enif_make_atom(env, "ok"), return enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_tuple2(env, enif_make_atom(env, "enqueued"), enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"),
enif_make_int(env, async_nif->req_count))); \ enif_make_int(req->env, async_nif->req_count))); \
} }
static void *async_nif_worker_fn(void *arg) static void *
async_nif_worker_fn(void *arg)
{ {
struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg; struct async_nif_worker_info *wi = (struct async_nif_worker_info *)arg;
struct async_nif_req_entry *req = NULL; struct async_nif_worker_entry *worker = wi->worker;
struct async_nif_state *async_nif = wi->async_nif;
unsigned int worker_id = wi->worker_id;
free(wi); // Allocated when starting the thread, now no longer needed.
/* /*
* Workers are active while there is work on the queue to do and * Workers are active while there is work on the queue to do and
* only in the idle list when they are waiting on new work. * only in the idle list when they are waiting on new work.
*/ */
for(;;) { for(;;) {
struct async_nif_req_entry *req = NULL;
/* Examine the request queue, are there things to be done? */ /* Examine the request queue, are there things to be done? */
enif_mutex_lock(async_nif->req_mutex); enif_mutex_lock(async_nif->req_mutex);
enif_mutex_lock(async_nif->worker_mutex); enif_mutex_lock(async_nif->worker_mutex);
@ -163,7 +171,8 @@ static void *async_nif_worker_fn(void *arg)
goto check_again_for_work; goto check_again_for_work;
} else { } else {
/* `req` is our work request and we hold the req_mutex lock. */ /* `req` is our work request and we hold the req_mutex lock. */
// TODO: do we need this? enif_cond_broadcast(async_nif->cnd); // TODO: do we need this broadcast?
enif_cond_broadcast(async_nif->cnd);
/* Remove this thread from the list of idle threads. */ /* Remove this thread from the list of idle threads. */
enif_mutex_lock(async_nif->worker_mutex); enif_mutex_lock(async_nif->worker_mutex);
@ -172,12 +181,11 @@ static void *async_nif_worker_fn(void *arg)
do { do {
/* Take the request off the queue. */ /* Take the request off the queue. */
STAILQ_REMOVE(&async_nif->reqs, req, async_nif->req_entry, entries); STAILQ_REMOVE(&async_nif->reqs, req, async_nif_req_entry, entries);
async_nif->req_count--; async_nif->req_count--;
enif_mutex_unlock(async_nif->req_mutex); enif_mutex_unlock(async_nif->req_mutex);
/* Finally, do the work. */ /* Finally, do the work. */
unsigned int worker_id = (unsigned int)(worker - worker_entries);
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
req->fn_post(req->args); req->fn_post(req->args);
enif_free(req->args); enif_free(req->args);
@ -205,7 +213,7 @@ static void *async_nif_worker_fn(void *arg)
static void async_nif_unload(ErlNifEnv *env) static void async_nif_unload(ErlNifEnv *env)
{ {
unsigned int i; unsigned int i;
struct_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
/* Signal the worker threads, stop what you're doing and exit. */ /* Signal the worker threads, stop what you're doing and exit. */
enif_mutex_lock(async_nif->req_mutex); enif_mutex_lock(async_nif->req_mutex);
@ -249,23 +257,25 @@ static void async_nif_unload(ErlNifEnv *env)
static void * static void *
async_nif_load(void) async_nif_load(void)
{ {
static int has_init = 0;
int i, num_schedulers; int i, num_schedulers;
ErlDrvSysInfo info; ErlNifSysInfo info;
struct async_nif_state *async_nif; struct async_nif_state *async_nif;
/* Don't init more than once. */ /* Don't init more than once. */
if (async_nif_req_mutex) return 0; if (has_init) return 0;
else has_init = 1;
/* Find out how many schedulers there are. */ /* Find out how many schedulers there are. */
erl_drv_sys_info(&info, sizeof(ErlDrvSysInfo)); enif_system_info(&info, sizeof(ErlNifSysInfo));
num_schedulers = info->scheduler_threads; num_schedulers = info.scheduler_threads;
/* Init our portion of priv_data's module-specific state. */ /* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state)); async_nif = malloc(sizeof(struct async_nif_state));
if (!async_nif) if (!async_nif)
return NULL; return NULL;
STAILQ_INIT(async_nif->reqs); STAILQ_INIT(&(async_nif->reqs));
LIST_INIT(async_nif->workers); LIST_INIT(&(async_nif->workers));
async_nif->shutdown = 0; async_nif->shutdown = 0;
async_nif->req_mutex = enif_mutex_create(NULL); async_nif->req_mutex = enif_mutex_create(NULL);
@ -287,8 +297,13 @@ async_nif_load(void)
num_worker_threads = 1; num_worker_threads = 1;
for (i = 0; i < num_worker_threads; i++) { for (i = 0; i < num_worker_threads; i++) {
struct async_nif_worker_info *wi;
wi = malloc(sizeof(struct async_nif_worker_info)); // TODO: check
wi->async_nif = async_nif;
wi->worker = &async_nif->worker_entries[i];
wi->worker_id = i;
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)&async_nif->worker_entries[i], NULL) != 0) { &async_nif_worker_fn, (void*)&wi, NULL) != 0) {
async_nif->shutdown = 1; async_nif->shutdown = 1;
enif_cond_broadcast(async_nif->cnd); enif_cond_broadcast(async_nif->cnd);
enif_mutex_unlock(async_nif->worker_mutex); enif_mutex_unlock(async_nif->worker_mutex);

View file

@ -24,10 +24,13 @@
#include "wiredtiger.h" #include "wiredtiger.h"
#include "async_nif.h" #include "async_nif.h"
#include "khash.h"
static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE;
KHASH_MAP_INIT_STR(cursors, WT_CURSOR*);
/** /**
* We will have exactly one (1) WterlCtx for each async worker thread. As * We will have exactly one (1) WterlCtx for each async worker thread. As
* requests arrive we will reuse the same WterlConnHandle->contexts[worker_id] * requests arrive we will reuse the same WterlConnHandle->contexts[worker_id]
@ -37,19 +40,21 @@ static ErlNifResourceType *wterl_cursor_RESOURCE;
* cursors hash table. In practice this means we could have (num_workers * cursors hash table. In practice this means we could have (num_workers
* * num_tables) of cursors open which we need to account for when setting * * num_tables) of cursors open which we need to account for when setting
* session_max in the configuration of WiredTiger so that it creates enough * session_max in the configuration of WiredTiger so that it creates enough
* hazard pointers for this extreme * hazard pointers for this extreme case.
* case. *
* Note: We don't protect access to this struct with a mutex because it will
* only be accessed by the same worker thread.
*/ */
typedef struct { typedef struct {
WT_SESSION *session; WT_SESSION *session;
/* WiredTiger objects (tables, indexes, etc.) to open cursors. */ khash_t(cursors) *cursors;
KHASH_MAP_INIT_STR(cursors, (WT_CURSOR*));
} WterlCtx; } WterlCtx;
typedef struct { typedef struct {
WT_CONNECTION *conn; WT_CONNECTION *conn;
const char *session_config; const char *session_config;
ErlNifMutex *context_mutex; ErlNifMutex *context_mutex;
unsigned int num_contexts;
WterlCtx contexts[ASYNC_NIF_MAX_WORKERS]; WterlCtx contexts[ASYNC_NIF_MAX_WORKERS];
} WterlConnHandle; } WterlConnHandle;
@ -70,9 +75,10 @@ static ERL_NIF_TERM ATOM_NOT_FOUND;
* Get the per-worker reusable WT_SESSION for a worker_id. * Get the per-worker reusable WT_SESSION for a worker_id.
*/ */
static int static int
__session_for(WterlConnHandle conn_handle, unsigned int worker_id, WT_SESSION **session) __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session)
{ {
*session = conn_handle->contexts[worker_id]->session; WterlCtx *ctx = &conn_handle->contexts[worker_id];
*session = ctx->session;
if (*session == NULL) { if (*session == NULL) {
/* Create a context for this worker thread to reuse. */ /* Create a context for this worker thread to reuse. */
WT_CONNECTION *conn = conn_handle->conn; WT_CONNECTION *conn = conn_handle->conn;
@ -80,8 +86,7 @@ __session_for(WterlConnHandle conn_handle, unsigned int worker_id, WT_SESSION **
if (rc != 0) if (rc != 0)
return rc; return rc;
ctx->session = *session; ctx->session = *session;
khash_t(cursors) *h = kh_init(cursors); ctx->cursors = kh_init(cursors);
ctx->cursors = h;
} }
return 0; return 0;
} }
@ -91,16 +96,23 @@ __session_for(WterlConnHandle conn_handle, unsigned int worker_id, WT_SESSION **
* session. * session.
*/ */
static int static int
__cursor_for(WterlConnHandle conn_handle, void *worker_id, const char *uri, WT_CURSOR **cursor) __cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor)
{ {
khash_t(cursors) *h = conn_handle->contexts[worker_id]->cursors; WterlCtx *ctx = &conn_handle->contexts[worker_id];
*cursor = kh_get(cursors, h, uri); khash_t(cursors) *h = ctx->cursors;
if (*cursor == NULL) { khiter_t itr = kh_get(cursors, h, uri);
WT_SESSION *session = conn_handle->contexts[worker_id]->session; if (itr != kh_end(h)) {
// key exists in hash table, retrieve it
*cursor = (WT_CURSOR*)kh_value(h, itr);
} else {
// key does not exist in hash table, create and insert one
WT_SESSION *session = conn_handle->contexts[worker_id].session;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor);
if (rc != 0) if (rc != 0)
return rc; return rc;
kh_put(cursors, h, uri, *cursor); int itr_status;
itr = kh_put(cursors, h, uri, &itr_status);
kh_value(h, itr) = *cursor;
} }
return 0; return 0;
} }
@ -118,8 +130,8 @@ __strerror_term(ErlNifEnv* env, int rc)
if (rc == WT_NOTFOUND) { if (rc == WT_NOTFOUND) {
return ATOM_NOT_FOUND; return ATOM_NOT_FOUND;
} else { } else {
const char *err = enif_make_string(env, wiredtiger_strerror(rc)); return enif_make_tuple2(env, ATOM_ERROR,
return enif_make_tuple2(env, ATOM_ERROR, err, ERL_NIF_LATIN1)); enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1));
} }
} }
@ -167,11 +179,10 @@ ASYNC_NIF_DECL(
if (rc == 0) { if (rc == 0) {
WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle));
conn_handle->conn = conn; conn_handle->conn = conn;
conn_handle->session_config = (const char *)strndup(config.data, config.size); conn_handle->session_config = (const char *)strndup((const char *)config.data, config.size);
conn_handle->num_contexts = 0; conn_handle->num_contexts = 0;
bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
conn_handle->context_mutex = enif_mutex_create(NULL); conn_handle->context_mutex = enif_mutex_create(NULL);
conn_handle->context_bmi = 0;
ERL_NIF_TERM result = enif_make_resource(env, conn_handle); ERL_NIF_TERM result = enif_make_resource(env, conn_handle);
enif_release_resource(conn_handle); // When GC'ed the BEAM calls __resource_conn_dtor() enif_release_resource(conn_handle); // When GC'ed the BEAM calls __resource_conn_dtor()
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result));
@ -236,7 +247,7 @@ ASYNC_NIF_DECL(
{ // pre { // pre
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle)) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
@ -259,13 +270,13 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
int rc = session->create(session, args->uri, (const char*)config.data); rc = session->create(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)args->session->close(session, NULL); (void)session->close(session, NULL);
}, },
{ // post { // post
@ -290,7 +301,7 @@ ASYNC_NIF_DECL(
{ // pre { // pre
if (!(argc == 3 && if (!(argc == 3 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle)) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) &&
enif_is_binary(env, argv[2]))) { enif_is_binary(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
@ -313,7 +324,7 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
/* Note: we must first close all cursors referencing this object or this /* Note: we must first close all cursors referencing this object or this
@ -324,9 +335,9 @@ ASYNC_NIF_DECL(
// on this table, restart worker threads, do the drop, remove the condition // on this table, restart worker threads, do the drop, remove the condition
// variable (read: punt for now, expect a lot of EBUSYs) // variable (read: punt for now, expect a lot of EBUSYs)
rc = args->session->drop(args->session, args->uri, (const char*)config.data); rc = session->drop(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)args->session->close(session, NULL); (void)session->close(session, NULL);
}, },
{ // post { // post
@ -353,7 +364,7 @@ ASYNC_NIF_DECL(
{ // pre { // pre
if (!(argc == 4 && if (!(argc == 4 &&
enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle)) && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) &&
enif_get_string(env, argv[1], args->oldname, sizeof args->oldname, ERL_NIF_LATIN1) && enif_get_string(env, argv[1], args->oldname, sizeof args->oldname, ERL_NIF_LATIN1) &&
enif_get_string(env, argv[2], args->newname, sizeof args->newname, ERL_NIF_LATIN1) && enif_get_string(env, argv[2], args->newname, sizeof args->newname, ERL_NIF_LATIN1) &&
enif_is_binary(env, argv[3]))) { enif_is_binary(env, argv[3]))) {
@ -377,14 +388,14 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
/* Note: we must first close all cursors referencing this object or this /* Note: we must first close all cursors referencing this object or this
operation will fail with EBUSY(16) "Device or resource busy". */ operation will fail with EBUSY(16) "Device or resource busy". */
// TODO: see drop's note, same goes here. // TODO: see drop's note, same goes here.
int rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); rc = session->rename(session, args->oldname, args->newname, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL); (void)session->close(session, NULL);
}, },
@ -437,11 +448,11 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
int rc = session->salvage(session, args->uri, (const char*)config.data); rc = session->salvage(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL); (void)session->close(session, NULL);
}, },
@ -462,7 +473,7 @@ ASYNC_NIF_DECL(
wterl_checkpoint, wterl_checkpoint,
{ // struct { // struct
WterlConnectionHandle* conn_handle; WterlConnHandle *conn_handle;
ERL_NIF_TERM config; ERL_NIF_TERM config;
}, },
{ // pre { // pre
@ -483,12 +494,12 @@ ASYNC_NIF_DECL(
return; return;
} }
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
rc = __session_for(args->conn_handle, worker_id, &session); int rc = __session_for(args->conn_handle, worker_id, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
int rc = session->checkpoint(session, (const char*)config.data); rc = session->checkpoint(session, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
}, },
{ // post { // post
@ -552,7 +563,7 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
@ -566,7 +577,7 @@ ASYNC_NIF_DECL(
rc = session->open_cursor(session, args->uri, NULL, "raw", &start); rc = session->open_cursor(session, args->uri, NULL, "raw", &start);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
session->close(session); session->close(session, NULL);
return; return;
} }
WT_ITEM item_start; WT_ITEM item_start;
@ -585,7 +596,7 @@ ASYNC_NIF_DECL(
rc = session->open_cursor(session, args->uri, NULL, "raw", &stop); rc = session->open_cursor(session, args->uri, NULL, "raw", &stop);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
session->close(session); session->close(session, NULL);
return; return;
} }
WT_ITEM item_stop; WT_ITEM item_stop;
@ -594,7 +605,7 @@ ASYNC_NIF_DECL(
stop->set_key(stop, item_stop); stop->set_key(stop, item_stop);
} }
int rc = session->truncate(session, args->uri, start, stop, (const char*)config.data); rc = session->truncate(session, args->uri, start, stop, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
}, },
{ // post { // post
@ -610,7 +621,7 @@ ASYNC_NIF_DECL(
* argv[2] config string as an Erlang binary * argv[2] config string as an Erlang binary
*/ */
ASYNC_NIF_DECL( ASYNC_NIF_DECL(
wterl_session_upgrade, wterl_upgrade,
{ // struct { // struct
WterlConnHandle *conn_handle; WterlConnHandle *conn_handle;
@ -643,11 +654,11 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
int rc = session->upgrade(session, args->uri, (const char*)config.data); rc = session->upgrade(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL); (void)session->close(session, NULL);
}, },
@ -698,13 +709,13 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
int rc = session->verify(session, args->uri, (const char*)config.data); rc = session->verify(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)args->session->close(session, NULL); (void)session->close(session, NULL);
}, },
{ // post { // post
@ -719,7 +730,7 @@ ASYNC_NIF_DECL(
* argv[2] key as an Erlang binary * argv[2] key as an Erlang binary
*/ */
ASYNC_NIF_DECL( ASYNC_NIF_DECL(
wterl_session_delete, wterl_delete,
{ // struct { // struct
WterlConnHandle *conn_handle; WterlConnHandle *conn_handle;
@ -746,7 +757,7 @@ ASYNC_NIF_DECL(
} }
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
rc = __session_for(args->conn_handle, worker_id, &session); int rc = __session_for(args->conn_handle, worker_id, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
@ -807,7 +818,7 @@ ASYNC_NIF_DECL(
} }
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
rc = __session_for(args->conn_handle, worker_id, &session); int rc = __session_for(args->conn_handle, worker_id, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
@ -875,7 +886,7 @@ ASYNC_NIF_DECL(
} }
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]);
enif_keep_resource((void*)args->session_handle); enif_keep_resource((void*)args->conn_handle);
}, },
{ // work { // work
@ -891,7 +902,7 @@ ASYNC_NIF_DECL(
} }
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
rc = __session_for(args->conn_handle, worker_id, &session); int rc = __session_for(args->conn_handle, worker_id, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
@ -965,12 +976,13 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
AYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
WT_CURSOR* cursor; WT_CURSOR* cursor;
int rc = session->open_cursor(session, args->uri, NULL, args->config ? config.data : "overwrite,raw", &cursor); char *c = args->config ? (char *)config.data : "overwrite,raw";
rc = session->open_cursor(session, args->uri, NULL, c, &cursor);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
@ -1013,7 +1025,7 @@ ASYNC_NIF_DECL(
WT_SESSION* session = args->cursor_handle->session; WT_SESSION* session = args->cursor_handle->session;
/* Note: session->close() will cause all open cursors in the session to be /* Note: session->close() will cause all open cursors in the session to be
closed first, so we don't have explicitly to do that here. */ closed first, so we don't have explicitly to do that here. */
int rc = session->close(cursor); int rc = session->close(session, NULL);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
}, },
{ // post { // post
@ -1539,13 +1551,18 @@ __resource_conn_dtor(ErlNifEnv *env, void *obj)
/* Free up the shared sessions and cursors. */ /* Free up the shared sessions and cursors. */
enif_mutex_lock(conn_handle->context_mutex); enif_mutex_lock(conn_handle->context_mutex);
for (int i = 0; i < conn_handle->num_contexts; i++) { for (int i = 0; i < conn_handle->num_contexts; i++) {
WterlCtx ctx = conn_handle->contexts[i]; WterlCtx *ctx = &conn_handle->contexts[i];
// TODO: clean up each WterlCtx WT_CURSOR *cursor;
// kh_destroy(cursors, ctx->cursors); kh_foreach_value(ctx->cursors, cursor, {
cursor->close(cursor);
});
kh_destroy(cursors, ctx->cursors);
ctx->session->close(ctx->session, NULL);
} }
bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS);
enif_mutex_unlock(conn_handle->context_mutex); enif_mutex_unlock(conn_handle->context_mutex);
enif_mutex_destroy(conn_handle->context_mutex); enif_mutex_destroy(conn_handle->context_mutex);
free(conn->session_config); free((void *)conn_handle->session_config);
} }
/** /**
@ -1577,19 +1594,22 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
return *priv_data ? 0 : -1; return *priv_data ? 0 : -1;
} }
static void on_reload(ErlNifEnv *env, void *priv_data) static int
on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{ {
return; // TODO: determine what should be done here, if anything... return 0; // TODO: determine what should be done here, if anything...
} }
static void on_unload(ErlNifEnv *env, void *priv_data) static void
on_unload(ErlNifEnv *env, void *priv_data)
{ {
ASYNC_NIF_UNLOAD(env); ASYNC_NIF_UNLOAD(env);
} }
static int on_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info) static int
on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info)
{ {
ASYNC_NIF_UPGRADE(); ASYNC_NIF_UPGRADE(env);
return 0; return 0;
} }