diff --git a/c_src/wterl.c b/c_src/wterl.c index 760d0e2..22ca910 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -81,23 +81,48 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION * *session = ctx->session; if (*session == NULL) { /* Create a context for this worker thread to reuse. */ + enif_mutex_lock(conn_handle->context_mutex); WT_CONNECTION *conn = conn_handle->conn; int rc = conn->open_session(conn, NULL, conn_handle->session_config, session); if (rc != 0) return rc; ctx->session = *session; ctx->cursors = kh_init(cursors); + enif_mutex_unlock(conn_handle->context_mutex); } return 0; } +/** + * Close cursors open on 'uri' object. + * + * Note: always call within enif_mutex_lock/unlock(conn_handle->context_mutex) + */ +void +__close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // TODO: race? +{ + int i; + for (i = 0; i < conn_handle->num_contexts; i++) { + WterlCtx *ctx = &conn_handle->contexts[i]; + khash_t(cursors) *h = ctx->cursors; + khiter_t itr = kh_get(cursors, h, uri); + if (itr != kh_end(h)) { + WT_CURSOR *cursor = (WT_CURSOR*)kh_value(h, itr); + kh_del(cursors, h, itr); + cursor->close(cursor); + //fprintf(stderr, "closing worker_id: %d 0x%p %s\n", i, cursor, uri); fflush(stderr); + } + } +} + /** * Get a reusable cursor that was opened for a particular worker within its * session. */ static int -__cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor) +__retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR **cursor) { + /* Check to see if we have a cursor open for this uri and if so reuse it. */ WterlCtx *ctx = &conn_handle->contexts[worker_id]; khash_t(cursors) *h = ctx->cursors; khiter_t itr = kh_get(cursors, h, uri); @@ -106,6 +131,7 @@ __cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *u *cursor = (WT_CURSOR*)kh_value(h, itr); } else { // key does not exist in hash table, create and insert one + enif_mutex_lock(conn_handle->context_mutex); WT_SESSION *session = conn_handle->contexts[worker_id].session; int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); if (rc != 0) @@ -113,10 +139,17 @@ __cursor_for(WterlConnHandle *conn_handle, unsigned int worker_id, const char *u int itr_status; itr = kh_put(cursors, h, uri, &itr_status); kh_value(h, itr) = *cursor; + enif_mutex_unlock(conn_handle->context_mutex); } return 0; } +static void +__release_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char *uri, WT_CURSOR *cursor) +{ + cursor->reset(cursor); +} + /** * Convenience function to generate {error, Reason} or 'not_found' * Erlang terms to return to callers. @@ -315,6 +348,10 @@ ASYNC_NIF_DECL( }, { // work + /* This call requires that there be no open cursors referencing the object. */ + enif_mutex_lock(args->conn_handle->context_mutex); + __close_cursors_on(args->conn_handle, args->uri); + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -342,6 +379,7 @@ ASYNC_NIF_DECL( rc = session->drop(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); }, { // post @@ -379,6 +417,10 @@ ASYNC_NIF_DECL( }, { // work + /* This call requires that there be no open cursors referencing the object. */ + enif_mutex_lock(args->conn_handle->context_mutex); + __close_cursors_on(args->conn_handle, args->oldname); + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -402,6 +444,7 @@ ASYNC_NIF_DECL( rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); }, { // post @@ -439,6 +482,10 @@ ASYNC_NIF_DECL( }, { // work + /* This call requires that there be no open cursors referencing the object. */ + enif_mutex_lock(args->conn_handle->context_mutex); + __close_cursors_on(args->conn_handle, args->uri); + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -459,6 +506,7 @@ ASYNC_NIF_DECL( rc = session->salvage(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); }, { // post @@ -549,6 +597,10 @@ ASYNC_NIF_DECL( }, { // work + /* This call requires that there be no open cursors referencing the object. */ + enif_mutex_lock(args->conn_handle->context_mutex); + __close_cursors_on(args->conn_handle, args->uri); + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -607,6 +659,7 @@ ASYNC_NIF_DECL( rc = session->truncate(session, args->uri, start, stop, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); + enif_mutex_unlock(args->conn_handle->context_mutex); }, { // post @@ -641,6 +694,10 @@ ASYNC_NIF_DECL( }, { // work + /* This call requires that there be no open cursors referencing the object. */ + enif_mutex_lock(args->conn_handle->context_mutex); + __close_cursors_on(args->conn_handle, args->uri); + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -661,6 +718,7 @@ ASYNC_NIF_DECL( rc = session->upgrade(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); }, { // post @@ -696,6 +754,10 @@ ASYNC_NIF_DECL( }, { // work + /* This call requires that there be no open cursors referencing the object. */ + enif_mutex_lock(args->conn_handle->context_mutex); + __close_cursors_on(args->conn_handle, args->uri); + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -716,6 +778,7 @@ ASYNC_NIF_DECL( rc = session->verify(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); (void)session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); }, { // post @@ -764,7 +827,7 @@ ASYNC_NIF_DECL( } WT_CURSOR *cursor = NULL; - rc = __cursor_for(args->conn_handle, worker_id, args->uri, &cursor); + rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -776,7 +839,7 @@ ASYNC_NIF_DECL( cursor->set_key(cursor, &item_key); rc = cursor->remove(cursor); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - cursor->reset(cursor); + __release_cursor(args->conn_handle, worker_id, args->uri, cursor); }, { // post @@ -825,7 +888,7 @@ ASYNC_NIF_DECL( } WT_CURSOR *cursor = NULL; - rc = __cursor_for(args->conn_handle, worker_id, args->uri, &cursor); + rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -851,7 +914,7 @@ ASYNC_NIF_DECL( unsigned char *bin = enif_make_new_binary(env, item_value.size, &value); memcpy(bin, item_value.data, item_value.size); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value)); - cursor->reset(cursor); + __release_cursor(args->conn_handle, worker_id, args->uri, cursor); }, { // post @@ -909,7 +972,7 @@ ASYNC_NIF_DECL( } WT_CURSOR *cursor = NULL; - rc = __cursor_for(args->conn_handle, worker_id, args->uri, &cursor); + rc = __retain_cursor(args->conn_handle, worker_id, args->uri, &cursor); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); return; @@ -925,7 +988,7 @@ ASYNC_NIF_DECL( cursor->set_value(cursor, &item_value); rc = cursor->insert(cursor); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); - cursor->reset(cursor); + __release_cursor(args->conn_handle, worker_id, args->uri, cursor); }, { // post @@ -969,6 +1032,7 @@ ASYNC_NIF_DECL( /* We create a separate session here to ensure that operations are thread safe. */ WT_CONNECTION *conn = args->conn_handle->conn; WT_SESSION *session = NULL; + //fprintf(stderr, "cursor open: %s\n", (char *)args->conn_handle->session_config); fflush(stderr); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); @@ -1541,10 +1605,10 @@ ASYNC_NIF_DECL( static void __resource_conn_dtor(ErlNifEnv *env, void *obj) { + int i; WterlConnHandle *conn_handle = (WterlConnHandle *)obj; /* Free up the shared sessions and cursors. */ enif_mutex_lock(conn_handle->context_mutex); - int i; for (i = 0; i < conn_handle->num_contexts; i++) { WterlCtx *ctx = &conn_handle->contexts[i]; WT_CURSOR *cursor; @@ -1621,6 +1685,9 @@ static ErlNifFunc nif_funcs[] = {"put_nif", 5, wterl_put}, {"rename_nif", 5, wterl_rename}, {"salvage_nif", 4, wterl_salvage}, + // TODO: {"txn_begin", 3, wterl_txn_begin}, + // TODO: {"txn_commit", 3, wterl_txn_commit}, + // TODO: {"txn_abort", 3, wterl_txn_abort}, {"truncate_nif", 6, wterl_truncate}, {"upgrade_nif", 4, wterl_upgrade}, {"verify_nif", 4, wterl_verify}, diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 0a8d3ef..29ce3c6 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -362,9 +362,10 @@ establish_connection(Config) -> {lsm_bloom_oldest, true} , {lsm_bloom_bit_count, 128}, {lsm_bloom_hash_count, 64}, - {lsm_bloom_config, [{leaf_page_max, "8MB"}]}], + {lsm_bloom_config, [{leaf_page_max, "8MB"}]} + ], - case wterl_conn:open(DataRoot, SessionOpts, ConnectionOpts) of + case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of {ok, Connection} -> {ok, Connection}; {error, Reason2} -> diff --git a/src/wterl.erl b/src/wterl.erl index affbcd5..73be7ec 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -20,7 +20,9 @@ %% %% ------------------------------------------------------------------- -module(wterl). + -export([connection_open/2, + connection_open/3, connection_close/1, cursor_close/1, cursor_insert/3, @@ -88,11 +90,9 @@ nif_stub_error(Line) -> erlang:nif_error({nif_not_loaded,module,?MODULE,line,Line}). --define(EMPTY_CONFIG, <<"\0">>). - -spec init() -> ok | {error, any()}. init() -> - erlang:load_nif(filename:join(priv_dir(), atom_to_list(?MODULE)), + erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]), [{wterl, "163a5073cb85db2a270ebe904e788bd8d478ea1c"}, {wiredtiger, "e9a607b1b78ffa528631519b5cb6ac944468991e"}]). @@ -110,7 +110,7 @@ connection_open(HomeDir, ConnectionConfig, SessionConfig) -> nomatch -> false end end, PrivFiles), - SoPaths = lists:map(fun(Elem) -> filename:join(PrivDir, Elem) end, SoFiles), + SoPaths = lists:map(fun(Elem) -> filename:join([PrivDir, Elem]) end, SoFiles), conn_open(HomeDir, [{extensions, SoPaths}] ++ ConnectionConfig, SessionConfig). -spec conn_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}. @@ -389,50 +389,14 @@ priv_dir() -> {error, bad_name} -> EbinDir = filename:dirname(code:which(?MODULE)), AppPath = filename:dirname(EbinDir), - filename:join(AppPath, "priv"); + filename:join([AppPath, "priv"]); Path -> Path end. %% -%% Configuration type information. +%% Configuration information. %% -config_types() -> - [{block_compressor, {string, quoted}}, - {cache_size, string}, - {checkpoint, config}, - {create, bool}, - {direct_io, list}, - {drop, list}, - {error_prefix, string}, - {eviction_target, integer}, - {eviction_trigger, integer}, - {extensions, {list, quoted}}, - {force, bool}, - {hazard_max, integer}, - {home_environment, bool}, - {home_environment_priv, bool}, - {internal_page_max, string}, - {isolation, string}, - {key_type, string}, - {leaf_page_max, string}, - {logging, bool}, - {lsm_bloom_bit_count, integer}, - {lsm_bloom_config, config}, - {lsm_bloom_hash_count, integer}, - {lsm_bloom_newest, bool}, - {lsm_bloom_oldest, bool}, - {lsm_chunk_size, string}, - {lsm_merge_threads, integer}, - {multiprocess, bool}, - {name, string}, - {session_max, integer}, - {statistics_log, config}, - {sync, bool}, - {target, list}, - {transactional, bool}, - {verbose, list}, - {wait, integer}]. config_value(Key, Config, Default) -> {Key, app_helper:get_prop_or_env(Key, Config, wterl, Default)}. @@ -478,7 +442,44 @@ config_to_bin(Opts) -> config_to_bin([], Acc) -> iolist_to_binary(Acc); config_to_bin([{Key, Value} | Rest], Acc) -> - case lists:keysearch(Key, 1, config_types()) of + ConfigTypes = + [{block_compressor, {string, quoted}}, + {cache_size, string}, + {checkpoint, config}, + {create, bool}, + {direct_io, list}, + {drop, list}, + {error_prefix, string}, + {eviction_target, integer}, + {eviction_trigger, integer}, + {extensions, {list, quoted}}, + {force, bool}, + {hazard_max, integer}, + {home_environment, bool}, + {home_environment_priv, bool}, + {internal_page_max, string}, + {isolation, string}, + {key_type, string}, + {leaf_page_max, string}, + {logging, bool}, + {lsm_bloom_bit_count, integer}, + {lsm_bloom_config, config}, + {lsm_bloom_hash_count, integer}, + {lsm_bloom_newest, bool}, + {lsm_bloom_oldest, bool}, + {lsm_chunk_size, string}, + {lsm_merge_threads, integer}, + {multiprocess, bool}, + {name, string}, + {session_max, integer}, + {statistics_log, config}, + {sync, bool}, + {target, list}, + {transactional, bool}, + {verbose, list}, + {wait, integer}], + + case lists:keysearch(Key, 1, ConfigTypes) of {value, {Key, Type}} -> Acc2 = case config_encode(Type, Value) of invalid -> @@ -501,15 +502,35 @@ config_to_bin([{Key, Value} | Rest], Acc) -> %% =================================================================== -ifdef(TEST). +-include_lib("kernel/include/file.hrl"). + +remove_dir_tree(Dir) -> + remove_all_files(".", [Dir]). + +remove_all_files(Dir, Files) -> + lists:foreach(fun(File) -> + FilePath = filename:join([Dir, File]), + {ok, FileInfo} = file:read_file_info(FilePath), + case FileInfo#file_info.type of + directory -> + {ok, DirFiles} = file:list_dir(FilePath), + remove_all_files(FilePath, DirFiles), + file:del_dir(FilePath); + _ -> + file:delete(FilePath) + end + end, Files). + -define(TEST_DATA_DIR, "test/wterl.basic"). open_test_conn(DataDir) -> + open_test_conn(DataDir, [{create,true},{cache_size,"100MB"}]). +open_test_conn(DataDir, OpenConfig) -> {ok, CWD} = file:get_cwd(), ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), - ?cmd("rm -rf "++DataDir), - ?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))), - OpenConfig = [{create,true},{cache_size,"100MB"}], - {ok, ConnRef} = connection_open(DataDir, OpenConfig), + ?cmd("rm -rf " ++ filename:join([CWD, DataDir])), + ?assertMatch(ok, filelib:ensure_dir(filename:join([DataDir, "x"]))), + {ok, ConnRef} = connection_open(filename:join([CWD, DataDir]), OpenConfig), ConnRef. open_test_table(ConnRef) -> @@ -563,7 +584,6 @@ insert_delete_test() -> ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), ?assertMatch(ok, delete(ConnRef, "table:test", <<"a">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")), ok = connection_close(ConnRef). init_test_table() -> @@ -587,55 +607,48 @@ various_test_() -> fun stop_test_table/1, fun(ConnRef) -> {inorder, - [{"verify", - fun() -> - ?assertMatch(ok, verify(ConnRef, "table:test")), - ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) - end}, + [ {"checkpoint", fun() -> ?assertMatch(ok, checkpoint(ConnRef, [{target, ["\"table:test\""]}])), - ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) + end}, + {"verify", + fun() -> + ?assertMatch(ok, verify(ConnRef, "table:test")), + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) end}, {"salvage", fun() -> ok = salvage(ConnRef, "table:test"), - {ok, <<"apple">>} = get(ConnRef, "table:test", <<"a">>), - ?assertMatch(ok, drop(ConnRef, "table:test")) + {ok, <<"apple">>} = get(ConnRef, "table:test", <<"a">>) end}, {"upgrade", fun() -> ?assertMatch(ok, upgrade(ConnRef, "table:test")), - ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) end}, {"rename", fun() -> ?assertMatch(ok, rename(ConnRef, "table:test", "table:new")), ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:new", <<"a">>)), ?assertMatch(ok, rename(ConnRef, "table:new", "table:test")), - ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) end}, {"truncate", fun() -> ?assertMatch(ok, truncate(ConnRef, "table:test")), - ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) + ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) end}, {"truncate range, found", fun() -> ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), - ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) + ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) end}, {"truncate range, not_found", fun() -> ?assertMatch(ok, truncate(ConnRef, "table:test", first, <<"b">>)), - ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)), - ?assertMatch(ok, drop(ConnRef, "table:test")) + ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) end}, {"truncate range, middle", fun() -> @@ -646,9 +659,13 @@ various_test_() -> ?assertMatch(not_found, get(ConnRef, "table:test", <<"d">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"e">>)), ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), - ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)), + ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) + end}, + {"drop table", + fun() -> ?assertMatch(ok, drop(ConnRef, "table:test")) - end}]} + end} + ]} end}. cursor_open_close_test() -> @@ -676,6 +693,8 @@ various_cursor_test_() -> ?assertMatch({ok, <<"d">>}, cursor_next_key(Cursor)), ?assertMatch({ok, <<"c">>}, cursor_prev_key(Cursor)), ?assertMatch({ok, <<"d">>}, cursor_next_key(Cursor)), + ?assertMatch({ok, <<"e">>}, cursor_next_key(Cursor)), + ?assertMatch({ok, <<"f">>}, cursor_next_key(Cursor)), ?assertMatch({ok, <<"g">>}, cursor_next_key(Cursor)), ?assertMatch(not_found, cursor_next_key(Cursor)), ?assertMatch(ok, cursor_close(Cursor)) @@ -689,6 +708,8 @@ various_cursor_test_() -> ?assertMatch({ok, <<"date">>}, cursor_next_value(Cursor)), ?assertMatch({ok, <<"cherry">>}, cursor_prev_value(Cursor)), ?assertMatch({ok, <<"date">>}, cursor_next_value(Cursor)), + ?assertMatch({ok, <<"elephant">>}, cursor_next_value(Cursor)), + ?assertMatch({ok, <<"forest">>}, cursor_next_value(Cursor)), ?assertMatch({ok, <<"gooseberry">>}, cursor_next_value(Cursor)), ?assertMatch(not_found, cursor_next_value(Cursor)), ?assertMatch(ok, cursor_close(Cursor)) @@ -702,6 +723,8 @@ various_cursor_test_() -> ?assertMatch({ok, <<"d">>, <<"date">>}, cursor_next(Cursor)), ?assertMatch({ok, <<"c">>, <<"cherry">>}, cursor_prev(Cursor)), ?assertMatch({ok, <<"d">>, <<"date">>}, cursor_next(Cursor)), + ?assertMatch({ok, <<"e">>, <<"elephant">>}, cursor_next(Cursor)), + ?assertMatch({ok, <<"f">>, <<"forest">>}, cursor_next(Cursor)), ?assertMatch({ok, <<"g">>, <<"gooseberry">>}, cursor_next(Cursor)), ?assertMatch(not_found, cursor_next(Cursor)), ?assertMatch(ok, cursor_close(Cursor)) @@ -709,7 +732,7 @@ various_cursor_test_() -> {"fold keys", fun() -> {ok, Cursor} = cursor_open(ConnRef, "table:test"), - ?assertMatch([<<"g">>, <<"d">>, <<"c">>, <<"b">>, <<"a">>], + ?assertMatch([<<"g">>, <<"f">>, <<"e">>, <<"d">>, <<"c">>, <<"b">>, <<"a">>], fold_keys(Cursor, fun(Key, Acc) -> [Key | Acc] end, [])), ?assertMatch(ok, cursor_close(Cursor)) end}, @@ -722,8 +745,7 @@ various_cursor_test_() -> {"range search for an item", fun() -> {ok, Cursor} = cursor_open(ConnRef, "table:test"), - ?assertMatch({ok, <<"gooseberry">>}, - cursor_search_near(Cursor, <<"z">>)), + ?assertMatch({ok, <<"gooseberry">>}, cursor_search_near(Cursor, <<"z">>)), ?assertMatch(ok, cursor_close(Cursor)) end}, {"check cursor reset", @@ -737,30 +759,21 @@ various_cursor_test_() -> {"insert/overwrite an item using a cursor", fun() -> {ok, Cursor} = cursor_open(ConnRef, "table:test"), - ?assertMatch(ok, - cursor_insert(Cursor, <<"h">>, <<"huckleberry">>)), - ?assertMatch({ok, <<"huckleberry">>}, - cursor_search(Cursor, <<"h">>)), - ?assertMatch(ok, - cursor_insert(Cursor, <<"g">>, <<"grapefruit">>)), - ?assertMatch({ok, <<"grapefruit">>}, - cursor_search(Cursor, <<"g">>)), + ?assertMatch(ok, cursor_insert(Cursor, <<"h">>, <<"huckleberry">>)), + ?assertMatch({ok, <<"huckleberry">>}, cursor_search(Cursor, <<"h">>)), + ?assertMatch(ok, cursor_insert(Cursor, <<"g">>, <<"grapefruit">>)), + ?assertMatch({ok, <<"grapefruit">>}, cursor_search(Cursor, <<"g">>)), ?assertMatch(ok, cursor_close(Cursor)), - ?assertMatch({ok, <<"grapefruit">>}, - get(ConnRef, "table:test", <<"g">>)), - ?assertMatch({ok, <<"huckleberry">>}, - get(ConnRef, "table:test", <<"h">>)) + ?assertMatch({ok, <<"grapefruit">>}, get(ConnRef, "table:test", <<"g">>)), + ?assertMatch({ok, <<"huckleberry">>}, get(ConnRef, "table:test", <<"h">>)) end}, {"update an item using a cursor", fun() -> {ok, Cursor} = cursor_open(ConnRef, "table:test"), - ?assertMatch(ok, - cursor_update(Cursor, <<"g">>, <<"goji berries">>)), - ?assertMatch(not_found, - cursor_update(Cursor, <<"k">>, <<"kumquat">>)), + ?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)), + ?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)), ?assertMatch(ok, cursor_close(Cursor)), - ?assertMatch({ok, <<"goji berries">>}, - get(ConnRef, "table:test", <<"g">>)) + ?assertMatch({ok, <<"goji berries">>}, get(ConnRef, "table:test", <<"g">>)) end}, {"remove an item using a cursor", fun() -> @@ -811,7 +824,7 @@ prop_put_delete() -> {ok, CWD} = file:get_cwd(), ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), ?cmd("rm -rf "++DataDir), - ok = filelib:ensure_dir(filename:join(DataDir, "x")), + ok = filelib:ensure_dir(filename:join([DataDir, "x"])), {ok, Conn} = wterl:connection_open(DataDir, [{create,true}]), try wterl:create(ConnRef, Table), diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index 815efd2..b7ead8b 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -93,7 +93,7 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{ {Error, State} end, {reply, Reply, NState}; -handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) -> +handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> Monitor = erlang:monitor(process, Caller), true = ets:insert(wterl_ets, {Monitor, Caller}), {reply, {ok, ConnRef}, State};