diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 35b7898..026a38f 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -38,45 +38,37 @@ extern "C" { #define DECL_FIFO_QUEUE(name, type) \ struct fifo_q__ ## name { \ unsigned int h, t, s; \ - type **items; \ + type *items[]; \ }; \ static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ - type **items = enif_alloc((n+1) * sizeof(type *)); \ - if (!items) return 0; \ - memset(items, 0, (n+1) * sizeof(type *)); \ - struct fifo_q__ ## name q = {0,0,n+1,items}; \ - struct fifo_q__ ## name *qptr = enif_alloc(sizeof(struct fifo_q__ ## name)); \ - if (!qptr) return 0; \ - memset(qptr, 0, sizeof(struct fifo_q__ ## name)); \ - *qptr = q; \ - return qptr; \ + int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\ + struct fifo_q__ ## name *q = enif_alloc(sz); \ + if (!q) \ + return 0; \ + memset(q, 0, sz); \ + q->s = n + 1; \ + return q; \ } \ - static int fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *item) { \ - q->items[q->h] = item; \ + static type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ + q->items[q->h] = n; \ q->h = (q->h + 1) % q->s; \ - return 0; \ - } \ - static unsigned int fifo_q_ ## name ## _next_index(struct fifo_q__ ## name *q) { \ - return (q->h + 1) % q->s; \ + return n; \ } \ static type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ - type *r = 0; \ - r = q->items[q->t]; \ + type *n = q->items[q->t]; \ q->items[q->t] = 0; \ q->t = (q->t + 1) % q->s; \ - return r; \ + return n; \ } \ static void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q->items, 0, q->s * sizeof(type *)); \ - memset(q, 0, sizeof(struct fifo_q__ ## name)); \ - enif_free(q->items); \ + memset(q, 0, sizeof(*q)); \ enif_free(q); \ } \ static unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ return (q->h - q->t + q->s) % q->s; \ } \ static unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ - return q->s; \ + return q->s - 1; \ } \ static int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ return (q->t == q->h); \ @@ -84,10 +76,8 @@ extern "C" { #define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) #define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) - #define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) #define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) -#define fifo_q_next_index(name, queue) fifo_q_ ## name ## _next_index(queue) #define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) #define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) #define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) @@ -112,7 +102,6 @@ struct async_nif_work_queue { ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; FIFO_QUEUE_TYPE(reqs) reqs; - struct async_nif_req_entry items[ASYNC_NIF_WORKER_QUEUE_SIZE]; }; struct async_nif_worker_entry { @@ -141,6 +130,7 @@ struct async_nif_state { struct decl ## _args on_stack_args; \ struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ + struct async_nif_req_entry *req; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ @@ -154,6 +144,13 @@ struct async_nif_state { enif_make_atom(env, "enomem")); \ } \ do pre_block while(0); \ + req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \ + if (!req) { \ + fn_post_ ## decl (args); \ + enif_free_env(new_env); \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "enomem")); \ + } \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ @@ -162,15 +159,13 @@ struct async_nif_state { enif_make_atom(env, "enomem")); \ } \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ - ErlNifPid req_pid; \ - enif_self(env, &req_pid); \ - return async_nif_enqueue_req(async_nif, \ - (void*)copy_of_args, \ - new_env, \ - enif_make_copy(new_env, argv_in[0]), \ - req_pid, \ - (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl , \ - (void (*)(void *))fn_post_ ## decl); \ + req->env = new_env; \ + req->ref = enif_make_copy(new_env, argv_in[0]); \ + enif_self(env, &req->pid); \ + req->args = (void*)copy_of_args; \ + req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ + req->fn_post = (void (*)(void *))fn_post_ ## decl; \ + return async_nif_enqueue_req(async_nif, req); \ } #define ASYNC_NIF_INIT(name) \ @@ -208,19 +203,16 @@ struct async_nif_state { /** */ static ERL_NIF_TERM -async_nif_enqueue_req(struct async_nif_state* async_nif, - void *args, - ErlNifEnv *env, - ERL_NIF_TERM ref, - ErlNifPid pid, - void (*fn_work)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *), - void (*fn_post)(void *)) +async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req) { /* If we're shutting down return an error term and ignore the request. */ if (async_nif->shutdown) { - enif_free(args); // args is an enif_alloc'ed copy of the args frame - return enif_make_tuple2(env, enif_make_atom(env, "error"), - enif_make_atom(env, "shutdown")); + ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), + enif_make_atom(req->env, "shutdown")); + enif_free(req->args); + enif_free_env(req->env); + enif_free(req); + return reply; } unsigned int qid = async_nif->next_q; // Keep a local to avoid the race. @@ -237,14 +229,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, /* Otherwise, add the request to the work queue. */ enif_mutex_lock(q->reqs_mutex); - struct async_nif_req_entry *req = &(q->items[fifo_q_next_index(reqs, q->reqs)]); - req->ref = ref; - req->env = env; - memcpy(&req->pid, &pid, sizeof(ErlNifPid)); - req->args = args; - req->fn_work = fn_work; - req->fn_post = fn_post; fifo_q_put(reqs, q->reqs, req); + /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid). */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), @@ -293,6 +279,7 @@ async_nif_worker_fn(void *arg) req->fn_post(req->args); enif_free(req->args); enif_free_env(req->env); + enif_free(req); /* Continue working if more requests are in the queue, otherwise wait for new work to arrive. */ @@ -343,6 +330,8 @@ async_nif_unload(ErlNifEnv *env) enif_make_atom(req->env, "shutdown"))); req->fn_post(req->args); enif_free(req->args); + enif_free_env(req->env); + enif_free(req); }); fifo_q_free(reqs, q->reqs); } diff --git a/c_src/wterl.c b/c_src/wterl.c index c81a76c..7e93255 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -1201,7 +1201,6 @@ ASYNC_NIF_DECL( /* We create a separate session here to ensure that operations are thread safe. */ WT_CONNECTION *conn = args->conn_handle->conn; WT_SESSION *session = NULL; - //dprint("cursor open: %s", (char *)args->conn_handle->session_config); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); @@ -1578,26 +1577,24 @@ ASYNC_NIF_DECL( return; } WT_ITEM item_key; - int exact; + int exact = 0; item_key.data = key.data; item_key.size = key.size; cursor->set_key(cursor, &item_key); int rc = cursor->search_near(cursor, &exact); - ERL_NIF_TERM reply; if (rc == 0) { if (exact == 0) { /* an exact match */ - reply = enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "match")); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "match"))); } else if (exact < 0) { /* cursor now positioned at the next smaller key */ - reply = enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "lt")); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "lt"))); } else if (exact > 0) { /* cursor now positioned at the next larger key */ - reply = enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "gt")); + ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "gt"))); } - ASYNC_NIF_REPLY(reply); } else { ASYNC_NIF_REPLY(__strerror_term(env, rc)); } diff --git a/src/wterl.erl b/src/wterl.erl index d9ce0f2..60ffe60 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -632,39 +632,39 @@ various_online_test_() -> ?assertMatch(ok, truncate(ConnRef, "table:test")), ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) end}, - %% {"truncate range [<>..last], ensure value outside range is found after", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), - %% ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) - %% end}, - %% {"truncate range [first..<>], ensure value inside range is not_found after", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) - %% end}, - %% {"truncate range [first..not_found] with a key that doesn't exist", - %% fun() -> - %% ?assertMatch(not_found, truncate(ConnRef, "table:test", first, <<"z">>)) - %% end}, - %% {"truncate range [not_found..last] with a key that doesn't exist", - %% fun() -> - %% ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, last)) - %% end}, - %% {"truncate range [not_found..not_found] with keys that don't exist", - %% fun() -> - %% ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, <<"0">>)) - %% end}, - %% {"truncate range [<...<>], ensure value before & after range still exist", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, <<"f">>)), - %% ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"b">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"c">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), - %% ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) - %% end}, + {"truncate range [<>..last], ensure value outside range is found after", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) + end}, + {"truncate range [first..<>], ensure value inside range is not_found after", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) + end}, + {"truncate range [first..not_found] with a key that doesn't exist", + fun() -> + ?assertMatch(not_found, truncate(ConnRef, "table:test", first, <<"z">>)) + end}, + {"truncate range [not_found..last] with a key that doesn't exist", + fun() -> + ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, last)) + end}, + {"truncate range [not_found..not_found] with keys that don't exist", + fun() -> + ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, <<"0">>)) + end}, + {"truncate range [<...<>], ensure value before & after range still exist", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, <<"f">>)), + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"b">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"c">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), + ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) + end}, {"drop table", fun() -> ?assertMatch(ok, drop(ConnRef, "table:test"))