diff --git a/c_src/wterl.c b/c_src/wterl.c index 764db6d..748e39f 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -222,16 +222,16 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char h = ctx->cursors; itr = kh_get(cursors, h, (char *)uri); if (itr != kh_end(h)) { - // key exists in hash table, retrieve it - *cursor = (WT_CURSOR*)kh_value(h, itr); + // key exists in hash table, retrieve it + *cursor = (WT_CURSOR*)kh_value(h, itr); } else { - // key does not exist in hash table, create and insert one + // key does not exist in hash table, create and insert one enif_mutex_lock(conn_handle->contexts_mutex); - WT_SESSION *session = conn_handle->contexts[worker_id].session; - rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); - if (rc != 0) { + WT_SESSION *session = conn_handle->contexts[worker_id].session; + rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); + if (rc != 0) { enif_mutex_unlock(conn_handle->contexts_mutex); - return rc; + return rc; } char *key = enif_alloc(sizeof(Uri)); @@ -243,7 +243,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char memcpy(key, uri, 128); int itr_status; itr = kh_put(cursors, h, key, &itr_status); - kh_value(h, itr) = *cursor; + kh_value(h, itr) = *cursor; enif_mutex_unlock(conn_handle->contexts_mutex); } return 0; @@ -266,14 +266,14 @@ static ERL_NIF_TERM __strerror_term(ErlNifEnv* env, int rc) { if (rc == WT_NOTFOUND) { - return ATOM_NOT_FOUND; + return ATOM_NOT_FOUND; } else { /* We return the errno value as well as the message here because the error message provided by strerror() for differ across platforms and/or may be localized to any given language (i18n). Use the errno atom rather than the message when matching in Erlang. You've been warned. */ - return enif_make_tuple2(env, ATOM_ERROR, + return enif_make_tuple2(env, ATOM_ERROR, enif_make_tuple2(env, enif_make_atom(env, erl_errno_id(rc)), enif_make_string(env, wiredtiger_strerror(rc), ERL_NIF_LATIN1))); @@ -470,8 +470,8 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } rc = session->create(session, args->uri, (const char*)config.data); @@ -529,9 +529,9 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); enif_mutex_unlock(args->conn_handle->contexts_mutex); - return; + return; } /* Note: we locked the context mutex and called __close_cursors_on() earlier so that we are sure that before we call into WiredTiger we have @@ -595,8 +595,8 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } /* Note: we locked the context mutex and called __close_cursors_on() @@ -661,8 +661,8 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } rc = session->salvage(session, args->uri, (const char*)config.data); @@ -710,8 +710,8 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = __session_for(args->conn_handle, worker_id, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } rc = session->checkpoint(session, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); @@ -752,7 +752,7 @@ ASYNC_NIF_DECL( } if (enif_is_binary(env, argv[2])) { args->from_first = 0; - args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); + args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); } else if (enif_is_atom(env, argv[2])) { // TODO && argv[2] == ATOM_FIRST) { args->from_first = 1; args->start = 0; @@ -761,7 +761,7 @@ ASYNC_NIF_DECL( } if (enif_is_binary(env, argv[3])) { args->to_last = 0; - args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); + args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); } else if (enif_is_atom(env, argv[3])) { // TODO && argv[3] == ATOM_LAST) { args->to_last = 1; args->stop = 0; @@ -793,9 +793,9 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); enif_mutex_unlock(args->conn_handle->contexts_mutex); - return; + return; } ErlNifBinary start_key; @@ -831,10 +831,10 @@ ASYNC_NIF_DECL( return; } } else { - WT_ITEM item_start; - item_start.data = start_key.data; - item_start.size = start_key.size; - start->set_key(start, item_start); + WT_ITEM item_start; + item_start.data = start_key.data; + item_start.size = start_key.size; + start->set_key(start, item_start); rc = start->search(start); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); @@ -872,10 +872,10 @@ ASYNC_NIF_DECL( return; } } else { - WT_ITEM item_stop; - item_stop.data = stop_key.data; - item_stop.size = stop_key.size; - stop->set_key(stop, item_stop); + WT_ITEM item_stop; + item_stop.data = stop_key.data; + item_stop.size = stop_key.size; + stop->set_key(stop, item_stop); rc = stop->search(stop); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); @@ -948,9 +948,9 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); enif_mutex_unlock(args->conn_handle->contexts_mutex); - return; + return; } rc = session->upgrade(session, args->uri, (const char*)config.data); @@ -1010,9 +1010,9 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); + ASYNC_NIF_REPLY(__strerror_term(env, rc)); enif_mutex_unlock(args->conn_handle->contexts_mutex); - return; + return; } rc = session->verify(session, args->uri, (const char*)config.data); @@ -1063,8 +1063,8 @@ ASYNC_NIF_DECL( WT_CURSOR *cursor = NULL; int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } WT_ITEM item_key; @@ -1118,8 +1118,8 @@ ASYNC_NIF_DECL( WT_CURSOR *cursor = NULL; int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } WT_ITEM item_key; @@ -1129,14 +1129,14 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->search(cursor); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } rc = cursor->get_value(cursor, &item_value); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } ERL_NIF_TERM value; unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); @@ -1196,8 +1196,8 @@ ASYNC_NIF_DECL( WT_CURSOR *cursor = NULL; int rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } WT_ITEM item_key; @@ -1248,8 +1248,8 @@ ASYNC_NIF_DECL( ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; + ASYNC_NIF_REPLY(enif_make_badarg(env)); + return; } /* We create a separate session here to ensure that operations are thread safe. */ @@ -1257,18 +1257,25 @@ ASYNC_NIF_DECL( WT_SESSION *session = NULL; int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { - ASYNC_NIF_REPLY(__strerror_term(env, rc)); - return; + ASYNC_NIF_REPLY(__strerror_term(env, rc)); + return; } WT_CURSOR* cursor; rc = session->open_cursor(session, args->uri, NULL, (config.data[0] != 0) ? (char *)config.data : "overwrite,raw", &cursor); if (rc != 0) { + session->close(session, NULL); ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; } WterlCursorHandle* cursor_handle = enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle)); + if (!cursor_handle) { + cursor->close(cursor); + session->close(session, NULL); + ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM)); + return; + } cursor_handle->session = session; cursor_handle->cursor = cursor; ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); @@ -1304,9 +1311,10 @@ ASYNC_NIF_DECL( WT_CURSOR *cursor = args->cursor_handle->cursor; WT_SESSION *session = args->cursor_handle->session; /* Note: session->close() will cause all open cursors in the session to be - closed first, so we don't have explicitly to do that here. */ - int rc = cursor->close(cursor); - (void)session->close(session, NULL); + closed first, so we don't have explicitly to do that here. + rc = cursor->close(cursor); + */ + int rc = session->close(session, NULL); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); }, { // post @@ -1318,13 +1326,13 @@ static ERL_NIF_TERM __cursor_key_ret(ErlNifEnv *env, WT_CURSOR *cursor, int rc) { if (rc == 0) { - WT_ITEM item_key; - rc = cursor->get_key(cursor, &item_key); - if (rc == 0) { - ERL_NIF_TERM key; - memcpy(enif_make_new_binary(env, item_key.size, &key), item_key.data, item_key.size); - return enif_make_tuple2(env, ATOM_OK, key); - } + WT_ITEM item_key; + rc = cursor->get_key(cursor, &item_key); + if (rc == 0) { + ERL_NIF_TERM key; + memcpy(enif_make_new_binary(env, item_key.size, &key), item_key.data, item_key.size); + return enif_make_tuple2(env, ATOM_OK, key); + } } return __strerror_term(env, rc); } @@ -1878,9 +1886,9 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; wterl_conn_RESOURCE = enif_open_resource_type(env, NULL, "wterl_conn_resource", - NULL, flags, NULL); + NULL, flags, NULL); wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", - NULL, flags, NULL); + NULL, flags, NULL); ATOM_ERROR = enif_make_atom(env, "error"); ATOM_OK = enif_make_atom(env, "ok"); diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 42f4dd0..378da4f 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -310,7 +310,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Tabl %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. drop(#state{connection=Connection, table=Table}=State) -> - case wterl:truncate(Connection, Table) of + case wterl:drop(Connection, Table) of ok -> {ok, State}; Error -> @@ -355,7 +355,7 @@ max_sessions(Config) -> undefined -> 1024; Size -> Size end, - Est = 1000 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic + Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic case Est > 1000000000 of % Note: WiredTiger uses a signed int for this true -> 1000000000; false -> Est diff --git a/src/wterl.erl b/src/wterl.erl index f6da109..b14cb8c 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -147,7 +147,7 @@ create_nif(_AsyncNif, _Ref, _Name, _Config) -> -spec drop(connection(), string()) -> ok | {error, term()}. -spec drop(connection(), string(), config_list()) -> ok | {error, term()}. drop(Ref, Name) -> - drop(Ref, Name, []). + drop(Ref, Name, [{force, true}]). drop(Ref, Name, Config) -> ?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).