diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 277b3e0..5d283da 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -37,7 +37,6 @@ extern "C" { #define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS - struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -317,8 +316,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en avg_depth += async_nif->queues[j].depth; } } - if (avg_depth != 0) - avg_depth /= n; + if (avg_depth) avg_depth /= n; /* Lock this queue under consideration, then check for shutdown. While we hold this lock either a) we're shutting down so exit now or b) this @@ -350,15 +348,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* We've selected a queue for this new request now check to make sure there are enough workers actively processing requests on this queue. */ - retry: - if (q->depth > q->num_workers || q->num_workers == 0) { + while (q->depth > q->num_workers) { switch(async_nif_start_worker(async_nif, q)) { - case 0: __sync_fetch_and_add(&q->num_workers, 1); break; - case EAGAIN: goto retry; case EINVAL: case ENOMEM: default: return 0; + case EAGAIN: continue; + case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done; } - } - + }done:; /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index e81fd5e..bdd87fa 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -11,10 +11,10 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w set -e WT_REPO=http://github.com/wiredtiger/wiredtiger.git -WT_BRANCH=develop -WT_DIR=wiredtiger-`basename $WT_BRANCH` -#WT_REF="tags/1.6.3" -#WT_DIR=wiredtiger-`basename $WT_REF` +#WT_BRANCH=develop +#WT_DIR=wiredtiger-`basename $WT_BRANCH` +WT_REF="tags/1.6.3" +WT_DIR=wiredtiger-`basename $WT_REF` SNAPPY_VSN="1.0.4" SNAPPY_DIR=snappy-$SNAPPY_VSN diff --git a/c_src/wterl.c b/c_src/wterl.c index 4ebbbf1..e2f4ffa 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -1336,6 +1336,10 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } + if (key.size == 0) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; @@ -1395,6 +1399,10 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } + if (key.size == 0) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; @@ -1480,6 +1488,10 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } + if (key.size == 0 || value.size == 0) { + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; + } struct wterl_ctx *ctx = NULL; WT_CURSOR *cursor = NULL; @@ -1553,7 +1565,7 @@ ASYNC_NIF_DECL( } WT_CURSOR* cursor; - rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor); + rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "raw", &cursor); if (rc != 0) { session->close(session, NULL); ASYNC_NIF_REPLY(__strerror_term(env, rc)); diff --git a/rebar.config b/rebar.config index 46f0af2..52c1c94 100644 --- a/rebar.config +++ b/rebar.config @@ -5,13 +5,15 @@ {cover_enabled, true}. -{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}. +%{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}. {erl_opts, [ + %native, {hipe, [o3,verbose]}, inline, {inline_size, 1024}, {parse_transform, lager_transform}, - debug_info, %{d,'DEBUG',true}, - %strict_validation, - %fail_on_warning, + debug_info, + {d,'DEBUG',true}, + strict_validation, + fail_on_warning, %warn_missing_spec, warn_bif_clash, warn_deprecated_function, @@ -22,7 +24,7 @@ warn_shadow_vars, warn_untyped_record, warn_unused_function, - %warn_unused_import, + warn_unused_import, warn_unused_record, warn_unused_vars ]}. diff --git a/src/wterl.erl b/src/wterl.erl index 9045be2..906a167 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -256,6 +256,7 @@ verify_nif(_AsyncRef, _Ref, _Name, _Config) -> -spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}. cursor_open(Ref, Table) -> cursor_open(Ref, Table, []). + cursor_open(Ref, Table, Config) -> ?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]). @@ -484,6 +485,8 @@ config_to_bin([{Key, Value} | Rest], Acc) -> {lsm_merge_threads, integer}, {multiprocess, bool}, {name, string}, + {overwrite, bool}, + {raw, bool}, {session_max, integer}, {statistics_log, config}, {sync, bool}, @@ -872,7 +875,7 @@ various_cursor_test_() -> end}, {"update an item using a cursor", fun() -> - {ok, Cursor} = cursor_open(ConnRef, "table:test"), + {ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]), ?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)), ?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)), ?assertMatch(ok, cursor_close(Cursor)), @@ -880,7 +883,7 @@ various_cursor_test_() -> end}, {"remove an item using a cursor", fun() -> - {ok, Cursor} = cursor_open(ConnRef, "table:test"), + {ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]), ?assertMatch(ok, cursor_remove(Cursor, <<"g">>)), ?assertMatch(not_found, cursor_remove(Cursor, <<"l">>)), ?assertMatch(ok, cursor_close(Cursor)),