From b4f82a388d70a2bafe7dd5e2c9fbdbbacde64351 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Sat, 6 Apr 2013 11:05:41 -0400 Subject: [PATCH] WIP-- Compiling, not yet tested/functional -- WIP Changes required to iron out compiler errors, warnings, etc. Code now compiles with clang or gcc. --- c_src/async_nif.h | 67 +++++++++++++--------- c_src/wterl.c | 142 ++++++++++++++++++++++++++-------------------- 2 files changed, 122 insertions(+), 87 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 9185980..6a9f31c 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -57,6 +57,12 @@ struct async_nif_state { struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; }; +struct async_nif_worker_info { + struct async_nif_state *async_nif; + struct async_nif_worker_entry *worker; + unsigned int worker_id; +}; + #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ struct decl ## _args frame; \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \ @@ -72,7 +78,7 @@ struct async_nif_state { /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ argc--; \ - async_nif = (struct async_nif_state*)enif_priv_data(env); \ + struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); \ if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ @@ -106,18 +112,13 @@ struct async_nif_state { } #define ASYNC_NIF_LOAD() async_nif_load(); -#define ASYNC_NIF_UNLOAD() async_nif_unload(); -//define ASYNC_NIF_RELOAD() -#define ASYNC_NIF_UPGRADE() async_nif_unload(); +#define ASYNC_NIF_UNLOAD(env) async_nif_unload(env); +#define ASYNC_NIF_UPGRADE(env) async_nif_unload(env); #define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env); #define ASYNC_NIF_WORK_ENV new_env -#ifndef PULSE_FORCE_USING_PULSE_SEND_HERE #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) -#else -#define ASYNC_NIF_REPLY(msg) PULSE_SEND(NULL, pid, env, enif_make_tuple2(env, ref, msg)) -#endif static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req) @@ -125,7 +126,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* If we're shutting down return an error term and ignore the request. */ if (async_nif->shutdown) { return enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), - enif_make_atom(req->env, "shutdown"))); + enif_make_atom(req->env, "shutdown")); } /* Otherwise, add the request to the work queue. */ @@ -135,21 +136,28 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en enif_mutex_unlock(async_nif->req_mutex); enif_cond_broadcast(async_nif->cnd); - return enif_make_tuple2(env, enif_make_atom(env, "ok"), - enif_make_tuple2(env, enif_make_atom(env, "enqueued"), - enif_make_int(env, async_nif->req_count))); \ + return enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), + enif_make_tuple2(req->env, enif_make_atom(req->env, "enqueued"), + enif_make_int(req->env, async_nif->req_count))); \ } -static void *async_nif_worker_fn(void *arg) +static void * +async_nif_worker_fn(void *arg) { - struct async_nif_worker_entry *worker = (struct async_nif_worker_entry *)arg; - struct async_nif_req_entry *req = NULL; + struct async_nif_worker_info *wi = (struct async_nif_worker_info *)arg; + struct async_nif_worker_entry *worker = wi->worker; + struct async_nif_state *async_nif = wi->async_nif; + unsigned int worker_id = wi->worker_id; + + free(wi); // Allocated when starting the thread, now no longer needed. /* * Workers are active while there is work on the queue to do and * only in the idle list when they are waiting on new work. */ for(;;) { + struct async_nif_req_entry *req = NULL; + /* Examine the request queue, are there things to be done? */ enif_mutex_lock(async_nif->req_mutex); enif_mutex_lock(async_nif->worker_mutex); @@ -163,7 +171,8 @@ static void *async_nif_worker_fn(void *arg) goto check_again_for_work; } else { /* `req` is our work request and we hold the req_mutex lock. */ - // TODO: do we need this? enif_cond_broadcast(async_nif->cnd); + // TODO: do we need this broadcast? + enif_cond_broadcast(async_nif->cnd); /* Remove this thread from the list of idle threads. */ enif_mutex_lock(async_nif->worker_mutex); @@ -172,12 +181,11 @@ static void *async_nif_worker_fn(void *arg) do { /* Take the request off the queue. */ - STAILQ_REMOVE(&async_nif->reqs, req, async_nif->req_entry, entries); + STAILQ_REMOVE(&async_nif->reqs, req, async_nif_req_entry, entries); async_nif->req_count--; enif_mutex_unlock(async_nif->req_mutex); /* Finally, do the work. */ - unsigned int worker_id = (unsigned int)(worker - worker_entries); req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_post(req->args); enif_free(req->args); @@ -205,7 +213,7 @@ static void *async_nif_worker_fn(void *arg) static void async_nif_unload(ErlNifEnv *env) { unsigned int i; - struct_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); + struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); /* Signal the worker threads, stop what you're doing and exit. */ enif_mutex_lock(async_nif->req_mutex); @@ -249,23 +257,25 @@ static void async_nif_unload(ErlNifEnv *env) static void * async_nif_load(void) { + static int has_init = 0; int i, num_schedulers; - ErlDrvSysInfo info; + ErlNifSysInfo info; struct async_nif_state *async_nif; /* Don't init more than once. */ - if (async_nif_req_mutex) return 0; + if (has_init) return 0; + else has_init = 1; /* Find out how many schedulers there are. */ - erl_drv_sys_info(&info, sizeof(ErlDrvSysInfo)); - num_schedulers = info->scheduler_threads; + enif_system_info(&info, sizeof(ErlNifSysInfo)); + num_schedulers = info.scheduler_threads; /* Init our portion of priv_data's module-specific state. */ async_nif = malloc(sizeof(struct async_nif_state)); if (!async_nif) return NULL; - STAILQ_INIT(async_nif->reqs); - LIST_INIT(async_nif->workers); + STAILQ_INIT(&(async_nif->reqs)); + LIST_INIT(&(async_nif->workers)); async_nif->shutdown = 0; async_nif->req_mutex = enif_mutex_create(NULL); @@ -287,8 +297,13 @@ async_nif_load(void) num_worker_threads = 1; for (i = 0; i < num_worker_threads; i++) { + struct async_nif_worker_info *wi; + wi = malloc(sizeof(struct async_nif_worker_info)); // TODO: check + wi->async_nif = async_nif; + wi->worker = &async_nif->worker_entries[i]; + wi->worker_id = i; if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)&async_nif->worker_entries[i], NULL) != 0) { + &async_nif_worker_fn, (void*)&wi, NULL) != 0) { async_nif->shutdown = 1; enif_cond_broadcast(async_nif->cnd); enif_mutex_unlock(async_nif->worker_mutex); diff --git a/c_src/wterl.c b/c_src/wterl.c index e2b532f..22d6ccd 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -24,10 +24,13 @@ #include "wiredtiger.h" #include "async_nif.h" +#include "khash.h" static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; +KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); + /** * We will have exactly one (1) WterlCtx for each async worker thread. As * requests arrive we will reuse the same WterlConnHandle->contexts[worker_id] @@ -37,19 +40,21 @@ static ErlNifResourceType *wterl_cursor_RESOURCE; * cursors hash table. In practice this means we could have (num_workers * * num_tables) of cursors open which we need to account for when setting * session_max in the configuration of WiredTiger so that it creates enough - * hazard pointers for this extreme - * case. + * hazard pointers for this extreme case. + * + * Note: We don't protect access to this struct with a mutex because it will + * only be accessed by the same worker thread. */ typedef struct { WT_SESSION *session; - /* WiredTiger objects (tables, indexes, etc.) to open cursors. */ - KHASH_MAP_INIT_STR(cursors, (WT_CURSOR*)); + khash_t(cursors) *cursors; } WterlCtx; typedef struct { WT_CONNECTION *conn; const char *session_config; ErlNifMutex *context_mutex; + unsigned int num_contexts; WterlCtx contexts[ASYNC_NIF_MAX_WORKERS]; } WterlConnHandle; @@ -70,9 +75,10 @@ static ERL_NIF_TERM ATOM_NOT_FOUND; * Get the per-worker reusable WT_SESSION for a worker_id. */ static int -__session_for(WterlConnHandle conn_handle, unsigned int worker_id, WT_SESSION **session) +__session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION **session) { - *session = conn_handle->contexts[worker_id]->session; + WterlCtx *ctx = &conn_handle->contexts[worker_id]; + *session = ctx->session; if (*session == NULL) { /* Create a context for this worker thread to reuse. */ WT_CONNECTION *conn = conn_handle->conn; @@ -80,8 +86,7 @@ __session_for(WterlConnHandle conn_handle, unsigned int worker_id, WT_SESSION ** if (rc != 0) return rc; ctx->session = *session; - khash_t(cursors) *h = kh_init(cursors); - ctx->cursors = h; + ctx->cursors = kh_init(cursors); } return 0; } @@ -91,16 +96,23 @@ __session_for(WterlConnHandle conn_handle, unsigned int worker_id, WT_SESSION ** * session. */ static int -__cursor_for(WterlConnHandle conn_handle, void *worker_id, const char *uri, WT_CURSOR **cursor) +__cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor) { - khash_t(cursors) *h = conn_handle->contexts[worker_id]->cursors; - *cursor = kh_get(cursors, h, uri); - if (*cursor == NULL) { - WT_SESSION *session = conn_handle->contexts[worker_id]->session; + WterlCtx *ctx = &conn_handle->contexts[worker_id]; + khash_t(cursors) *h = ctx->cursors; + khiter_t itr = kh_get(cursors, h, uri); + if (itr != kh_end(h)) { + // key exists in hash table, retrieve it + *cursor = (WT_CURSOR*)kh_value(h, itr); + } else { + // key does not exist in hash table, create and insert one + WT_SESSION *session = conn_handle->contexts[worker_id].session; int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); if (rc != 0) return rc; - kh_put(cursors, h, uri, *cursor); + int itr_status; + itr = kh_put(cursors, h, uri, &itr_status); + kh_value(h, itr) = *cursor; } return 0; } @@ -118,8 +130,8 @@ __strerror_term(ErlNifEnv* env, int rc) if (rc == WT_NOTFOUND) { return ATOM_NOT_FOUND; } else { - const char *err = enif_make_string(env, wiredtiger_strerror(rc)); - return enif_make_tuple2(env, ATOM_ERROR, err, ERL_NIF_LATIN1)); + return enif_make_tuple2(env, ATOM_ERROR, + enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1)); } } @@ -167,11 +179,10 @@ ASYNC_NIF_DECL( if (rc == 0) { WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); conn_handle->conn = conn; - conn_handle->session_config = (const char *)strndup(config.data, config.size); + conn_handle->session_config = (const char *)strndup((const char *)config.data, config.size); conn_handle->num_contexts = 0; bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); conn_handle->context_mutex = enif_mutex_create(NULL); - conn_handle->context_bmi = 0; ERL_NIF_TERM result = enif_make_resource(env, conn_handle); enif_release_resource(conn_handle); // When GC'ed the BEAM calls __resource_conn_dtor() ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); @@ -236,7 +247,7 @@ ASYNC_NIF_DECL( { // pre if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle)) && + enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); @@ -259,13 +270,13 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - int rc = session->create(session, args->uri, (const char*)config.data); + rc = session->create(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - (void)args->session->close(session, NULL); + (void)session->close(session, NULL); }, { // post @@ -290,7 +301,7 @@ ASYNC_NIF_DECL( { // pre if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle)) && + enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); @@ -313,7 +324,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } /* Note: we must first close all cursors referencing this object or this @@ -324,9 +335,9 @@ ASYNC_NIF_DECL( // on this table, restart worker threads, do the drop, remove the condition // variable (read: punt for now, expect a lot of EBUSYs) - rc = args->session->drop(args->session, args->uri, (const char*)config.data); + rc = session->drop(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - (void)args->session->close(session, NULL); + (void)session->close(session, NULL); }, { // post @@ -353,7 +364,7 @@ ASYNC_NIF_DECL( { // pre if (!(argc == 4 && - enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle)) && + enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_string(env, argv[1], args->oldname, sizeof args->oldname, ERL_NIF_LATIN1) && enif_get_string(env, argv[2], args->newname, sizeof args->newname, ERL_NIF_LATIN1) && enif_is_binary(env, argv[3]))) { @@ -377,14 +388,14 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } /* Note: we must first close all cursors referencing this object or this operation will fail with EBUSY(16) "Device or resource busy". */ // TODO: see drop's note, same goes here. - int rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); + rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); }, @@ -437,11 +448,11 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - int rc = session->salvage(session, args->uri, (const char*)config.data); + rc = session->salvage(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); }, @@ -462,7 +473,7 @@ ASYNC_NIF_DECL( wterl_checkpoint, { // struct - WterlConnectionHandle* conn_handle; + WterlConnHandle *conn_handle; ERL_NIF_TERM config; }, { // pre @@ -483,12 +494,12 @@ ASYNC_NIF_DECL( return; } WT_SESSION *session = NULL; - rc = __session_for(args->conn_handle, worker_id, &session); + int rc = __session_for(args->conn_handle, worker_id, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - int rc = session->checkpoint(session, (const char*)config.data); + rc = session->checkpoint(session, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -552,7 +563,7 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } @@ -566,7 +577,7 @@ ASYNC_NIF_DECL( rc = session->open_cursor(session, args->uri, NULL, "raw", &start); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); - session->close(session); + session->close(session, NULL); return; } WT_ITEM item_start; @@ -585,7 +596,7 @@ ASYNC_NIF_DECL( rc = session->open_cursor(session, args->uri, NULL, "raw", &stop); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); - session->close(session); + session->close(session, NULL); return; } WT_ITEM item_stop; @@ -594,7 +605,7 @@ ASYNC_NIF_DECL( stop->set_key(stop, item_stop); } - int rc = session->truncate(session, args->uri, start, stop, (const char*)config.data); + rc = session->truncate(session, args->uri, start, stop, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -610,7 +621,7 @@ ASYNC_NIF_DECL( * argv[2] config string as an Erlang binary */ ASYNC_NIF_DECL( - wterl_session_upgrade, + wterl_upgrade, { // struct WterlConnHandle *conn_handle; @@ -643,11 +654,11 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - int rc = session->upgrade(session, args->uri, (const char*)config.data); + rc = session->upgrade(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); }, @@ -698,13 +709,13 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } - int rc = session->verify(session, args->uri, (const char*)config.data); + rc = session->verify(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - (void)args->session->close(session, NULL); + (void)session->close(session, NULL); }, { // post @@ -719,7 +730,7 @@ ASYNC_NIF_DECL( * argv[2] key as an Erlang binary */ ASYNC_NIF_DECL( - wterl_session_delete, + wterl_delete, { // struct WterlConnHandle *conn_handle; @@ -746,7 +757,7 @@ ASYNC_NIF_DECL( } WT_SESSION *session = NULL; - rc = __session_for(args->conn_handle, worker_id, &session); + int rc = __session_for(args->conn_handle, worker_id, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -807,7 +818,7 @@ ASYNC_NIF_DECL( } WT_SESSION *session = NULL; - rc = __session_for(args->conn_handle, worker_id, &session); + int rc = __session_for(args->conn_handle, worker_id, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -875,7 +886,7 @@ ASYNC_NIF_DECL( } args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); - enif_keep_resource((void*)args->session_handle); + enif_keep_resource((void*)args->conn_handle); }, { // work @@ -891,7 +902,7 @@ ASYNC_NIF_DECL( } WT_SESSION *session = NULL; - rc = __session_for(args->conn_handle, worker_id, &session); + int rc = __session_for(args->conn_handle, worker_id, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -965,12 +976,13 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - AYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } WT_CURSOR* cursor; - int rc = session->open_cursor(session, args->uri, NULL, args->config ? config.data : "overwrite,raw", &cursor); + char *c = args->config ? (char *)config.data : "overwrite,raw"; + rc = session->open_cursor(session, args->uri, NULL, c, &cursor); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -1013,7 +1025,7 @@ ASYNC_NIF_DECL( WT_SESSION* session = args->cursor_handle->session; /* Note: session->close() will cause all open cursors in the session to be closed first, so we don't have explicitly to do that here. */ - int rc = session->close(cursor); + int rc = session->close(session, NULL); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1539,13 +1551,18 @@ __resource_conn_dtor(ErlNifEnv *env, void *obj) /* Free up the shared sessions and cursors. */ enif_mutex_lock(conn_handle->context_mutex); for (int i = 0; i < conn_handle->num_contexts; i++) { - WterlCtx ctx = conn_handle->contexts[i]; - // TODO: clean up each WterlCtx - // kh_destroy(cursors, ctx->cursors); + WterlCtx *ctx = &conn_handle->contexts[i]; + WT_CURSOR *cursor; + kh_foreach_value(ctx->cursors, cursor, { + cursor->close(cursor); + }); + kh_destroy(cursors, ctx->cursors); + ctx->session->close(ctx->session, NULL); } + bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); enif_mutex_unlock(conn_handle->context_mutex); enif_mutex_destroy(conn_handle->context_mutex); - free(conn->session_config); + free((void *)conn_handle->session_config); } /** @@ -1577,19 +1594,22 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) return *priv_data ? 0 : -1; } -static void on_reload(ErlNifEnv *env, void *priv_data) +static int +on_reload(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { - return; // TODO: determine what should be done here, if anything... + return 0; // TODO: determine what should be done here, if anything... } -static void on_unload(ErlNifEnv *env, void *priv_data) +static void +on_unload(ErlNifEnv *env, void *priv_data) { ASYNC_NIF_UNLOAD(env); } -static int on_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info) +static int +on_upgrade(ErlNifEnv *env, void **priv_data, void **old_priv_data, ERL_NIF_TERM load_info) { - ASYNC_NIF_UPGRADE(); + ASYNC_NIF_UPGRADE(env); return 0; }