Worker threads should check for work in other queues before exiting. #10

Merged
gburd merged 3 commits from gsb-workers-migrate into master 2013-07-27 00:12:16 +00:00
5 changed files with 34 additions and 21 deletions
Showing only changes of commit 9a5defd8c9 - Show all commits

View file

@ -37,7 +37,6 @@ extern "C" {
#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_WORKER_QUEUE_SIZE 100
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
struct async_nif_req_entry { struct async_nif_req_entry {
ERL_NIF_TERM ref; ERL_NIF_TERM ref;
ErlNifEnv *env; ErlNifEnv *env;
@ -317,8 +316,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
avg_depth += async_nif->queues[j].depth; avg_depth += async_nif->queues[j].depth;
} }
} }
if (avg_depth != 0) if (avg_depth) avg_depth /= n;
avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While /* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this we hold this lock either a) we're shutting down so exit now or b) this
@ -350,15 +348,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* We've selected a queue for this new request now check to make sure there are /* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */ enough workers actively processing requests on this queue. */
retry: while (q->depth > q->num_workers) {
if (q->depth > q->num_workers || q->num_workers == 0) {
switch(async_nif_start_worker(async_nif, q)) { switch(async_nif_start_worker(async_nif, q)) {
case 0: __sync_fetch_and_add(&q->num_workers, 1); break;
case EAGAIN: goto retry;
case EINVAL: case ENOMEM: default: return 0; case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
} }
} }done:;
/* Build the term before releasing the lock so as not to race on the use of /* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread the req pointer (which will soon become invalid in another thread

View file

@ -11,10 +11,10 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git WT_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_BRANCH=develop #WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH` #WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.3" WT_REF="tags/1.6.3"
#WT_DIR=wiredtiger-`basename $WT_REF` WT_DIR=wiredtiger-`basename $WT_REF`
SNAPPY_VSN="1.0.4" SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN SNAPPY_DIR=snappy-$SNAPPY_VSN

View file

@ -1336,6 +1336,10 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env)); ASYNC_NIF_REPLY(enif_make_badarg(env));
return; return;
} }
if (key.size == 0) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
struct wterl_ctx *ctx = NULL; struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
@ -1395,6 +1399,10 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env)); ASYNC_NIF_REPLY(enif_make_badarg(env));
return; return;
} }
if (key.size == 0) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
struct wterl_ctx *ctx = NULL; struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
@ -1480,6 +1488,10 @@ ASYNC_NIF_DECL(
ASYNC_NIF_REPLY(enif_make_badarg(env)); ASYNC_NIF_REPLY(enif_make_badarg(env));
return; return;
} }
if (key.size == 0 || value.size == 0) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
struct wterl_ctx *ctx = NULL; struct wterl_ctx *ctx = NULL;
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
@ -1553,7 +1565,7 @@ ASYNC_NIF_DECL(
} }
WT_CURSOR* cursor; WT_CURSOR* cursor;
rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor); rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "raw", &cursor);
if (rc != 0) { if (rc != 0) {
session->close(session, NULL); session->close(session, NULL);
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));

View file

@ -5,13 +5,15 @@
{cover_enabled, true}. {cover_enabled, true}.
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}. %{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [ {erl_opts, [
%native, {hipe, [o3,verbose]}, inline, {inline_size, 1024},
{parse_transform, lager_transform}, {parse_transform, lager_transform},
debug_info, %{d,'DEBUG',true}, debug_info,
%strict_validation, {d,'DEBUG',true},
%fail_on_warning, strict_validation,
fail_on_warning,
%warn_missing_spec, %warn_missing_spec,
warn_bif_clash, warn_bif_clash,
warn_deprecated_function, warn_deprecated_function,
@ -22,7 +24,7 @@
warn_shadow_vars, warn_shadow_vars,
warn_untyped_record, warn_untyped_record,
warn_unused_function, warn_unused_function,
%warn_unused_import, warn_unused_import,
warn_unused_record, warn_unused_record,
warn_unused_vars warn_unused_vars
]}. ]}.

View file

@ -256,6 +256,7 @@ verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}. -spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}.
cursor_open(Ref, Table) -> cursor_open(Ref, Table) ->
cursor_open(Ref, Table, []). cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) -> cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
@ -484,6 +485,8 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
{lsm_merge_threads, integer}, {lsm_merge_threads, integer},
{multiprocess, bool}, {multiprocess, bool},
{name, string}, {name, string},
{overwrite, bool},
{raw, bool},
{session_max, integer}, {session_max, integer},
{statistics_log, config}, {statistics_log, config},
{sync, bool}, {sync, bool},
@ -872,7 +875,7 @@ various_cursor_test_() ->
end}, end},
{"update an item using a cursor", {"update an item using a cursor",
fun() -> fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test"), {ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]),
?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)), ?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)),
?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)), ?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)),
?assertMatch(ok, cursor_close(Cursor)), ?assertMatch(ok, cursor_close(Cursor)),
@ -880,7 +883,7 @@ various_cursor_test_() ->
end}, end},
{"remove an item using a cursor", {"remove an item using a cursor",
fun() -> fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test"), {ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]),
?assertMatch(ok, cursor_remove(Cursor, <<"g">>)), ?assertMatch(ok, cursor_remove(Cursor, <<"g">>)),
?assertMatch(not_found, cursor_remove(Cursor, <<"l">>)), ?assertMatch(not_found, cursor_remove(Cursor, <<"l">>)),
?assertMatch(ok, cursor_close(Cursor)), ?assertMatch(ok, cursor_close(Cursor)),