Added some sanity checking of key/value sizes. Check for EAGAIN/INVAL/NOMEM when starting worker threads. Switch back to the 1.6.3 release branch of WT.
This commit is contained in:
parent
122963133a
commit
452d7694a6
5 changed files with 38 additions and 17 deletions
|
@ -289,7 +289,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
/* Identify the most appropriate worker for this request. */
|
/* Identify the most appropriate worker for this request. */
|
||||||
unsigned int i, qid = 0;
|
unsigned int i, qid = 0;
|
||||||
struct async_nif_work_queue *q = NULL;
|
struct async_nif_work_queue *q = NULL;
|
||||||
double avg_depth;
|
double avg_depth = 0.0;
|
||||||
|
|
||||||
/* Either we're choosing a queue based on some affinity/hinted value or we
|
/* Either we're choosing a queue based on some affinity/hinted value or we
|
||||||
need to select the next queue in the rotation and atomically update that
|
need to select the next queue in the rotation and atomically update that
|
||||||
|
@ -314,8 +314,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
avg_depth += async_nif->queues[j].depth;
|
avg_depth += async_nif->queues[j].depth;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (avg_depth != 0)
|
if (avg_depth) avg_depth /= n;
|
||||||
avg_depth /= n;
|
|
||||||
|
|
||||||
/* Lock this queue under consideration, then check for shutdown. While
|
/* Lock this queue under consideration, then check for shutdown. While
|
||||||
we hold this lock either a) we're shutting down so exit now or b) this
|
we hold this lock either a) we're shutting down so exit now or b) this
|
||||||
|
@ -347,8 +346,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
|
|
||||||
/* We've selected a queue for this new request now check to make sure there are
|
/* We've selected a queue for this new request now check to make sure there are
|
||||||
enough workers actively processing requests on this queue. */
|
enough workers actively processing requests on this queue. */
|
||||||
if (q->depth > q->num_workers || q->num_workers == 0)
|
while (q->depth > q->num_workers) {
|
||||||
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
|
switch(async_nif_start_worker(async_nif, q)) {
|
||||||
|
case EINVAL: case ENOMEM: default: return 0;
|
||||||
|
case EAGAIN: continue;
|
||||||
|
case 0: q->num_workers++; goto done;
|
||||||
|
}
|
||||||
|
}done:;
|
||||||
|
|
||||||
/* Build the term before releasing the lock so as not to race on the use of
|
/* Build the term before releasing the lock so as not to race on the use of
|
||||||
the req pointer (which will soon become invalid in another thread
|
the req pointer (which will soon become invalid in another thread
|
||||||
|
|
|
@ -11,10 +11,10 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
|
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
|
||||||
WT_BRANCH=develop
|
#WT_BRANCH=develop
|
||||||
WT_DIR=wiredtiger-`basename $WT_BRANCH`
|
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
|
||||||
#WT_REF="tags/1.6.3"
|
WT_REF="tags/1.6.3"
|
||||||
#WT_DIR=wiredtiger-`basename $WT_REF`
|
WT_DIR=wiredtiger-`basename $WT_REF`
|
||||||
|
|
||||||
SNAPPY_VSN="1.0.4"
|
SNAPPY_VSN="1.0.4"
|
||||||
SNAPPY_DIR=snappy-$SNAPPY_VSN
|
SNAPPY_DIR=snappy-$SNAPPY_VSN
|
||||||
|
|
|
@ -1336,6 +1336,10 @@ ASYNC_NIF_DECL(
|
||||||
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (key.size == 0) {
|
||||||
|
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
struct wterl_ctx *ctx = NULL;
|
struct wterl_ctx *ctx = NULL;
|
||||||
WT_CURSOR *cursor = NULL;
|
WT_CURSOR *cursor = NULL;
|
||||||
|
@ -1395,6 +1399,10 @@ ASYNC_NIF_DECL(
|
||||||
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (key.size == 0) {
|
||||||
|
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
struct wterl_ctx *ctx = NULL;
|
struct wterl_ctx *ctx = NULL;
|
||||||
WT_CURSOR *cursor = NULL;
|
WT_CURSOR *cursor = NULL;
|
||||||
|
@ -1480,6 +1488,10 @@ ASYNC_NIF_DECL(
|
||||||
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (key.size == 0 || value.size == 0) {
|
||||||
|
ASYNC_NIF_REPLY(enif_make_badarg(env));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
struct wterl_ctx *ctx = NULL;
|
struct wterl_ctx *ctx = NULL;
|
||||||
WT_CURSOR *cursor = NULL;
|
WT_CURSOR *cursor = NULL;
|
||||||
|
@ -1553,7 +1565,7 @@ ASYNC_NIF_DECL(
|
||||||
}
|
}
|
||||||
|
|
||||||
WT_CURSOR* cursor;
|
WT_CURSOR* cursor;
|
||||||
rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor);
|
rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "raw", &cursor);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
session->close(session, NULL);
|
session->close(session, NULL);
|
||||||
ASYNC_NIF_REPLY(__strerror_term(env, rc));
|
ASYNC_NIF_REPLY(__strerror_term(env, rc));
|
||||||
|
|
12
rebar.config
12
rebar.config
|
@ -5,13 +5,15 @@
|
||||||
|
|
||||||
{cover_enabled, true}.
|
{cover_enabled, true}.
|
||||||
|
|
||||||
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
|
%{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
|
||||||
|
|
||||||
{erl_opts, [
|
{erl_opts, [
|
||||||
|
%native, {hipe, [o3,verbose]}, inline, {inline_size, 1024},
|
||||||
{parse_transform, lager_transform},
|
{parse_transform, lager_transform},
|
||||||
debug_info, %{d,'DEBUG',true},
|
debug_info,
|
||||||
%strict_validation,
|
{d,'DEBUG',true},
|
||||||
%fail_on_warning,
|
strict_validation,
|
||||||
|
fail_on_warning,
|
||||||
%warn_missing_spec,
|
%warn_missing_spec,
|
||||||
warn_bif_clash,
|
warn_bif_clash,
|
||||||
warn_deprecated_function,
|
warn_deprecated_function,
|
||||||
|
@ -22,7 +24,7 @@
|
||||||
warn_shadow_vars,
|
warn_shadow_vars,
|
||||||
warn_untyped_record,
|
warn_untyped_record,
|
||||||
warn_unused_function,
|
warn_unused_function,
|
||||||
%warn_unused_import,
|
warn_unused_import,
|
||||||
warn_unused_record,
|
warn_unused_record,
|
||||||
warn_unused_vars
|
warn_unused_vars
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -256,6 +256,7 @@ verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
|
||||||
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}.
|
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}.
|
||||||
cursor_open(Ref, Table) ->
|
cursor_open(Ref, Table) ->
|
||||||
cursor_open(Ref, Table, []).
|
cursor_open(Ref, Table, []).
|
||||||
|
|
||||||
cursor_open(Ref, Table, Config) ->
|
cursor_open(Ref, Table, Config) ->
|
||||||
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
|
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
|
||||||
|
|
||||||
|
@ -484,6 +485,8 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
|
||||||
{lsm_merge_threads, integer},
|
{lsm_merge_threads, integer},
|
||||||
{multiprocess, bool},
|
{multiprocess, bool},
|
||||||
{name, string},
|
{name, string},
|
||||||
|
{overwrite, bool},
|
||||||
|
{raw, bool},
|
||||||
{session_max, integer},
|
{session_max, integer},
|
||||||
{statistics_log, config},
|
{statistics_log, config},
|
||||||
{sync, bool},
|
{sync, bool},
|
||||||
|
@ -872,7 +875,7 @@ various_cursor_test_() ->
|
||||||
end},
|
end},
|
||||||
{"update an item using a cursor",
|
{"update an item using a cursor",
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
|
{ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]),
|
||||||
?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)),
|
?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)),
|
||||||
?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)),
|
?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)),
|
||||||
?assertMatch(ok, cursor_close(Cursor)),
|
?assertMatch(ok, cursor_close(Cursor)),
|
||||||
|
@ -880,7 +883,7 @@ various_cursor_test_() ->
|
||||||
end},
|
end},
|
||||||
{"remove an item using a cursor",
|
{"remove an item using a cursor",
|
||||||
fun() ->
|
fun() ->
|
||||||
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
|
{ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]),
|
||||||
?assertMatch(ok, cursor_remove(Cursor, <<"g">>)),
|
?assertMatch(ok, cursor_remove(Cursor, <<"g">>)),
|
||||||
?assertMatch(not_found, cursor_remove(Cursor, <<"l">>)),
|
?assertMatch(not_found, cursor_remove(Cursor, <<"l">>)),
|
||||||
?assertMatch(ok, cursor_close(Cursor)),
|
?assertMatch(ok, cursor_close(Cursor)),
|
||||||
|
|
Loading…
Reference in a new issue