Execute NIF calls on non-scheduler threads asynchronously #6

Merged
gburd merged 72 commits from gsb-async-nifs3 into master 2013-04-23 00:54:56 +00:00
4 changed files with 184 additions and 103 deletions
Showing only changes of commit 07061ed6e8 - Show all commits

View file

@ -81,23 +81,48 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION *
*session = ctx->session;
if (*session == NULL) {
/* Create a context for this worker thread to reuse. */
enif_mutex_lock(conn_handle->context_mutex);
WT_CONNECTION *conn = conn_handle->conn;
int rc = conn->open_session(conn, NULL, conn_handle->session_config, session);
if (rc != 0)
return rc;
ctx->session = *session;
ctx->cursors = kh_init(cursors);
enif_mutex_unlock(conn_handle->context_mutex);
}
return 0;
}
/**
* Close cursors open on 'uri' object.
*
* Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex)
*/
void
__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // TODO: race?
{
int i;
for (i = 0; i < conn_handle->num_contexts; i++) {
WterlCtx *ctx = &conn_handle->contexts[i];
khash_t(cursors) *h = ctx->cursors;
khiter_t itr = kh_get(cursors, h, uri);
if (itr != kh_end(h)) {
WT_CURSOR *cursor = (WT_CURSOR*)kh_value(h, itr);
kh_del(cursors, h, itr);
cursor->close(cursor);
//fprintf(stderr, "closing worker_id: %d 0x%p %s\n", i, cursor, uri); fflush(stderr);
}
}
}
/**
* Get a reusable cursor that was opened for a particular worker within its
* session.
*/
static int
__cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor)
__retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor)
{
/* Check to see if we have a cursor open for this uri and if so reuse it. */
WterlCtx *ctx = &conn_handle->contexts[worker_id];
khash_t(cursors) *h = ctx->cursors;
khiter_t itr = kh_get(cursors, h, uri);
@ -106,6 +131,7 @@ __cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *u
*cursor = (WT_CURSOR*)kh_value(h, itr);
} else {
// key does not exist in hash table, create and insert one
enif_mutex_lock(conn_handle->context_mutex);
WT_SESSION *session = conn_handle->contexts[worker_id].session;
int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor);
if (rc != 0)
@ -113,10 +139,17 @@ __cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *u
int itr_status;
itr = kh_put(cursors, h, uri, &itr_status);
kh_value(h, itr) = *cursor;
enif_mutex_unlock(conn_handle->context_mutex);
}
return 0;
}
static void
__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor)
{
cursor->reset(cursor);
}
/**
* Convenience function to generate {error, Reason} or 'not_found'
* Erlang terms to return to callers.
@ -315,6 +348,10 @@ ASYNC_NIF_DECL(
},
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
@ -342,6 +379,7 @@ ASYNC_NIF_DECL(
rc = session->drop(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
},
{ // post
@ -379,6 +417,10 @@ ASYNC_NIF_DECL(
},
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
__close_cursors_on(args->conn_handle, args->oldname);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
@ -402,6 +444,7 @@ ASYNC_NIF_DECL(
rc = session->rename(session, args->oldname, args->newname, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
},
{ // post
@ -439,6 +482,10 @@ ASYNC_NIF_DECL(
},
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
@ -459,6 +506,7 @@ ASYNC_NIF_DECL(
rc = session->salvage(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
},
{ // post
@ -549,6 +597,10 @@ ASYNC_NIF_DECL(
},
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
@ -607,6 +659,7 @@ ASYNC_NIF_DECL(
rc = session->truncate(session, args->uri, start, stop, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
enif_mutex_unlock(args->conn_handle->context_mutex);
},
{ // post
@ -641,6 +694,10 @@ ASYNC_NIF_DECL(
},
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
@ -661,6 +718,7 @@ ASYNC_NIF_DECL(
rc = session->upgrade(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
},
{ // post
@ -696,6 +754,10 @@ ASYNC_NIF_DECL(
},
{ // work
/* This call requires that there be no open cursors referencing the object. */
enif_mutex_lock(args->conn_handle->context_mutex);
__close_cursors_on(args->conn_handle, args->uri);
ErlNifBinary config;
if (!enif_inspect_binary(env, args->config, &config)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
@ -716,6 +778,7 @@ ASYNC_NIF_DECL(
rc = session->verify(session, args->uri, (const char*)config.data);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
(void)session->close(session, NULL);
enif_mutex_unlock(args->conn_handle->context_mutex);
},
{ // post
@ -764,7 +827,7 @@ ASYNC_NIF_DECL(
}
WT_CURSOR *cursor = NULL;
rc = __cursor_for(args->conn_handle, worker_id, args->uri, &cursor);
rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
@ -776,7 +839,7 @@ ASYNC_NIF_DECL(
cursor->set_key(cursor, &item_key);
rc = cursor->remove(cursor);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
cursor->reset(cursor);
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
},
{ // post
@ -825,7 +888,7 @@ ASYNC_NIF_DECL(
}
WT_CURSOR *cursor = NULL;
rc = __cursor_for(args->conn_handle, worker_id, args->uri, &cursor);
rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
@ -851,7 +914,7 @@ ASYNC_NIF_DECL(
unsigned char *bin = enif_make_new_binary(env, item_value.size, &value);
memcpy(bin, item_value.data, item_value.size);
ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value));
cursor->reset(cursor);
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
},
{ // post
@ -909,7 +972,7 @@ ASYNC_NIF_DECL(
}
WT_CURSOR *cursor = NULL;
rc = __cursor_for(args->conn_handle, worker_id, args->uri, &cursor);
rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
return;
@ -925,7 +988,7 @@ ASYNC_NIF_DECL(
cursor->set_value(cursor, &item_value);
rc = cursor->insert(cursor);
ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc));
cursor->reset(cursor);
__release_cursor(args->conn_handle, worker_id, args->uri, cursor);
},
{ // post
@ -969,6 +1032,7 @@ ASYNC_NIF_DECL(
/* We create a separate session here to ensure that operations are thread safe. */
WT_CONNECTION *conn = args->conn_handle->conn;
WT_SESSION *session = NULL;
//fprintf(stderr, "cursor open: %s\n", (char *)args->conn_handle->session_config); fflush(stderr);
int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session);
if (rc != 0) {
ASYNC_NIF_REPLY(__strerror_term(env, rc));
@ -1541,10 +1605,10 @@ ASYNC_NIF_DECL(
static void
__resource_conn_dtor(ErlNifEnv *env, void *obj)
{
int i;
WterlConnHandle *conn_handle = (WterlConnHandle *)obj;
/* Free up the shared sessions and cursors. */
enif_mutex_lock(conn_handle->context_mutex);
int i;
for (i = 0; i < conn_handle->num_contexts; i++) {
WterlCtx *ctx = &conn_handle->contexts[i];
WT_CURSOR *cursor;
@ -1621,6 +1685,9 @@ static ErlNifFunc nif_funcs[] =
{"put_nif", 5, wterl_put},
{"rename_nif", 5, wterl_rename},
{"salvage_nif", 4, wterl_salvage},
// TODO: {"txn_begin", 3, wterl_txn_begin},
// TODO: {"txn_commit", 3, wterl_txn_commit},
// TODO: {"txn_abort", 3, wterl_txn_abort},
{"truncate_nif", 6, wterl_truncate},
{"upgrade_nif", 4, wterl_upgrade},
{"verify_nif", 4, wterl_verify},

View file

@ -362,9 +362,10 @@ establish_connection(Config) ->
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}],
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
],
case wterl_conn:open(DataRoot, SessionOpts, ConnectionOpts) of
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
{ok, Connection} ->
{ok, Connection};
{error, Reason2} ->

View file

@ -20,7 +20,9 @@
%%
%% -------------------------------------------------------------------
-module(wterl).
-export([connection_open/2,
connection_open/3,
connection_close/1,
cursor_close/1,
cursor_insert/3,
@ -88,11 +90,9 @@
nif_stub_error(Line) ->
erlang:nif_error({nif_not_loaded,module,?MODULE,line,Line}).
-define(EMPTY_CONFIG, <<"\0">>).
-spec init() -> ok | {error, any()}.
init() ->
erlang:load_nif(filename:join(priv_dir(), atom_to_list(?MODULE)),
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl, "163a5073cb85db2a270ebe904e788bd8d478ea1c"},
{wiredtiger, "e9a607b1b78ffa528631519b5cb6ac944468991e"}]).
@ -110,7 +110,7 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) ->
nomatch -> false
end
end, PrivFiles),
SoPaths = lists:map(fun(Elem) -> filename:join(PrivDir, Elem) end, SoFiles),
SoPaths = lists:map(fun(Elem) -> filename:join([PrivDir, Elem]) end, SoFiles),
conn_open(HomeDir, [{extensions, SoPaths}] ++ ConnectionConfig, SessionConfig).
-spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -389,50 +389,14 @@ priv_dir() ->
{error, bad_name} ->
EbinDir = filename:dirname(code:which(?MODULE)),
AppPath = filename:dirname(EbinDir),
filename:join(AppPath, "priv");
filename:join([AppPath, "priv"]);
Path ->
Path
end.
%%
%% Configuration type information.
%% Configuration information.
%%
config_types() ->
[{block_compressor, {string, quoted}},
{cache_size, string},
{checkpoint, config},
{create, bool},
{direct_io, list},
{drop, list},
{error_prefix, string},
{eviction_target, integer},
{eviction_trigger, integer},
{extensions, {list, quoted}},
{force, bool},
{hazard_max, integer},
{home_environment, bool},
{home_environment_priv, bool},
{internal_page_max, string},
{isolation, string},
{key_type, string},
{leaf_page_max, string},
{logging, bool},
{lsm_bloom_bit_count, integer},
{lsm_bloom_config, config},
{lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{lsm_merge_threads, integer},
{multiprocess, bool},
{name, string},
{session_max, integer},
{statistics_log, config},
{sync, bool},
{target, list},
{transactional, bool},
{verbose, list},
{wait, integer}].
config_value(Key, Config, Default) ->
{Key, app_helper:get_prop_or_env(Key, Config, wterl, Default)}.
@ -478,7 +442,44 @@ config_to_bin(Opts) ->
config_to_bin([], Acc) ->
iolist_to_binary(Acc);
config_to_bin([{Key, Value} | Rest], Acc) ->
case lists:keysearch(Key, 1, config_types()) of
ConfigTypes =
[{block_compressor, {string, quoted}},
{cache_size, string},
{checkpoint, config},
{create, bool},
{direct_io, list},
{drop, list},
{error_prefix, string},
{eviction_target, integer},
{eviction_trigger, integer},
{extensions, {list, quoted}},
{force, bool},
{hazard_max, integer},
{home_environment, bool},
{home_environment_priv, bool},
{internal_page_max, string},
{isolation, string},
{key_type, string},
{leaf_page_max, string},
{logging, bool},
{lsm_bloom_bit_count, integer},
{lsm_bloom_config, config},
{lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{lsm_merge_threads, integer},
{multiprocess, bool},
{name, string},
{session_max, integer},
{statistics_log, config},
{sync, bool},
{target, list},
{transactional, bool},
{verbose, list},
{wait, integer}],
case lists:keysearch(Key, 1, ConfigTypes) of
{value, {Key, Type}} ->
Acc2 = case config_encode(Type, Value) of
invalid ->
@ -501,15 +502,35 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
%% ===================================================================
-ifdef(TEST).
-include_lib("kernel/include/file.hrl").
remove_dir_tree(Dir) ->
remove_all_files(".", [Dir]).
remove_all_files(Dir, Files) ->
lists:foreach(fun(File) ->
FilePath = filename:join([Dir, File]),
{ok, FileInfo} = file:read_file_info(FilePath),
case FileInfo#file_info.type of
directory ->
{ok, DirFiles} = file:list_dir(FilePath),
remove_all_files(FilePath, DirFiles),
file:del_dir(FilePath);
_ ->
file:delete(FilePath)
end
end, Files).
-define(TEST_DATA_DIR, "test/wterl.basic").
open_test_conn(DataDir) ->
open_test_conn(DataDir, [{create,true},{cache_size,"100MB"}]).
open_test_conn(DataDir, OpenConfig) ->
{ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir),
?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))),
OpenConfig = [{create,true},{cache_size,"100MB"}],
{ok, ConnRef} = connection_open(DataDir, OpenConfig),
?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
?assertMatch(ok, filelib:ensure_dir(filename:join([DataDir, "x"]))),
{ok, ConnRef} = connection_open(filename:join([CWD, DataDir]), OpenConfig),
ConnRef.
open_test_table(ConnRef) ->
@ -563,7 +584,6 @@ insert_delete_test() ->
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, delete(ConnRef, "table:test", <<"a">>)),
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test")),
ok = connection_close(ConnRef).
init_test_table() ->
@ -587,55 +607,48 @@ various_test_() ->
fun stop_test_table/1,
fun(ConnRef) ->
{inorder,
[{"verify",
fun() ->
?assertMatch(ok, verify(ConnRef, "table:test")),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
end},
[
{"checkpoint",
fun() ->
?assertMatch(ok, checkpoint(ConnRef, [{target, ["\"table:test\""]}])),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>))
end},
{"verify",
fun() ->
?assertMatch(ok, verify(ConnRef, "table:test")),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>))
end},
{"salvage",
fun() ->
ok = salvage(ConnRef, "table:test"),
{ok, <<"apple">>} = get(ConnRef, "table:test", <<"a">>),
?assertMatch(ok, drop(ConnRef, "table:test"))
{ok, <<"apple">>} = get(ConnRef, "table:test", <<"a">>)
end},
{"upgrade",
fun() ->
?assertMatch(ok, upgrade(ConnRef, "table:test")),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>))
end},
{"rename",
fun() ->
?assertMatch(ok, rename(ConnRef, "table:test", "table:new")),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:new", <<"a">>)),
?assertMatch(ok, rename(ConnRef, "table:new", "table:test")),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>))
end},
{"truncate",
fun() ->
?assertMatch(ok, truncate(ConnRef, "table:test")),
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
end},
{"truncate range, found",
fun() ->
?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)),
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>))
end},
{"truncate range, not_found",
fun() ->
?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)),
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)),
?assertMatch(ok, drop(ConnRef, "table:test"))
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
end},
{"truncate range, middle",
fun() ->
@ -646,9 +659,13 @@ various_test_() ->
?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)),
?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)),
?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)),
?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)),
?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>))
end},
{"drop table",
fun() ->
?assertMatch(ok, drop(ConnRef, "table:test"))
end}]}
end}
]}
end}.
cursor_open_close_test() ->
@ -676,6 +693,8 @@ various_cursor_test_() ->
?assertMatch({ok, <<"d">>}, cursor_next_key(Cursor)),
?assertMatch({ok, <<"c">>}, cursor_prev_key(Cursor)),
?assertMatch({ok, <<"d">>}, cursor_next_key(Cursor)),
?assertMatch({ok, <<"e">>}, cursor_next_key(Cursor)),
?assertMatch({ok, <<"f">>}, cursor_next_key(Cursor)),
?assertMatch({ok, <<"g">>}, cursor_next_key(Cursor)),
?assertMatch(not_found, cursor_next_key(Cursor)),
?assertMatch(ok, cursor_close(Cursor))
@ -689,6 +708,8 @@ various_cursor_test_() ->
?assertMatch({ok, <<"date">>}, cursor_next_value(Cursor)),
?assertMatch({ok, <<"cherry">>}, cursor_prev_value(Cursor)),
?assertMatch({ok, <<"date">>}, cursor_next_value(Cursor)),
?assertMatch({ok, <<"elephant">>}, cursor_next_value(Cursor)),
?assertMatch({ok, <<"forest">>}, cursor_next_value(Cursor)),
?assertMatch({ok, <<"gooseberry">>}, cursor_next_value(Cursor)),
?assertMatch(not_found, cursor_next_value(Cursor)),
?assertMatch(ok, cursor_close(Cursor))
@ -702,6 +723,8 @@ various_cursor_test_() ->
?assertMatch({ok, <<"d">>, <<"date">>}, cursor_next(Cursor)),
?assertMatch({ok, <<"c">>, <<"cherry">>}, cursor_prev(Cursor)),
?assertMatch({ok, <<"d">>, <<"date">>}, cursor_next(Cursor)),
?assertMatch({ok, <<"e">>, <<"elephant">>}, cursor_next(Cursor)),
?assertMatch({ok, <<"f">>, <<"forest">>}, cursor_next(Cursor)),
?assertMatch({ok, <<"g">>, <<"gooseberry">>}, cursor_next(Cursor)),
?assertMatch(not_found, cursor_next(Cursor)),
?assertMatch(ok, cursor_close(Cursor))
@ -709,7 +732,7 @@ various_cursor_test_() ->
{"fold keys",
fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
?assertMatch([<<"g">>, <<"d">>, <<"c">>, <<"b">>, <<"a">>],
?assertMatch([<<"g">>, <<"f">>, <<"e">>, <<"d">>, <<"c">>, <<"b">>, <<"a">>],
fold_keys(Cursor, fun(Key, Acc) -> [Key | Acc] end, [])),
?assertMatch(ok, cursor_close(Cursor))
end},
@ -722,8 +745,7 @@ various_cursor_test_() ->
{"range search for an item",
fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
?assertMatch({ok, <<"gooseberry">>},
cursor_search_near(Cursor, <<"z">>)),
?assertMatch({ok, <<"gooseberry">>}, cursor_search_near(Cursor, <<"z">>)),
?assertMatch(ok, cursor_close(Cursor))
end},
{"check cursor reset",
@ -737,30 +759,21 @@ various_cursor_test_() ->
{"insert/overwrite an item using a cursor",
fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
?assertMatch(ok,
cursor_insert(Cursor, <<"h">>, <<"huckleberry">>)),
?assertMatch({ok, <<"huckleberry">>},
cursor_search(Cursor, <<"h">>)),
?assertMatch(ok,
cursor_insert(Cursor, <<"g">>, <<"grapefruit">>)),
?assertMatch({ok, <<"grapefruit">>},
cursor_search(Cursor, <<"g">>)),
?assertMatch(ok, cursor_insert(Cursor, <<"h">>, <<"huckleberry">>)),
?assertMatch({ok, <<"huckleberry">>}, cursor_search(Cursor, <<"h">>)),
?assertMatch(ok, cursor_insert(Cursor, <<"g">>, <<"grapefruit">>)),
?assertMatch({ok, <<"grapefruit">>}, cursor_search(Cursor, <<"g">>)),
?assertMatch(ok, cursor_close(Cursor)),
?assertMatch({ok, <<"grapefruit">>},
get(ConnRef, "table:test", <<"g">>)),
?assertMatch({ok, <<"huckleberry">>},
get(ConnRef, "table:test", <<"h">>))
?assertMatch({ok, <<"grapefruit">>}, get(ConnRef, "table:test", <<"g">>)),
?assertMatch({ok, <<"huckleberry">>}, get(ConnRef, "table:test", <<"h">>))
end},
{"update an item using a cursor",
fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
?assertMatch(ok,
cursor_update(Cursor, <<"g">>, <<"goji berries">>)),
?assertMatch(not_found,
cursor_update(Cursor, <<"k">>, <<"kumquat">>)),
?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)),
?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)),
?assertMatch(ok, cursor_close(Cursor)),
?assertMatch({ok, <<"goji berries">>},
get(ConnRef, "table:test", <<"g">>))
?assertMatch({ok, <<"goji berries">>}, get(ConnRef, "table:test", <<"g">>))
end},
{"remove an item using a cursor",
fun() ->
@ -811,7 +824,7 @@ prop_put_delete() ->
{ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
?cmd("rm -rf "++DataDir),
ok = filelib:ensure_dir(filename:join(DataDir, "x")),
ok = filelib:ensure_dir(filename:join([DataDir, "x"])),
{ok, Conn} = wterl:connection_open(DataDir, [{create,true}]),
try
wterl:create(ConnRef, Table),

View file

@ -93,7 +93,7 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{
{Error, State}
end,
{reply, Reply, NState};
handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) ->
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
{reply, {ok, ConnRef}, State};