Return to alloc'ed requests because there may be many more in flight

than those in the various queues.  Reenable the (still failing)
truncate tests (because they don't SEGV anymore).  Still might be
a memory leak, next up is valgrind.
This commit is contained in:
Gregory Burd 2013-04-15 17:37:14 -04:00
parent 668109de25
commit 371779d14e
3 changed files with 78 additions and 92 deletions

View file

@ -38,45 +38,37 @@ extern "C" {
#define DECL_FIFO_QUEUE(name, type) \ #define DECL_FIFO_QUEUE(name, type) \
struct fifo_q__ ## name { \ struct fifo_q__ ## name { \
unsigned int h, t, s; \ unsigned int h, t, s; \
type **items; \ type *items[]; \
}; \ }; \
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
type **items = enif_alloc((n+1) * sizeof(type *)); \ int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
if (!items) return 0; \ struct fifo_q__ ## name *q = enif_alloc(sz); \
memset(items, 0, (n+1) * sizeof(type *)); \ if (!q) \
struct fifo_q__ ## name q = {0,0,n+1,items}; \ return 0; \
struct fifo_q__ ## name *qptr = enif_alloc(sizeof(struct fifo_q__ ## name)); \ memset(q, 0, sz); \
if (!qptr) return 0; \ q->s = n + 1; \
memset(qptr, 0, sizeof(struct fifo_q__ ## name)); \ return q; \
*qptr = q; \
return qptr; \
} \ } \
static int fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *item) { \ static type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = item; \ q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \ q->h = (q->h + 1) % q->s; \
return 0; \ return n; \
} \
static unsigned int fifo_q_ ## name ## _next_index(struct fifo_q__ ## name *q) { \
return (q->h + 1) % q->s; \
} \ } \
static type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ static type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *r = 0; \ type *n = q->items[q->t]; \
r = q->items[q->t]; \
q->items[q->t] = 0; \ q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \ q->t = (q->t + 1) % q->s; \
return r; \ return n; \
} \ } \
static void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ static void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q->items, 0, q->s * sizeof(type *)); \ memset(q, 0, sizeof(*q)); \
memset(q, 0, sizeof(struct fifo_q__ ## name)); \
enif_free(q->items); \
enif_free(q); \ enif_free(q); \
} \ } \
static unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ static unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \ return (q->h - q->t + q->s) % q->s; \
} \ } \
static unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ static unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s; \ return q->s - 1; \
} \ } \
static int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ static int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \ return (q->t == q->h); \
@ -84,10 +76,8 @@ extern "C" {
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) #define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) #define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) #define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) #define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
#define fifo_q_next_index(name, queue) fifo_q_ ## name ## _next_index(queue)
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) #define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) #define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) #define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
@ -112,7 +102,6 @@ struct async_nif_work_queue {
ErlNifMutex *reqs_mutex; ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd; ErlNifCond *reqs_cnd;
FIFO_QUEUE_TYPE(reqs) reqs; FIFO_QUEUE_TYPE(reqs) reqs;
struct async_nif_req_entry items[ASYNC_NIF_WORKER_QUEUE_SIZE];
}; };
struct async_nif_worker_entry { struct async_nif_worker_entry {
@ -141,6 +130,7 @@ struct async_nif_state {
struct decl ## _args on_stack_args; \ struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \ struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req; \
ErlNifEnv *new_env = NULL; \ ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \ /* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \ const ERL_NIF_TERM *argv = argv_in + 1; \
@ -154,6 +144,13 @@ struct async_nif_state {
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
do pre_block while(0); \ do pre_block while(0); \
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
if (!req) { \
fn_post_ ## decl (args); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
@ -162,15 +159,13 @@ struct async_nif_state {
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
ErlNifPid req_pid; \ req->env = new_env; \
enif_self(env, &req_pid); \ req->ref = enif_make_copy(new_env, argv_in[0]); \
return async_nif_enqueue_req(async_nif, \ enif_self(env, &req->pid); \
(void*)copy_of_args, \ req->args = (void*)copy_of_args; \
new_env, \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
enif_make_copy(new_env, argv_in[0]), \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \
req_pid, \ return async_nif_enqueue_req(async_nif, req); \
(void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl , \
(void (*)(void *))fn_post_ ## decl); \
} }
#define ASYNC_NIF_INIT(name) \ #define ASYNC_NIF_INIT(name) \
@ -208,19 +203,16 @@ struct async_nif_state {
/** /**
*/ */
static ERL_NIF_TERM static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req)
void *args,
ErlNifEnv *env,
ERL_NIF_TERM ref,
ErlNifPid pid,
void (*fn_work)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *),
void (*fn_post)(void *))
{ {
/* If we're shutting down return an error term and ignore the request. */ /* If we're shutting down return an error term and ignore the request. */
if (async_nif->shutdown) { if (async_nif->shutdown) {
enif_free(args); // args is an enif_alloc'ed copy of the args frame ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_atom(req->env, "shutdown"));
enif_make_atom(env, "shutdown")); enif_free(req->args);
enif_free_env(req->env);
enif_free(req);
return reply;
} }
unsigned int qid = async_nif->next_q; // Keep a local to avoid the race. unsigned int qid = async_nif->next_q; // Keep a local to avoid the race.
@ -237,14 +229,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif,
/* Otherwise, add the request to the work queue. */ /* Otherwise, add the request to the work queue. */
enif_mutex_lock(q->reqs_mutex); enif_mutex_lock(q->reqs_mutex);
struct async_nif_req_entry *req = &(q->items[fifo_q_next_index(reqs, q->reqs)]);
req->ref = ref;
req->env = env;
memcpy(&req->pid, &pid, sizeof(ErlNifPid));
req->args = args;
req->fn_work = fn_work;
req->fn_post = fn_post;
fifo_q_put(reqs, q->reqs, req); fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of /* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid). */ the req pointer (which will soon become invalid). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
@ -293,6 +279,7 @@ async_nif_worker_fn(void *arg)
req->fn_post(req->args); req->fn_post(req->args);
enif_free(req->args); enif_free(req->args);
enif_free_env(req->env); enif_free_env(req->env);
enif_free(req);
/* Continue working if more requests are in the queue, otherwise wait /* Continue working if more requests are in the queue, otherwise wait
for new work to arrive. */ for new work to arrive. */
@ -343,6 +330,8 @@ async_nif_unload(ErlNifEnv *env)
enif_make_atom(req->env, "shutdown"))); enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args); req->fn_post(req->args);
enif_free(req->args); enif_free(req->args);
enif_free_env(req->env);
enif_free(req);
}); });
fifo_q_free(reqs, q->reqs); fifo_q_free(reqs, q->reqs);
} }

View file

@ -1201,7 +1201,6 @@ ASYNC_NIF_DECL(
/* We create a separate session here to ensure that operations are thread safe. */ /* We create a separate session here to ensure that operations are thread safe. */
WT_CONNECTION *conn = args->conn_handle->conn; WT_CONNECTION *conn = args->conn_handle->conn;
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
//dprint("cursor open: %s", (char *)args->conn_handle->session_config);
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -1578,26 +1577,24 @@ ASYNC_NIF_DECL(
return; return;
} }
WT_ITEM item_key; WT_ITEM item_key;
int exact; int exact = 0;
item_key.data = key.data; item_key.data = key.data;
item_key.size = key.size; item_key.size = key.size;
cursor->set_key(cursor, &item_key); cursor->set_key(cursor, &item_key);
int rc = cursor->search_near(cursor, &exact); int rc = cursor->search_near(cursor, &exact);
ERL_NIF_TERM reply;
if (rc == 0) { if (rc == 0) {
if (exact == 0) { if (exact == 0) {
/* an exact match */ /* an exact match */
reply = enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "match")); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "match")));
} else if (exact < 0) { } else if (exact < 0) {
/* cursor now positioned at the next smaller key */ /* cursor now positioned at the next smaller key */
reply = enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "lt")); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "lt")));
} else if (exact > 0) { } else if (exact > 0) {
/* cursor now positioned at the next larger key */ /* cursor now positioned at the next larger key */
reply = enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "gt")); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, enif_make_atom(env, "gt")));
} }
ASYNC_NIF_REPLY(reply);
} else { } else {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
} }

View file

@ -632,39 +632,39 @@ various_online_test_() ->
?assertMatch(ok, truncate(ConnRef, "table:test")), ?assertMatch(ok, truncate(ConnRef, "table:test")),
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
end}, end},
%% {"truncate range [<<b>>..last], ensure value outside range is found after", {"truncate range [<<b>>..last], ensure value outside range is found after",
%% fun() -> fun() ->
%% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)),
%% ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>))
%% end}, end},
%% {"truncate range [first..<<b>>], ensure value inside range is not_found after", {"truncate range [first..<<b>>], ensure value inside range is not_found after",
%% fun() -> fun() ->
%% ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)), ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)),
%% ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
%% end}, end},
%% {"truncate range [first..not_found] with a key that doesn't exist", {"truncate range [first..not_found] with a key that doesn't exist",
%% fun() -> fun() ->
%% ?assertMatch(not_found, truncate(ConnRef, "table:test", first, <<"z">>)) ?assertMatch(not_found, truncate(ConnRef, "table:test", first, <<"z">>))
%% end}, end},
%% {"truncate range [not_found..last] with a key that doesn't exist", {"truncate range [not_found..last] with a key that doesn't exist",
%% fun() -> fun() ->
%% ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, last)) ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, last))
%% end}, end},
%% {"truncate range [not_found..not_found] with keys that don't exist", {"truncate range [not_found..not_found] with keys that don't exist",
%% fun() -> fun() ->
%% ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, <<"0">>)) ?assertMatch(not_found, truncate(ConnRef, "table:test", <<"0">>, <<"0">>))
%% end}, end},
%% {"truncate range [<<b>...<<f>>], ensure value before & after range still exist", {"truncate range [<<b>...<<f>>], ensure value before & after range still exist",
%% fun() -> fun() ->
%% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, <<"f">>)), ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, <<"f">>)),
%% ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
%% ?assertMatch(not_found, get(ConnRef, "table:test", <<"b">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"b">>)),
%% ?assertMatch(not_found, get(ConnRef, "table:test", <<"c">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"c">>)),
%% ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)),
%% ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)),
%% ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)),
%% ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>))
%% end}, end},
{"drop table", {"drop table",
fun() -> fun() ->
?assertMatch(ok, drop(ConnRef, "table:test")) ?assertMatch(ok, drop(ConnRef, "table:test"))