diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 6ecdc50..40c4886 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -20,9 +20,9 @@ case "$1" in if [ -d wiredtiger/.git ]; then (cd wiredtiger && \ git fetch && \ - git merge origin/$WT_BRANCH) + git merge origin/${WT_BRANCH}) else - git clone -b $WT_BRANCH --single-branch $WT_REMOTE_REPO && \ + git clone -b ${WT_BRANCH} --single-branch ${WT_REMOTE_REPO} && \ (cd wiredtiger && \ patch -p1 < ../wiredtiger-extension-link.patch && \ ./autogen.sh) @@ -31,10 +31,10 @@ case "$1" in ../configure --with-pic \ --enable-snappy \ --enable-bzip2 \ - --prefix=$BASEDIR/system && \ + --prefix=${BASEDIR}/system && \ make -j && make install) - [ -d $BASEDIR/../priv ] || mkdir $BASEDIR/../priv - cp $BASEDIR/system/bin/wt $BASEDIR/../priv - cp $BASEDIR/system/lib/*.so $BASEDIR/../priv + [ -d ${BASEDIR}/../priv ] || mkdir ${BASEDIR}/../priv + cp ${BASEDIR}/system/bin/wt ${BASEDIR}/../priv + cp ${BASEDIR}/system/lib/*.so ${BASEDIR}/../priv ;; esac diff --git a/c_src/wterl.c b/c_src/wterl.c index dad02ac..5a45f6e 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -21,15 +21,32 @@ #include #include +#include #include "wiredtiger.h" #include "async_nif.h" #include "khash.h" +#ifdef DEBUG +#include +#include +void debugf(const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + fprintf(stderr, "\r\n"); + fflush(stderr); + va_end(ap); +} +#else +# define debugf(X, ...) {} +#endif + static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; -KHASH_MAP_INIT_STR(cursors, WT_CURSOR*); +KHASH_INIT(cursors, char*, WT_CURSOR*, 1, kh_str_hash_func, kh_str_hash_equal); /** * We will have exactly one (1) WterlCtx for each async worker thread. As @@ -70,6 +87,8 @@ typedef char Uri[128]; static ERL_NIF_TERM ATOM_ERROR; static ERL_NIF_TERM ATOM_OK; static ERL_NIF_TERM ATOM_NOT_FOUND; +static ERL_NIF_TERM ATOM_FIRST; +static ERL_NIF_TERM ATOM_LAST; /** * Get the per-worker reusable WT_SESSION for a worker_id. @@ -84,8 +103,10 @@ __session_for(WterlConnHandle *conn_handle, unsigned int worker_id, WT_SESSION * enif_mutex_lock(conn_handle->context_mutex); WT_CONNECTION *conn = conn_handle->conn; int rc = conn->open_session(conn, NULL, conn_handle->session_config, session); - if (rc != 0) + if (rc != 0) { + enif_mutex_unlock(conn_handle->context_mutex); return rc; + } ctx->session = *session; ctx->cursors = kh_init(cursors); enif_mutex_unlock(conn_handle->context_mutex); @@ -99,8 +120,18 @@ __close_all_sessions(WterlConnHandle *conn_handle) int i; for (i = 0; i < conn_handle->num_contexts; i++) { WterlCtx *ctx = &conn_handle->contexts[i]; - kh_destroy(cursors, ctx->cursors); - ctx->session->close(ctx->session, NULL); + WT_SESSION *session = ctx->session; + khash_t(cursors) *h = ctx->cursors; + khiter_t itr; + for (itr = kh_begin(h); itr != kh_end(h); ++itr) { + if (kh_exist(h, itr)) { + WT_CURSOR *cursor = kh_val(h, itr); + enif_free(kh_key(h, itr)); + cursor->close(cursor); + } + } + kh_destroy(cursors, h); + session->close(session, NULL); ctx->session = NULL; ctx->cursors = NULL; } @@ -119,12 +150,14 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri) // TODO: race? for (i = 0; i < conn_handle->num_contexts; i++) { WterlCtx *ctx = &conn_handle->contexts[i]; khash_t(cursors) *h = ctx->cursors; - khiter_t itr = kh_get(cursors, h, uri); + khiter_t itr = kh_get(cursors, h, (char *)uri); if (itr != kh_end(h)) { - WT_CURSOR *cursor = (WT_CURSOR*)kh_value(h, itr); + WT_CURSOR *cursor = kh_value(h, itr); + char *key = kh_key(h, itr); kh_del(cursors, h, itr); + enif_free(key); cursor->close(cursor); - //fprintf(stderr, "closing worker_id: %d 0x%p %s\n", i, cursor, uri); fflush(stderr); + debugf("closing worker_id: %d 0x%p %s", i, cursor, uri); } } } @@ -139,7 +172,7 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char /* Check to see if we have a cursor open for this uri and if so reuse it. */ WterlCtx *ctx = &conn_handle->contexts[worker_id]; khash_t(cursors) *h = ctx->cursors; - khiter_t itr = kh_get(cursors, h, uri); + khiter_t itr = kh_get(cursors, h, (char *)uri); if (itr != kh_end(h)) { // key exists in hash table, retrieve it *cursor = (WT_CURSOR*)kh_value(h, itr); @@ -148,10 +181,21 @@ __retain_cursor(WterlConnHandle *conn_handle, unsigned int worker_id, const char enif_mutex_lock(conn_handle->context_mutex); WT_SESSION *session = conn_handle->contexts[worker_id].session; int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", cursor); - if (rc != 0) + if (rc != 0) { + enif_mutex_unlock(conn_handle->context_mutex); return rc; + } + + char *key = enif_alloc(sizeof(Uri)); + if (!key) { + session->close(session, NULL); + enif_mutex_unlock(conn_handle->context_mutex); + return ENOMEM; + } + memcpy(key, uri, 128); + int itr_status; - itr = kh_put(cursors, h, uri, &itr_status); + itr = kh_put(cursors, h, key, &itr_status); kh_value(h, itr) = *cursor; enif_mutex_unlock(conn_handle->context_mutex); } @@ -227,15 +271,24 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } - //fprintf(stderr, "c: %s\ns: %s\n", (char *)config.data, (char *)session_config.data); fflush(stderr); + //debugf("c: %d // %s\ns: %d // %s", config.size, (char *)config.data, (char *)session_config.data, session_config.size); int rc = wiredtiger_open(args->homedir, NULL, config.data[0] != 0 ? (const char*)config.data : NULL, &conn); if (rc == 0) { WterlConnHandle *conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); - conn_handle->conn = conn; - if (session_config.data[0] != 0) - conn_handle->session_config = (const char *)strndup((const char *)session_config.data, session_config.size); // TODO: test for NULL, handle OOM - else + if (session_config.size > 1) { + char *sc = enif_alloc(session_config.size); + if (!sc) { + enif_release_resource(conn_handle); + ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM)); + return; + } + memcpy(sc, session_config.data, session_config.size); + + conn_handle->session_config = (const char *)sc; + } else { conn_handle->session_config = NULL; + } + conn_handle->conn = conn; conn_handle->num_contexts = 0; bzero(conn_handle->contexts, sizeof(WterlCtx) * ASYNC_NIF_MAX_WORKERS); conn_handle->context_mutex = enif_mutex_create(NULL); @@ -374,6 +427,7 @@ ASYNC_NIF_DECL( ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } @@ -385,6 +439,7 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } /* Note: we must first close all cursors referencing this object or this @@ -593,6 +648,8 @@ ASYNC_NIF_DECL( WterlConnHandle *conn_handle; Uri uri; + int start_first; + int stop_last; ERL_NIF_TERM start; ERL_NIF_TERM stop; ERL_NIF_TERM config; @@ -602,15 +659,25 @@ ASYNC_NIF_DECL( if (!(argc == 5 && enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - (enif_is_binary(env, argv[2]) || enif_is_atom(env, argv[2])) && - (enif_is_binary(env, argv[3]) || enif_is_atom(env, argv[3])) && enif_is_binary(env, argv[4]))) { ASYNC_NIF_RETURN_BADARG(); } - if (!enif_is_atom(env, argv[2])) + if (enif_is_binary(env, argv[2])) { + args->start_first = 0; args->start = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - if (!enif_is_atom(env, argv[3])) + } else if (enif_is_atom(env, argv[2]) && argv[2] == ATOM_FIRST) { + args->start_first = 1; + } else { + ASYNC_NIF_RETURN_BADARG(); + } + if (enif_is_binary(env, argv[3])) { + args->stop_last = 0; args->stop = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); + } else if (enif_is_atom(env, argv[3]) && argv[3] == ATOM_LAST) { + args->stop_last = 1; + } else { + ASYNC_NIF_RETURN_BADARG(); + } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[4]); enif_keep_resource((void*)args->conn_handle); }, @@ -623,6 +690,7 @@ ASYNC_NIF_DECL( ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } @@ -635,20 +703,23 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } ErlNifBinary start_key; WT_CURSOR *start = NULL; - if (!enif_is_atom(env, args->start)) { + if (!args->start_first) { if (!enif_inspect_binary(env, args->start, &start_key)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } rc = session->open_cursor(session, args->uri, NULL, "raw", &start); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } WT_ITEM item_start; @@ -659,15 +730,17 @@ ASYNC_NIF_DECL( ErlNifBinary stop_key; WT_CURSOR *stop = NULL; - if (!enif_is_atom(env, args->stop)) { + if (!args->stop_last) { if (!enif_inspect_binary(env, args->stop, &stop_key)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } rc = session->open_cursor(session, args->uri, NULL, "raw", &stop); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); session->close(session, NULL); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } WT_ITEM item_stop; @@ -677,6 +750,8 @@ ASYNC_NIF_DECL( } rc = session->truncate(session, args->uri, start, stop, (const char*)config.data); + if (start) start->close(start); + if (stop) stop->close(stop); (void)session->close(session, NULL); enif_mutex_unlock(args->conn_handle->context_mutex); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : __strerror_term(env, rc)); @@ -721,6 +796,7 @@ ASYNC_NIF_DECL( ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } @@ -732,6 +808,7 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } @@ -781,6 +858,7 @@ ASYNC_NIF_DECL( ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } @@ -792,6 +870,7 @@ ASYNC_NIF_DECL( int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); + enif_mutex_unlock(args->conn_handle->context_mutex); return; } @@ -1052,7 +1131,7 @@ ASYNC_NIF_DECL( /* We create a separate session here to ensure that operations are thread safe. */ WT_CONNECTION *conn = args->conn_handle->conn; WT_SESSION *session = NULL; - //fprintf(stderr, "cursor open: %s\n", (char *)args->conn_handle->session_config); fflush(stderr); + //debugf("cursor open: %s", (char *)args->conn_handle->session_config); int rc = conn->open_session(conn, NULL, args->conn_handle->session_config, &session); if (rc != 0) { ASYNC_NIF_REPLY(__strerror_term(env, rc)); @@ -1632,17 +1711,23 @@ __resource_conn_dtor(ErlNifEnv *env, void *obj) enif_mutex_lock(conn_handle->context_mutex); for (i = 0; i < conn_handle->num_contexts; i++) { WterlCtx *ctx = &conn_handle->contexts[i]; - WT_CURSOR *cursor; - kh_foreach_value(ctx->cursors, cursor, { - cursor->close(cursor); - }); - kh_destroy(cursors, ctx->cursors); - ctx->session->close(ctx->session, NULL); + WT_SESSION *session = ctx->session; + khash_t(cursors) *h = ctx->cursors; + khiter_t itr; + for (itr = kh_begin(h); itr != kh_end(h); ++itr) { + if (kh_exist(h, itr)) { + WT_CURSOR *cursor = kh_val(h, itr); + enif_free(kh_key(h, itr)); + cursor->close(cursor); + } + } + kh_destroy(cursors, h); + session->close(session, NULL); } enif_mutex_unlock(conn_handle->context_mutex); enif_mutex_destroy(conn_handle->context_mutex); if (conn_handle->session_config) - free((void *)conn_handle->session_config); + enif_free((void *)conn_handle->session_config); } /** @@ -1668,6 +1753,8 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) ATOM_ERROR = enif_make_atom(env, "error"); ATOM_OK = enif_make_atom(env, "ok"); ATOM_NOT_FOUND = enif_make_atom(env, "not_found"); + ATOM_FIRST = enif_make_atom(env, "first"); + ATOM_LAST = enif_make_atom(env, "last"); *priv_data = ASYNC_NIF_LOAD(); diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index fc6584e..1de076f 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -131,9 +131,9 @@ stop(_State) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{connection=Connection}=State) -> +get(Bucket, Key, #state{connection=Connection, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:get(Connection, WTKey) of + case wterl:get(Connection, Table, WTKey) of {ok, Value} -> {ok, Value, State}; not_found -> @@ -150,8 +150,8 @@ get(Bucket, Key, #state{connection=Connection}=State) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection}=State) -> - case wterl:put(Connection, to_object_key(Bucket, PrimaryKey), Val) of +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) -> + case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of ok -> {ok, State}; {error, Reason} -> @@ -165,8 +165,8 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection}=State) - -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, _IndexSpecs, #state{connection=Connection}=State) -> - case wterl:delete(Connection, to_object_key(Bucket, Key)) of +delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) -> + case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of ok -> {ok, State}; {error, Reason} -> diff --git a/src/wterl.erl b/src/wterl.erl index e3dae5a..c166a91 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -618,13 +618,12 @@ various_online_test_() -> fun() -> ?assertMatch(ok, checkpoint(ConnRef, [{target, ["table:test"]}])), ?assertMatch({ok, <<"apple">>}, get(ConnRef, "table:test", <<"a">>)) - end} - %% , - %% {"truncate", - %% fun() -> - %% ?assertMatch(ok, truncate(ConnRef, "table:test")), - %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) - %% end}, + end}, + {"truncate", + fun() -> + ?assertMatch(ok, truncate(ConnRef, "table:test")), + ?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)) + end}, %% {"truncate range, found", %% fun() -> %% ?assertMatch(ok, truncate(ConnRef, "table:test", <<"b">>, last)), @@ -646,10 +645,10 @@ various_online_test_() -> %% ?assertMatch(not_found, get(ConnRef, "table:test", <<"f">>)), %% ?assertMatch({ok, <<"gooseberry">>}, get(ConnRef, "table:test", <<"g">>)) %% end}, - %% {"drop table", - %% fun() -> - %% ?assertMatch(ok, drop(ConnRef, "table:test")) - %% end} + {"drop table", + fun() -> + ?assertMatch(ok, drop(ConnRef, "table:test")) + end} ]} end}.