Whitespace. Call wterl:drop not truncate for drop calls and set force

to true when calling so that we ignore ENOENT.  Change the session estimate
down a bit.
This commit is contained in:
Gregory Burd 2013-04-19 14:55:32 -04:00
parent 40bdda15bb
commit d505f7f9c8
3 changed files with 73 additions and 65 deletions

View file

@ -222,16 +222,16 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
h = ctx->cursors; h = ctx->cursors;
itr = kh_get(cursors, h, (char *)uri); itr = kh_get(cursors, h, (char *)uri);
if (itr != kh_end(h)) { if (itr != kh_end(h)) {
// key exists in hash table, retrieve it // key exists in hash table, retrieve it
*cursor = (WT_CURSOR*)kh_value(h, itr); *cursor = (WT_CURSOR*)kh_value(h, itr);
} else { } else {
// key does not exist in hash table, create and insert one // key does not exist in hash table, create and insert one
enif_mutex_lock(conn_handle->contexts_mutex); enif_mutex_lock(conn_handle->contexts_mutex);
WT_SESSION *session = conn_handle->contexts[worker_id].session; WT_SESSION *session = conn_handle->contexts[worker_id].session;
rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor);
if (rc != 0) { if (rc != 0) {
enif_mutex_unlock(conn_handle->contexts_mutex); enif_mutex_unlock(conn_handle->contexts_mutex);
return rc; return rc;
} }
char *key = enif_alloc(sizeof(Uri)); char *key = enif_alloc(sizeof(Uri));
@ -243,7 +243,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char
memcpy(key, uri, 128); memcpy(key, uri, 128);
int itr_status; int itr_status;
itr = kh_put(cursors, h, key, &itr_status); itr = kh_put(cursors, h, key, &itr_status);
kh_value(h, itr) = *cursor; kh_value(h, itr) = *cursor;
enif_mutex_unlock(conn_handle->contexts_mutex); enif_mutex_unlock(conn_handle->contexts_mutex);
} }
return 0; return 0;
@ -266,14 +266,14 @@ static ERL_NIF_TERM
__strerror_term(ErlNifEnv* env, int rc) __strerror_term(ErlNifEnv* env, int rc)
{ {
if (rc == WT_NOTFOUND) { if (rc == WT_NOTFOUND) {
return ATOM_NOT_FOUND; return ATOM_NOT_FOUND;
} else { } else {
/* We return the errno value as well as the message here because the /* We return the errno value as well as the message here because the
error message provided by strerror() for differ across platforms error message provided by strerror() for differ across platforms
and/or may be localized to any given language (i18n). Use the errno and/or may be localized to any given language (i18n). Use the errno
atom rather than the message when matching in Erlang. You've been atom rather than the message when matching in Erlang. You've been
warned. */ warned. */
return enif_make_tuple2(env, ATOM_ERROR, return enif_make_tuple2(env, ATOM_ERROR,
enif_make_tuple2(env, enif_make_tuple2(env,
enif_make_atom(env, erl_errno_id(rc)), enif_make_atom(env, erl_errno_id(rc)),
enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1))); enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1)));
@ -470,8 +470,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
rc = session->create(session, args->uri, (const char*)config.data); rc = session->create(session, args->uri, (const char*)config.data);
@ -529,9 +529,9 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex); enif_mutex_unlock(args->conn_handle->contexts_mutex);
return; return;
} }
/* Note: we locked the context mutex and called __close_cursors_on() /* Note: we locked the context mutex and called __close_cursors_on()
earlier so that we are sure that before we call into WiredTiger we have earlier so that we are sure that before we call into WiredTiger we have
@ -595,8 +595,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
/* Note: we locked the context mutex and called __close_cursors_on() /* Note: we locked the context mutex and called __close_cursors_on()
@ -661,8 +661,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
rc = session->salvage(session, args->uri, (const char*)config.data); rc = session->salvage(session, args->uri, (const char*)config.data);
@ -710,8 +710,8 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = __session_for(args->conn_handle, worker_id, &session); int rc = __session_for(args->conn_handle, worker_id, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
rc = session->checkpoint(session, (const char*)config.data); rc = session->checkpoint(session, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
@ -752,7 +752,7 @@ ASYNC_NIF_DECL(
} }
if (enif_is_binary(env, argv[2])) { if (enif_is_binary(env, argv[2])) {
args->from_first = 0; args->from_first = 0;
args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
} else if (enif_is_atom(env, argv[2])) { // TODO && argv[2] == ATOM_FIRST) { } else if (enif_is_atom(env, argv[2])) { // TODO && argv[2] == ATOM_FIRST) {
args->from_first = 1; args->from_first = 1;
args->start = 0; args->start = 0;
@ -761,7 +761,7 @@ ASYNC_NIF_DECL(
} }
if (enif_is_binary(env, argv[3])) { if (enif_is_binary(env, argv[3])) {
args->to_last = 0; args->to_last = 0;
args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]);
} else if (enif_is_atom(env, argv[3])) { // TODO && argv[3] == ATOM_LAST) { } else if (enif_is_atom(env, argv[3])) { // TODO && argv[3] == ATOM_LAST) {
args->to_last = 1; args->to_last = 1;
args->stop = 0; args->stop = 0;
@ -793,9 +793,9 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex); enif_mutex_unlock(args->conn_handle->contexts_mutex);
return; return;
} }
ErlNifBinary start_key; ErlNifBinary start_key;
@ -831,10 +831,10 @@ ASYNC_NIF_DECL(
return; return;
} }
} else { } else {
WT_ITEM item_start; WT_ITEM item_start;
item_start.data = start_key.data; item_start.data = start_key.data;
item_start.size = start_key.size; item_start.size = start_key.size;
start->set_key(start, item_start); start->set_key(start, item_start);
rc = start->search(start); rc = start->search(start);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -872,10 +872,10 @@ ASYNC_NIF_DECL(
return; return;
} }
} else { } else {
WT_ITEM item_stop; WT_ITEM item_stop;
item_stop.data = stop_key.data; item_stop.data = stop_key.data;
item_stop.size = stop_key.size; item_stop.size = stop_key.size;
stop->set_key(stop, item_stop); stop->set_key(stop, item_stop);
rc = stop->search(stop); rc = stop->search(stop);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -948,9 +948,9 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex); enif_mutex_unlock(args->conn_handle->contexts_mutex);
return; return;
} }
rc = session->upgrade(session, args->uri, (const char*)config.data); rc = session->upgrade(session, args->uri, (const char*)config.data);
@ -1010,9 +1010,9 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->contexts_mutex); enif_mutex_unlock(args->conn_handle->contexts_mutex);
return; return;
} }
rc = session->verify(session, args->uri, (const char*)config.data); rc = session->verify(session, args->uri, (const char*)config.data);
@ -1063,8 +1063,8 @@ ASYNC_NIF_DECL(
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
WT_ITEM item_key; WT_ITEM item_key;
@ -1118,8 +1118,8 @@ ASYNC_NIF_DECL(
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
WT_ITEM item_key; WT_ITEM item_key;
@ -1129,14 +1129,14 @@ ASYNC_NIF_DECL(
cursor->set_key(cursor, &item_key); cursor->set_key(cursor, &item_key);
rc = cursor->search(cursor); rc = cursor->search(cursor);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
rc = cursor->get_value(cursor, &item_value); rc = cursor->get_value(cursor, &item_value);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
ERL_NIF_TERM value; ERL_NIF_TERM value;
unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); unsigned char *bin = enif_make_new_binary(env, item_value.size, &value);
@ -1196,8 +1196,8 @@ ASYNC_NIF_DECL(
WT_CURSOR *cursor = NULL; WT_CURSOR *cursor = NULL;
int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
WT_ITEM item_key; WT_ITEM item_key;
@ -1248,8 +1248,8 @@ ASYNC_NIF_DECL(
ErlNifBinary config; ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) { if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env)); ASYNC_NIF_REPLY(enif_make_badarg(env));
return; return;
} }
/* We create a separate session here to ensure that operations are thread safe. */ /* We create a separate session here to ensure that operations are thread safe. */
@ -1257,18 +1257,25 @@ ASYNC_NIF_DECL(
WT_SESSION *session = NULL; WT_SESSION *session = NULL;
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) { if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
WT_CURSOR* cursor; WT_CURSOR* cursor;
rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor); rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor);
if (rc != 0) { if (rc != 0) {
session->close(session, NULL);
ASYNC_NIF_REPLY(__strerror_term(env, rc)); ASYNC_NIF_REPLY(__strerror_term(env, rc));
return; return;
} }
WterlCursorHandle* cursor_handle = enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle)); WterlCursorHandle* cursor_handle = enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle));
if (!cursor_handle) {
cursor->close(cursor);
session->close(session, NULL);
ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM));
return;
}
cursor_handle->session = session; cursor_handle->session = session;
cursor_handle->cursor = cursor; cursor_handle->cursor = cursor;
ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); ERL_NIF_TERM result = enif_make_resource(env, cursor_handle);
@ -1304,9 +1311,10 @@ ASYNC_NIF_DECL(
WT_CURSOR *cursor = args->cursor_handle->cursor; WT_CURSOR *cursor = args->cursor_handle->cursor;
WT_SESSION *session = args->cursor_handle->session; WT_SESSION *session = args->cursor_handle->session;
/* Note: session->close() will cause all open cursors in the session to be /* Note: session->close() will cause all open cursors in the session to be
closed first, so we don't have explicitly to do that here. */ closed first, so we don't have explicitly to do that here.
int rc = cursor->close(cursor); rc = cursor->close(cursor);
(void)session->close(session, NULL); */
int rc = session->close(session, NULL);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
}, },
{ // post { // post
@ -1318,13 +1326,13 @@ static ERL_NIF_TERM
__cursor_key_ret(ErlNifEnv *env, WT_CURSOR *cursor, int rc) __cursor_key_ret(ErlNifEnv *env, WT_CURSOR *cursor, int rc)
{ {
if (rc == 0) { if (rc == 0) {
WT_ITEM item_key; WT_ITEM item_key;
rc = cursor->get_key(cursor, &item_key); rc = cursor->get_key(cursor, &item_key);
if (rc == 0) { if (rc == 0) {
ERL_NIF_TERM key; ERL_NIF_TERM key;
memcpy(enif_make_new_binary(env, item_key.size, &key), item_key.data, item_key.size); memcpy(enif_make_new_binary(env, item_key.size, &key), item_key.data, item_key.size);
return enif_make_tuple2(env, ATOM_OK, key); return enif_make_tuple2(env, ATOM_OK, key);
} }
} }
return __strerror_term(env, rc); return __strerror_term(env, rc);
} }
@ -1878,9 +1886,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{ {
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource",
NULL, flags, NULL); NULL, flags, NULL);
wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource",
NULL, flags, NULL); NULL, flags, NULL);
ATOM_ERROR = enif_make_atom(env, "error"); ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok"); ATOM_OK = enif_make_atom(env, "ok");

View file

@ -310,7 +310,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Tabl
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{connection=Connection, table=Table}=State) -> drop(#state{connection=Connection, table=Table}=State) ->
case wterl:truncate(Connection, Table) of case wterl:drop(Connection, Table) of
ok -> ok ->
{ok, State}; {ok, State};
Error -> Error ->
@ -355,7 +355,7 @@ max_sessions(Config) ->
undefined -> 1024; undefined -> 1024;
Size -> Size Size -> Size
end, end,
Est = 1000 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
true -> 1000000000; true -> 1000000000;
false -> Est false -> Est

View file

@ -147,7 +147,7 @@ create_nif(_AsyncNif, _Ref, _Name, _Config) ->
-spec drop(connection(), string()) -> ok | {error, term()}. -spec drop(connection(), string()) -> ok | {error, term()}.
-spec drop(connection(), string(), config_list()) -> ok | {error, term()}. -spec drop(connection(), string(), config_list()) -> ok | {error, term()}.
drop(Ref, Name) -> drop(Ref, Name) ->
drop(Ref, Name, []). drop(Ref, Name, [{force, true}]).
drop(Ref, Name, Config) -> drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]). ?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).