diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 371c99f..6ecdc50 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -3,6 +3,7 @@ set -e WT_BRANCH=basho +WT_REMOTE_REPO=http://github.com/wiredtiger/wiredtiger.git [ `basename $PWD` != "c_src" ] && cd c_src @@ -21,12 +22,19 @@ case "$1" in git fetch && \ git merge origin/$WT_BRANCH) else - git clone http://github.com/wiredtiger/wiredtiger.git -b $WT_BRANCH && \ - (cd wiredtiger && ./autogen.sh) + git clone -b $WT_BRANCH --single-branch $WT_REMOTE_REPO && \ + (cd wiredtiger && \ + patch -p1 < ../wiredtiger-extension-link.patch && \ + ./autogen.sh) fi (cd wiredtiger/build_posix && \ ../configure --with-pic \ + --enable-snappy \ + --enable-bzip2 \ --prefix=$BASEDIR/system && \ - make -j 8 && make install) + make -j && make install) + [ -d $BASEDIR/../priv ] || mkdir $BASEDIR/../priv + cp $BASEDIR/system/bin/wt $BASEDIR/../priv + cp $BASEDIR/system/lib/*.so $BASEDIR/../priv ;; esac diff --git a/c_src/wiredtiger-extension-link.patch b/c_src/wiredtiger-extension-link.patch new file mode 100644 index 0000000..46beb29 --- /dev/null +++ b/c_src/wiredtiger-extension-link.patch @@ -0,0 +1,22 @@ +diff --git a/ext/compressors/bzip2/Makefile.am b/ext/compressors/bzip2/Makefile.am +index 0aedc2e..1cc4cf6 100644 +--- a/ext/compressors/bzip2/Makefile.am ++++ b/ext/compressors/bzip2/Makefile.am +@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include + + lib_LTLIBRARIES = libwiredtiger_bzip2.la + libwiredtiger_bzip2_la_SOURCES = bzip2_compress.c +-libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module ++libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib + libwiredtiger_bzip2_la_LIBADD = -lbz2 +diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am +index 6d78823..7d35777 100644 +--- a/ext/compressors/snappy/Makefile.am ++++ b/ext/compressors/snappy/Makefile.am +@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include + + lib_LTLIBRARIES = libwiredtiger_snappy.la + libwiredtiger_snappy_la_SOURCES = snappy_compress.c +-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module ++libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib + libwiredtiger_snappy_la_LIBADD = -lsnappy diff --git a/c_src/wterl.c b/c_src/wterl.c index 9636465..076aa03 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -44,12 +44,83 @@ typedef struct { WT_CURSOR* cursor; } WterlCursorHandle; -typedef char Uri[128]; // object names +typedef char Uri[128]; // object names // Atoms (initialized in on_load) static ERL_NIF_TERM ATOM_ERROR; static ERL_NIF_TERM ATOM_OK; +typedef ERL_NIF_TERM (*CursorRetFun)(ErlNifEnv* env, WT_CURSOR* cursor, int rc); + +// Prototypes +static ERL_NIF_TERM wterl_conn_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_conn_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc); +static ERL_NIF_TERM wterl_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc); +static ERL_NIF_TERM wterl_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], + CursorRetFun cursor_ret_fun, int next); +static ERL_NIF_TERM wterl_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near); +static ERL_NIF_TERM wterl_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc); +static ERL_NIF_TERM wterl_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_create(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_drop(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM wterl_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); + +static ErlNifFunc nif_funcs[] = +{ + {"conn_close", 1, wterl_conn_close}, + {"conn_open", 2, wterl_conn_open}, + {"cursor_close", 1, wterl_cursor_close}, + {"cursor_insert", 3, wterl_cursor_insert}, + {"cursor_next", 1, wterl_cursor_next}, + {"cursor_next_key", 1, wterl_cursor_next_key}, + {"cursor_next_value", 1, wterl_cursor_next_value}, + {"cursor_open", 2, wterl_cursor_open}, + {"cursor_prev", 1, wterl_cursor_prev}, + {"cursor_prev_key", 1, wterl_cursor_prev_key}, + {"cursor_prev_value", 1, wterl_cursor_prev_value}, + {"cursor_remove", 2, wterl_cursor_remove}, + {"cursor_reset", 1, wterl_cursor_reset}, + {"cursor_search", 2, wterl_cursor_search}, + {"cursor_search_near", 2, wterl_cursor_search_near}, + {"cursor_update", 3, wterl_cursor_update}, + {"session_checkpoint", 2, wterl_session_checkpoint}, + {"session_close", 1, wterl_session_close}, + {"session_create", 3, wterl_session_create}, + {"session_delete", 3, wterl_session_delete}, + {"session_drop", 3, wterl_session_drop}, + {"session_get", 3, wterl_session_get}, + {"session_open", 2, wterl_session_open}, + {"session_put", 4, wterl_session_put}, + {"session_rename", 4, wterl_session_rename}, + {"session_salvage", 3, wterl_session_salvage}, + {"session_truncate", 3, wterl_session_truncate}, + {"session_upgrade", 3, wterl_session_upgrade}, + {"session_verify", 3, wterl_session_verify}, +}; static inline ERL_NIF_TERM wterl_strerror(ErlNifEnv* env, int rc) { @@ -77,7 +148,7 @@ ASYNC_NIF_DECL( }, { // work - WT_CONNECTION* conn; + WT_CONNECTION* conn; ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); @@ -85,18 +156,18 @@ ASYNC_NIF_DECL( } int rc = wiredtiger_open(args->homedir, NULL, (const char*)config.data, &conn); - if (rc == 0) - { - WterlConnHandle* conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); - conn_handle->conn = conn; - ERL_NIF_TERM result = enif_make_resource(env, conn_handle); - enif_release_resource(conn_handle); + if (rc == 0) + { + WterlConnHandle* conn_handle = enif_alloc_resource(wterl_conn_RESOURCE, sizeof(WterlConnHandle)); + conn_handle->conn = conn; + ERL_NIF_TERM result = enif_make_resource(env, conn_handle); + enif_release_resource(conn_handle); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); - } - else - { + } + else + { ASYNC_NIF_REPLY(wterl_strerror(env, rc)); - } + } }, { // post @@ -140,7 +211,7 @@ ASYNC_NIF_DECL( enif_get_resource(env, argv[0], wterl_conn_RESOURCE, (void**)&args->conn_handle) && enif_is_binary(env, argv[1]))) { ASYNC_NIF_RETURN_BADARG(); - } + } args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); enif_keep_resource((void*)args->conn_handle); }, @@ -153,21 +224,21 @@ ASYNC_NIF_DECL( ASYNC_NIF_REPLY(enif_make_badarg(env)); return; } - int rc = conn->open_session(conn, NULL, (const char*)config.data, &session); - if (rc == 0) - { - WterlSessionHandle* session_handle = - enif_alloc_resource(wterl_session_RESOURCE, sizeof(WterlSessionHandle)); - session_handle->session = session; - ERL_NIF_TERM result = enif_make_resource(env, session_handle); + int rc = conn->open_session(conn, NULL, (const char *)config.data, &session); + if (rc == 0) + { + WterlSessionHandle* session_handle = + enif_alloc_resource(wterl_session_RESOURCE, sizeof(WterlSessionHandle)); + session_handle->session = session; + ERL_NIF_TERM result = enif_make_resource(env, session_handle); enif_keep_resource(args->conn_handle); - enif_release_resource(session_handle); + enif_release_resource(session_handle); ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); - } - else - { + } + else + { ASYNC_NIF_REPLY(wterl_strerror(env, rc)); - } + } }, { // post @@ -225,7 +296,7 @@ ASYNC_NIF_DECL( if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); return; - } +} int rc = session->create(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); }, @@ -249,7 +320,7 @@ ASYNC_NIF_DECL( enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && enif_is_binary(env, argv[2]))) { ASYNC_NIF_RETURN_BADARG(); - } +} args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); enif_keep_resource((void*)args->session_handle); }, @@ -260,19 +331,14 @@ ASYNC_NIF_DECL( if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); return; - } +} int rc = session->drop(session, args->uri, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); }, { // post - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_rename, - { // struct - +static ERL_NIF_TERM wterl_session_rename(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlSessionHandle* session_handle; ERL_NIF_TERM config; Uri oldname; @@ -293,11 +359,11 @@ ASYNC_NIF_DECL( { // work WT_SESSION* session = args->session_handle->session; - ErlNifBinary config; + ErlNifBinary config; if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); return; - } + } int rc = session->rename(session, args->oldname, args->newname, (const char*)config.data); ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); }, @@ -332,400 +398,199 @@ ASYNC_NIF_DECL( if (!enif_inspect_binary(env, args->config, &config)) { ASYNC_NIF_REPLY(enif_make_badarg(env)); return; - } - int rc = session->salvage(session, args->uri, (const char*)config.data); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post +} - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_checkpoint, - { // struct +static ERL_NIF_TERM wterl_session_salvage(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_session_worker(env, argc, argv, WTERL_OP_SALVAGE); +} +static ERL_NIF_TERM wterl_session_checkpoint(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlSessionHandle* session_handle; - ERL_NIF_TERM config; - }, - { // pre - - if (!(argc == 2 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_is_binary(env, argv[1]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; ErlNifBinary config; - if (!enif_inspect_binary(env, args->config, &config)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - int rc = session->checkpoint(session, (const char*)config.data); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_truncate, - { // struct - - WterlSessionHandle* session_handle; - Uri uri; - ERL_NIF_TERM config; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - // Ignore the cursor start/stop form of truncation for now, - // support only the full file truncation. - WT_SESSION* session = args->session_handle->session; - ErlNifBinary config; - if (!enif_inspect_binary(env, args->config, &config)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - int rc = session->truncate(session, args->uri, NULL, NULL, (const char*)config.data); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_upgrade, - { // struct - - WterlSessionHandle* session_handle; - Uri uri; - ERL_NIF_TERM config; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; - ErlNifBinary config; - if (!enif_inspect_binary(env, args->config, &config)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - int rc = session->upgrade(session, args->uri, (const char*)config.data); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_verify, - { // struct - - WterlSessionHandle* session_handle; - Uri uri; - ERL_NIF_TERM config; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->config = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; - ErlNifBinary config; - if (!enif_inspect_binary(env, args->config, &config)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - int rc = session->verify(session, args->uri, (const char*)config.data); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_delete, - { // struct - - WterlSessionHandle* session_handle; - Uri uri; - ERL_NIF_TERM key; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; - ErlNifBinary key; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_CURSOR* cursor; - int rc = session->open_cursor(session, args->uri, NULL, "raw", &cursor); - if (rc != 0) + if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle) && + enif_inspect_binary(env, argv[1], &config)) { - ASYNC_NIF_REPLY(wterl_strerror(env, rc)); - return; + WT_SESSION* session = session_handle->session; + int rc = session->checkpoint(session, (const char*)config.data); + return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); } - WT_ITEM raw_key; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - rc = cursor->remove(cursor); - cursor->close(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post + return enif_make_badarg(env); +} - enif_release_resource((void*)args->session_handle); - }); +static ERL_NIF_TERM wterl_session_truncate(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_session_worker(env, argc, argv, WTERL_OP_TRUNCATE); +} -ASYNC_NIF_DECL( - wterl_session_get, - { // struct +static ERL_NIF_TERM wterl_session_upgrade(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_session_worker(env, argc, argv, WTERL_OP_UPGRADE); +} +static ERL_NIF_TERM wterl_session_verify(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_session_worker(env, argc, argv, WTERL_OP_VERIFY); +} + +static ERL_NIF_TERM wterl_session_delete(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlSessionHandle* session_handle; - Uri uri; - ERL_NIF_TERM key; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; - ErlNifBinary key; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_CURSOR* cursor; - int rc = session->open_cursor(session, args->uri, NULL, "overwrite,raw", &cursor); - if (rc != 0) + if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) { - ASYNC_NIF_REPLY(wterl_strerror(env, rc)); - return; + Uri uri; + ErlNifBinary key; + if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && + enif_inspect_binary(env, argv[2], &key)) + { + WT_SESSION* session = session_handle->session; + WT_CURSOR* cursor; + int rc = session->open_cursor(session, uri, NULL, "raw", &cursor); + if (rc != 0) + { + return wterl_strerror(env, rc); + } + WT_ITEM raw_key; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + rc = cursor->remove(cursor); + cursor->close(cursor); + return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + } } - WT_ITEM raw_key; - WT_ITEM raw_value; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - rc = cursor->search(cursor); - if (rc == 0) - { - rc = cursor->get_value(cursor, &raw_value); - if (rc == 0) - { - ERL_NIF_TERM value; - unsigned char* bin = enif_make_new_binary(env, raw_value.size, &value); - memcpy(bin, raw_value.data, raw_value.size); - cursor->close(cursor); - ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, value)); - return; - } - } - cursor->close(cursor); - ASYNC_NIF_REPLY(wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_session_put, - { // struct + return enif_make_badarg(env); +} +static ERL_NIF_TERM wterl_session_get(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlSessionHandle* session_handle; - Uri uri; - ERL_NIF_TERM key; - ERL_NIF_TERM value; - }, - { // pre - - if (!(argc == 4 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1) && - enif_is_binary(env, argv[2]) && - enif_is_binary(env, argv[3]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[3]); - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; - ErlNifBinary key; - ErlNifBinary value; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - if (!enif_inspect_binary(env, args->value, &value)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_CURSOR* cursor; - int rc = session->open_cursor(session, args->uri, NULL, "overwrite,raw", &cursor); - if (rc != 0) + if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) { - ASYNC_NIF_REPLY(wterl_strerror(env, rc)); - return; + Uri uri; + ErlNifBinary key; + if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && + enif_inspect_binary(env, argv[2], &key)) + { + WT_SESSION* session = session_handle->session; + WT_CURSOR* cursor; + int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor); + if (rc != 0) + { + return wterl_strerror(env, rc); + } + WT_ITEM raw_key, raw_value; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + rc = cursor->search(cursor); + if (rc == 0) + { + rc = cursor->get_value(cursor, &raw_value); + if (rc == 0) + { + ERL_NIF_TERM value; + unsigned char* bin = enif_make_new_binary(env, raw_value.size, &value); + memcpy(bin, raw_value.data, raw_value.size); + cursor->close(cursor); + return enif_make_tuple2(env, ATOM_OK, value); + } + } + cursor->close(cursor); + return wterl_strerror(env, rc); + } } - WT_ITEM raw_key; - WT_ITEM raw_value; - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - raw_value.data = value.data; - raw_value.size = value.size; - cursor->set_value(cursor, &raw_value); - rc = cursor->insert(cursor); - cursor->close(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_open, - { // struct + return enif_make_badarg(env); +} +static ERL_NIF_TERM wterl_session_put(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlSessionHandle* session_handle; - Uri uri; - }, - { // pre - - if (!(argc == 2 && - enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&args->session_handle) && - enif_get_string(env, argv[1], args->uri, sizeof args->uri, ERL_NIF_LATIN1))) { - ASYNC_NIF_RETURN_BADARG(); - } - enif_keep_resource((void*)args->session_handle); - }, - { // work - - WT_SESSION* session = args->session_handle->session; - WT_CURSOR* cursor; - int rc = session->open_cursor(session, args->uri, NULL, "overwrite,raw", &cursor); - if (rc == 0) + if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) { - WterlCursorHandle* cursor_handle = - enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle)); - cursor_handle->cursor = cursor; - ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); - enif_keep_resource(args->session_handle); - enif_release_resource(cursor_handle); - ASYNC_NIF_REPLY(enif_make_tuple2(env, ATOM_OK, result)); + Uri uri; + ErlNifBinary key, value; + if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1) && + enif_inspect_binary(env, argv[2], &key) && + enif_inspect_binary(env, argv[3], &value)) + { + WT_SESSION* session = session_handle->session; + WT_CURSOR* cursor; + int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor); + if (rc != 0) + { + return wterl_strerror(env, rc); + } + WT_ITEM raw_key, raw_value; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + raw_value.data = value.data; + raw_value.size = value.size; + cursor->set_value(cursor, &raw_value); + rc = cursor->insert(cursor); + cursor->close(cursor); + return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + } } - else + return enif_make_badarg(env); +} + +static ERL_NIF_TERM wterl_cursor_open(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + WterlSessionHandle* session_handle; + if (enif_get_resource(env, argv[0], wterl_session_RESOURCE, (void**)&session_handle)) { - ASYNC_NIF_REPLY(wterl_strerror(env, rc)); + WT_CURSOR* cursor; + Uri uri; + if (enif_get_string(env, argv[1], uri, sizeof uri, ERL_NIF_LATIN1)) + { + WT_SESSION* session = session_handle->session; + int rc = session->open_cursor(session, uri, NULL, "overwrite,raw", &cursor); + if (rc == 0) + { + WterlCursorHandle* cursor_handle = + enif_alloc_resource(wterl_cursor_RESOURCE, sizeof(WterlCursorHandle)); + cursor_handle->cursor = cursor; + ERL_NIF_TERM result = enif_make_resource(env, cursor_handle); + enif_keep_resource(session_handle); + enif_release_resource(cursor_handle); + return enif_make_tuple2(env, ATOM_OK, result); + } + else + { + return wterl_strerror(env, rc); + } + } } - }, - { // post - - enif_release_resource((void*)args->session_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_close, - { // struct + return enif_make_badarg(env); +} +static ERL_NIF_TERM wterl_cursor_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); + if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) + { + WT_CURSOR* cursor = cursor_handle->cursor; + int rc = cursor->close(cursor); + return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - int rc = cursor->close(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); + return enif_make_badarg(env); +} static ERL_NIF_TERM wterl_cursor_key_ret(ErlNifEnv* env, WT_CURSOR *cursor, int rc) { if (rc == 0) { - WT_ITEM raw_key; - rc = cursor->get_key(cursor, &raw_key); - if (rc == 0) + WT_ITEM raw_key; + rc = cursor->get_key(cursor, &raw_key); + if (rc == 0) { - ERL_NIF_TERM key; - memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); - return enif_make_tuple2(env, ATOM_OK, key); - } + ERL_NIF_TERM key; + memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); + return enif_make_tuple2(env, ATOM_OK, key); + } } return wterl_strerror(env, rc); } @@ -735,16 +600,16 @@ static ERL_NIF_TERM wterl_cursor_kv_ret(ErlNifEnv* env, WT_CURSOR *cursor, int r if (rc == 0) { WT_ITEM raw_key, raw_value; - rc = cursor->get_key(cursor, &raw_key); - if (rc == 0) + rc = cursor->get_key(cursor, &raw_key); + if (rc == 0) { rc = cursor->get_value(cursor, &raw_value); if (rc == 0) { - ERL_NIF_TERM key, value; - memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); - memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); - return enif_make_tuple3(env, ATOM_OK, key, value); + ERL_NIF_TERM key, value; + memcpy(enif_make_new_binary(env, raw_key.size, &key), raw_key.data, raw_key.size); + memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); + return enif_make_tuple3(env, ATOM_OK, key, value); } } } @@ -755,407 +620,164 @@ static ERL_NIF_TERM wterl_cursor_value_ret(ErlNifEnv* env, WT_CURSOR *cursor, in { if (rc == 0) { - WT_ITEM raw_value; - rc = cursor->get_value(cursor, &raw_value); - if (rc == 0) + WT_ITEM raw_value; + rc = cursor->get_value(cursor, &raw_value); + if (rc == 0) { - ERL_NIF_TERM value; - memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); - return enif_make_tuple2(env, ATOM_OK, value); - } + ERL_NIF_TERM value; + memcpy(enif_make_new_binary(env, raw_value.size, &value), raw_value.data, raw_value.size); + return enif_make_tuple2(env, ATOM_OK, value); + } } return wterl_strerror(env, rc); } -ASYNC_NIF_DECL( - wterl_cursor_next, - { // struct - +static ERL_NIF_TERM wterl_cursor_np_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], + CursorRetFun cursor_ret, int prev) +{ WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); + if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) + { + WT_CURSOR* cursor = cursor_handle->cursor; + return cursor_ret(env, cursor, prev == 0 ? cursor->next(cursor) : cursor->prev(cursor)); } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work + return enif_make_badarg(env); +} - WT_CURSOR* cursor = args->cursor_handle->cursor; - ASYNC_NIF_REPLY(wterl_cursor_kv_ret(env, cursor, cursor->next(cursor))); - }, - { // post +static ERL_NIF_TERM wterl_cursor_next(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_kv_ret, 0); +} - enif_release_resource((void*)args->cursor_handle); - }); +static ERL_NIF_TERM wterl_cursor_next_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_key_ret, 0); +} -ASYNC_NIF_DECL( - wterl_cursor_next_key, - { // struct +static ERL_NIF_TERM wterl_cursor_next_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_value_ret, 0); +} +static ERL_NIF_TERM wterl_cursor_prev(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_kv_ret, 1); +} + +static ERL_NIF_TERM wterl_cursor_prev_key(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_key_ret, 1); +} + +static ERL_NIF_TERM wterl_cursor_prev_value(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_np_worker(env, argc, argv, wterl_cursor_value_ret, 1); +} + +static ERL_NIF_TERM wterl_cursor_search_worker(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int near) +{ WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); - } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ASYNC_NIF_REPLY(wterl_cursor_key_ret(env, cursor, cursor->next(cursor))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_next_value, - { // struct - - WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); - } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->next(cursor))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_prev, - { // struct - - WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); - } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ASYNC_NIF_REPLY(wterl_cursor_kv_ret(env, cursor, cursor->prev(cursor))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_prev_key, - { // struct - - WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); - } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ASYNC_NIF_REPLY(wterl_cursor_key_ret(env, cursor, cursor->prev(cursor))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_prev_value, - { // struct - - WterlCursorHandle *cursor_handle; - }, - { // pre - - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); - } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->prev(cursor))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_search, - { // struct - - WterlCursorHandle *cursor_handle; - ERL_NIF_TERM key; - }, - { // pre - - if (!(argc == 2 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && - enif_is_binary(env, argv[1]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; ErlNifBinary key; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; + if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle) && + enif_inspect_binary(env, argv[1], &key)) + { + WT_CURSOR* cursor = cursor_handle->cursor; + WT_ITEM raw_key; + int exact; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + + // We currently ignore the less-than, greater-than or equals-to return information + // from the cursor.search_near method. + return wterl_cursor_value_ret(env, cursor, + near == 1 ? + cursor->search_near(cursor, &exact) : cursor->search(cursor)); } - WT_ITEM raw_key; + return enif_make_badarg(env); +} - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); +static ERL_NIF_TERM wterl_cursor_search(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_search_worker(env, argc, argv, 0); +} - // We currently ignore the less-than, greater-than or equals-to return information - // from the cursor.search_near method. - ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->search(cursor))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_search_near, - { // struct +static ERL_NIF_TERM wterl_cursor_search_near(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_search_worker(env, argc, argv, 1); +} +static ERL_NIF_TERM wterl_cursor_reset(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ WterlCursorHandle *cursor_handle; - ERL_NIF_TERM key; - }, - { // pre - - if (!(argc == 2 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && - enif_is_binary(env, argv[1]))) { - ASYNC_NIF_RETURN_BADARG(); + if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) + { + WT_CURSOR* cursor = cursor_handle->cursor; + int rc = cursor->reset(cursor); + return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); - enif_keep_resource((void*)args->cursor_handle); - }, - { // work + return enif_make_badarg(env); +} - WT_CURSOR* cursor = args->cursor_handle->cursor; - ErlNifBinary key; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_ITEM raw_key; - int exact; - - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - - // We currently ignore the less-than, greater-than or equals-to return information - // from the cursor.search_near method. - ASYNC_NIF_REPLY(wterl_cursor_value_ret(env, cursor, cursor->search_near(cursor, &exact))); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_reset, - { // struct +#define WTERL_OP_CURSOR_INSERT 1 +#define WTERL_OP_CURSOR_UPDATE 2 +#define WTERL_OP_CURSOR_REMOVE 3 +static inline ERL_NIF_TERM wterl_cursor_data_op(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[], int op) +{ WterlCursorHandle *cursor_handle; - }, - { // pre + if (enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&cursor_handle)) + { + ErlNifBinary key, value; + int rc; - if (!(argc == 1 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle))) { - ASYNC_NIF_RETURN_BADARG(); + if (enif_inspect_binary(env, argv[1], &key) && + (op == WTERL_OP_CURSOR_REMOVE ? 1 : enif_inspect_binary(env, argv[2], &value))) + { + WT_CURSOR* cursor = cursor_handle->cursor; + WT_ITEM raw_key, raw_value; + raw_key.data = key.data; + raw_key.size = key.size; + cursor->set_key(cursor, &raw_key); + if (op != WTERL_OP_CURSOR_REMOVE) + { + raw_value.data = value.data; + raw_value.size = value.size; + cursor->set_value(cursor, &raw_value); + } + switch (op) + { + case WTERL_OP_CURSOR_INSERT: + rc = cursor->insert(cursor); + break; + case WTERL_OP_CURSOR_UPDATE: + rc = cursor->update(cursor); + break; + case WTERL_OP_CURSOR_REMOVE: + default: + rc = cursor->remove(cursor); + break; + } + return rc == 0 ? ATOM_OK : wterl_strerror(env, rc); + } } - enif_keep_resource((void*)args->cursor_handle); - }, - { // work + return enif_make_badarg(env); +} - WT_CURSOR* cursor = args->cursor_handle->cursor; - int rc = cursor->reset(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post +static ERL_NIF_TERM wterl_cursor_insert(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_INSERT); +} - enif_release_resource((void*)args->cursor_handle); - }); +static ERL_NIF_TERM wterl_cursor_update(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_UPDATE); +} -ASYNC_NIF_DECL( - wterl_cursor_insert, - { // struct - - WterlCursorHandle *cursor_handle; - ERL_NIF_TERM key; - ERL_NIF_TERM value; - int rc; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && - enif_is_binary(env, argv[1]) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); - args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ErlNifBinary key; - ErlNifBinary value; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - if (!enif_inspect_binary(env, args->value, &value)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_ITEM raw_key; - WT_ITEM raw_value; - - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - raw_value.data = value.data; - raw_value.size = value.size; - cursor->set_value(cursor, &raw_value); - int rc = cursor->insert(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_update, - { // struct - - WterlCursorHandle *cursor_handle; - ERL_NIF_TERM key; - ERL_NIF_TERM value; - int rc; - }, - { // pre - - if (!(argc == 3 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && - enif_is_binary(env, argv[1]) && - enif_is_binary(env, argv[2]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); - args->value = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ErlNifBinary key; - ErlNifBinary value; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - if (!enif_inspect_binary(env, args->value, &value)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_ITEM raw_key; - WT_ITEM raw_value; - - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - raw_value.data = value.data; - raw_value.size = value.size; - cursor->set_value(cursor, &raw_value); - int rc = cursor->update(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); - -ASYNC_NIF_DECL( - wterl_cursor_remove, - { // struct - - WterlCursorHandle *cursor_handle; - ERL_NIF_TERM key; - int rc; - }, - { // pre - - if (!(argc == 2 && - enif_get_resource(env, argv[0], wterl_cursor_RESOURCE, (void**)&args->cursor_handle) && - enif_is_binary(env, argv[1]))) { - ASYNC_NIF_RETURN_BADARG(); - } - args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); - enif_keep_resource((void*)args->cursor_handle); - }, - { // work - - WT_CURSOR* cursor = args->cursor_handle->cursor; - ErlNifBinary key; - if (!enif_inspect_binary(env, args->key, &key)) { - ASYNC_NIF_REPLY(enif_make_badarg(env)); - return; - } - WT_ITEM raw_key; - - raw_key.data = key.data; - raw_key.size = key.size; - cursor->set_key(cursor, &raw_key); - int rc = cursor->remove(cursor); - ASYNC_NIF_REPLY(rc == 0 ? ATOM_OK : wterl_strerror(env, rc)); - }, - { // post - - enif_release_resource((void*)args->cursor_handle); - }); +static ERL_NIF_TERM wterl_cursor_remove(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + return wterl_cursor_data_op(env, argc, argv, WTERL_OP_CURSOR_REMOVE); +} static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { @@ -1165,54 +787,7 @@ static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) wterl_cursor_RESOURCE = enif_open_resource_type(env, NULL, "wterl_cursor_resource", NULL, flags, NULL); ATOM_ERROR = enif_make_atom(env, "error"); ATOM_OK = enif_make_atom(env, "ok"); - - ASYNC_NIF_LOAD(); - return 0; } -static void on_unload(ErlNifEnv* env, void* priv_data) -{ - ASYNC_NIF_UNLOAD(); -} - -static int on_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info) -{ - ASYNC_NIF_UPGRADE(); - return 0; -} - -static ErlNifFunc nif_funcs[] = -{ - {"conn_open_nif", 3, wterl_conn_open}, - {"conn_close_nif", 2, wterl_conn_close}, - {"session_open_nif", 3, wterl_session_open}, - {"session_close_nif", 2, wterl_session_close}, - {"session_create_nif", 4, wterl_session_create}, - {"session_drop_nif", 4, wterl_session_drop}, - {"session_rename_nif", 5, wterl_session_rename}, - {"session_salvage_nif", 4, wterl_session_salvage}, - {"session_checkpoint_nif", 3, wterl_session_checkpoint}, - {"session_truncate_nif", 4, wterl_session_truncate}, - {"session_upgrade_nif", 4, wterl_session_upgrade}, - {"session_verify_nif", 4, wterl_session_verify}, - {"session_delete_nif", 4, wterl_session_delete}, - {"session_get_nif", 4, wterl_session_get}, - {"session_put_nif", 5, wterl_session_put}, - {"cursor_open_nif", 3, wterl_cursor_open}, - {"cursor_close_nif", 2, wterl_cursor_close}, - {"cursor_next_nif", 2, wterl_cursor_next}, - {"cursor_next_key_nif", 2, wterl_cursor_next_key}, - {"cursor_next_value_nif", 2, wterl_cursor_next_value}, - {"cursor_prev_nif", 2, wterl_cursor_prev}, - {"cursor_prev_key_nif", 2, wterl_cursor_prev_key}, - {"cursor_prev_value_nif", 2, wterl_cursor_prev_value}, - {"cursor_search_nif", 3, wterl_cursor_search}, - {"cursor_search_near_nif", 3, wterl_cursor_search_near}, - {"cursor_reset_nif", 2, wterl_cursor_reset}, - {"cursor_insert_nif", 4, wterl_cursor_insert}, - {"cursor_update_nif", 4, wterl_cursor_update}, - {"cursor_remove_nif", 3, wterl_cursor_remove}, -}; - -ERL_NIF_INIT(wterl, nif_funcs, &on_load, NULL, &on_upgrade, &on_unload); +ERL_NIF_INIT(wterl, nif_funcs, &on_load, NULL, NULL, NULL); diff --git a/enable-wterl b/enable-wterl index 873e999..2b99caa 100755 --- a/enable-wterl +++ b/enable-wterl @@ -35,7 +35,7 @@ fi rebar get-deps file=./deps/riak_kv/src/riak_kv.app.src -if ! grep -q hanoidb $file && ! grep -q wterl $file ; then +if ! grep -q wterl $file ; then echo echo "Modifying $file, saving the original as ${file}.orig ..." perl -i.orig -pe '/\bos_mon,/ && print qq( wterl,\n)' $file diff --git a/rebar.config b/rebar.config index dc2f7eb..52d6171 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {port_env, [ {"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"}, - {"DRV_LDFLAGS", "$DRV_LDFLAGS c_src/system/lib/libwiredtiger.a"} + {"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lpriv -lwiredtiger"} ]}. {pre_hooks, [{compile, "c_src/build_deps.sh"}]}. diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 8e92514..552e514 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -52,12 +52,12 @@ -define(CAPABILITIES, [async_fold]). -record(pass, {session :: wterl:session(), - cursor :: wterl:cursor()}). + cursor :: wterl:cursor()}). -type pass() :: #pass{}. -record(state, {table :: string(), connection :: wterl:connection(), - passes :: [pass()]}). + passes :: [pass()]}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. @@ -85,7 +85,6 @@ capabilities(_, _) -> %% @doc Start the wterl backend -spec start(integer(), config()) -> {ok, state()} | {error, term()}. start(Partition, Config) -> - lager:start(), AppStart = case application:start(wterl) of ok -> @@ -100,8 +99,8 @@ start(Partition, Config) -> ok -> Table = "lsm:wt" ++ integer_to_list(Partition), {ok, Connection} = establish_connection(Config), - Passes = establish_passes(erlang:system_info(schedulers), Connection, Table), - {ok, #state{table=Table, connection=Connection, passes=Passes}}; + Passes = establish_passes(erlang:system_info(schedulers), Connection, Table), + {ok, #state{table=Table, connection=Connection, passes=Passes}}; {error, Reason2} -> {error, Reason2} end. @@ -109,10 +108,11 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. stop(#state{passes=Passes}) -> - lists:foreach(fun({Session, Cursor}) -> - ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) - end, Passes), + lists:foreach(fun(Elem) -> + {Session, Cursor} = Elem, + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) + end, Passes), ok. %% @doc Retrieve an object from the wterl backend @@ -191,7 +191,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> AccFinal after ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) + ok = wterl:session_close(Session) end end end, @@ -238,7 +238,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> AccFinal after ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) + ok = wterl:session_close(Session) end end end, @@ -272,7 +272,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> AccFinal after ok = wterl:cursor_close(Cursor), - ok = wterl:session_close(Session) + ok = wterl:session_close(Session) end end end, @@ -327,6 +327,14 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== +max_sessions(Config) -> + RingSize = + case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of + undefined -> 1024; + Size -> Size + end, + 2 * (RingSize * erlang:system_info(schedulers)). + %% @private establish_connection(Config) -> %% Get the data root directory @@ -336,28 +344,23 @@ establish_connection(Config) -> {error, data_root_unset}; DataRoot -> ok = filelib:ensure_dir(filename:join(DataRoot, "x")), - SessionMax = - case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of - undefined -> 1024; - RingSize when RingSize < 512 -> 1024; - RingSize -> RingSize * 2 - end, RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl), - Opts = orddict:from_list( + Opts = + orddict:from_list( [ wterl:config_value(create, Config, true), wterl:config_value(sync, Config, false), wterl:config_value(logging, Config, true), wterl:config_value(transactional, Config, true), - wterl:config_value(session_max, Config, SessionMax), + wterl:config_value(session_max, Config, max_sessions(Config)), wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec + %% NOTE: LSM auto-checkpoints, so we don't have too. + %% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec wterl:config_value(verbose, Config, [ %"ckpt" "block", "shared_cache", "evictserver", "fileops", %"hazard", "mutex", "read", "readserver", "reconcile", %"salvage", "verify", "write", "evict", "lsm" - ]) - ] ++ proplists:get_value(wterl, Config, [])), % sec - lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]), + ]) ] ++ proplists:get_value(wterl, Config, [])), % sec case wterl_conn:open(DataRoot, Opts) of {ok, Connection} -> {ok, Connection}; @@ -369,41 +372,36 @@ establish_connection(Config) -> establish_passes(Count, Connection, Table) when is_number(Count), Count > 0 -> - establish_passes(Count, Connection, Table, []). - -establish_passes(Count, Connection, Table, Acc) - when Count > 0 -> - case Count > 1 of - true -> - {ok, Session} = establish_session(Connection, Table), - {ok, Cursor} = wterl:cursor_open(Session, Table), - [{Session, Cursor} | establish_passes(Count - 1, Connection, Table, Acc)]; - false -> - Acc - end. + lists:map(fun(_Elem) -> + {ok, Session} = establish_session(Connection, Table), + {ok, Cursor} = wterl:cursor_open(Session, Table), + {Session, Cursor} + end, lists:seq(1, Count)). %% @private establish_session(Connection, Table) -> case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of {ok, Session} -> SessionOpts = - [%TODO {block_compressor, "snappy"}, + [{block_compressor, "snappy"}, {internal_page_max, "128K"}, {leaf_page_max, "128K"}, - {lsm_chunk_size, "200MB"}, + {lsm_chunk_size, "25MB"}, {lsm_bloom_newest, true}, {lsm_bloom_oldest, true} , - {lsm_bloom_config, [{leaf_page_max, "10MB"}]} ], + {lsm_bloom_bit_count, 128}, + {lsm_bloom_hash_count, 64}, + {lsm_bloom_config, [{leaf_page_max, "8MB"}]} ], case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of ok -> - {ok, Session}; + {ok, Session}; {error, Reason} -> lager:error("Failed to start wterl backend: ~p\n", [Reason]), - {error, Reason} + {error, Reason} end; {error, Reason} -> lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), - {error, Reason} + {error, Reason} end. %% @private @@ -551,7 +549,6 @@ size_cache(RequestedSize) -> "1GB" end, application:set_env(wterl, cache_size, FinalGuess), - lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), FinalGuess; Value when is_list(Value) -> Value; diff --git a/src/wterl.erl b/src/wterl.erl index 2eb18dd..85a2db0 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -20,8 +20,8 @@ %% %% ------------------------------------------------------------------- -module(wterl). --export([conn_open/2, - conn_close/1, +-export([connection_open/2, + connection_close/1, cursor_close/1, cursor_insert/3, cursor_next/1, @@ -60,6 +60,7 @@ session_verify/3, config_value/3, config_to_bin/1, + priv_dir/0, fold_keys/3, fold/3]). @@ -94,15 +95,22 @@ nif_stub_error(Line) -> -spec init() -> ok | {error, any()}. init() -> - PrivDir = case code:priv_dir(?MODULE) of - {error, bad_name} -> - EbinDir = filename:dirname(code:which(?MODULE)), - AppPath = filename:dirname(EbinDir), - filename:join(AppPath, "priv"); - Path -> - Path - end, - erlang:load_nif(filename:join(PrivDir, atom_to_list(?MODULE)), 0). + erlang:load_nif(filename:join(priv_dir(), atom_to_list(?MODULE)), 0). + +-spec connection_open(string(), config()) -> {ok, connection()} | {error, term()}. +connection_open(HomeDir, Config) -> + PrivDir = wterl:priv_dir(), + {ok, PrivFiles} = file:list_dir(PrivDir), + SoFiles = + lists:filter(fun(Elem) -> + case re:run(Elem, "^libwiredtiger_.*\.so$") of + {match, _} -> true; + nomatch -> false + end + end, PrivFiles), + SoPaths = lists:map(fun(Elem) -> filename:join(PrivDir, Elem) end, SoFiles), + Bin = config_to_bin([{extensions, SoPaths}], [<<",">>, Config]), + conn_open(HomeDir, Bin). -spec conn_open(string(), config()) -> {ok, connection()} | {error, term()}. conn_open(HomeDir, Config) -> @@ -383,11 +391,21 @@ fold(_Cursor, _Fun, Acc, not_found) -> fold(Cursor, Fun, Acc, {ok, Key, Value}) -> fold(Cursor, Fun, Fun({Key, Value}, Acc), cursor_next(Cursor)). +priv_dir() -> + case code:priv_dir(?MODULE) of + {error, bad_name} -> + EbinDir = filename:dirname(code:which(?MODULE)), + AppPath = filename:dirname(EbinDir), + filename:join(AppPath, "priv"); + Path -> + Path + end. + %% %% Configuration type information. %% config_types() -> - [{block_compressor, string}, + [{block_compressor, {string, quoted}}, {cache_size, string}, {checkpoint, config}, {create, bool}, @@ -396,7 +414,7 @@ config_types() -> {error_prefix, string}, {eviction_target, integer}, {eviction_trigger, integer}, - {extensions, string}, + {extensions, {list, quoted}}, {force, bool}, {hazard_max, integer}, {home_environment, bool}, @@ -437,8 +455,13 @@ config_encode(config, Value) -> list_to_binary(["(", config_to_bin(Value, []), ")"]); config_encode(list, Value) -> list_to_binary(["(", string:join(Value, ","), ")"]); +config_encode({list, quoted}, Value) -> + Values = lists:map(fun(S) -> "\"" ++ S ++ "\"" end, Value), + list_to_binary(["(", string:join(Values, ","), ")"]); config_encode(string, Value) when is_list(Value) -> list_to_binary(Value); +config_encode({string, quoted}, Value) when is_list(Value) -> + list_to_binary("\"" ++ Value ++ "\""); config_encode(string, Value) when is_number(Value) -> list_to_binary(integer_to_list(Value)); config_encode(bool, true) -> @@ -493,18 +516,18 @@ open_test_conn(DataDir) -> ?cmd("rm -rf "++DataDir), ?assertMatch(ok, filelib:ensure_dir(filename:join(DataDir, "x"))), OpenConfig = config_to_bin([{create,true},{cache_size,"100MB"}]), - {ok, ConnRef} = conn_open(DataDir, OpenConfig), + {ok, ConnRef} = connection_open(DataDir, OpenConfig), ConnRef. open_test_session(ConnRef) -> {ok, SRef} = session_open(ConnRef), ?assertMatch(ok, session_drop(SRef, "table:test", config_to_bin([{force,true}]))), - ?assertMatch(ok, session_create(SRef, "table:test")), + ?assertMatch(ok, session_create(SRef, "table:test", config_to_bin([{block_compressor, "snappy"}]))), SRef. conn_test() -> ConnRef = open_test_conn(?TEST_DATA_DIR), - ?assertMatch(ok, conn_close(ConnRef)). + ?assertMatch(ok, connection_close(ConnRef)). session_test_() -> {setup, @@ -512,7 +535,7 @@ session_test_() -> open_test_conn(?TEST_DATA_DIR) end, fun(ConnRef) -> - ok = conn_close(ConnRef) + ok = connection_close(ConnRef) end, fun(ConnRef) -> {inorder, @@ -537,7 +560,7 @@ insert_delete_test() -> ?assertMatch(ok, session_delete(SRef, "table:test", <<"a">>)), ?assertMatch(not_found, session_get(SRef, "table:test", <<"a">>)), ok = session_close(SRef), - ok = conn_close(ConnRef). + ok = connection_close(ConnRef). init_test_table() -> ConnRef = open_test_conn(?TEST_DATA_DIR), @@ -551,7 +574,7 @@ init_test_table() -> stop_test_table({ConnRef, SRef}) -> ?assertMatch(ok, session_close(SRef)), - ?assertMatch(ok, conn_close(ConnRef)). + ?assertMatch(ok, connection_close(ConnRef)). various_session_test_() -> {setup, @@ -760,10 +783,10 @@ prop_put_delete() -> Table = "table:eqc", {ok, CWD} = file:get_cwd(), ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), - ?cmd("rm -rf "++DataDir), + ?cmd("rm -rf "++DataDir), ok = filelib:ensure_dir(filename:join(DataDir, "x")), Cfg = wterl:config_to_bin([{create,true}]), - {ok, Conn} = wterl:conn_open(DataDir, Cfg), + {ok, Conn} = wterl:connection_open(DataDir, Cfg), {ok, SRef} = wterl:session_open(Conn), try wterl:session_create(SRef, Table), @@ -779,7 +802,7 @@ prop_put_delete() -> true after wterl:session_close(SRef), - wterl:conn_close(Conn) + wterl:connection_close(Conn) end end)). diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index afe91ee..4a680d0 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -82,7 +82,7 @@ init([]) -> handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) -> {Reply, NState} = - case wterl:conn_open(Dir, wterl:config_to_bin(Config)) of + case wterl:connection_open(Dir, wterl:config_to_bin(Config)) of {ok, ConnRef}=OK -> Monitor = erlang:monitor(process, Caller), true = ets:insert(wterl_ets, {Monitor, Caller}), @@ -164,7 +164,7 @@ code_change(_OldVsn, State, _Extra) -> do_close(undefined) -> ok; do_close(ConnRef) -> - wterl:conn_close(ConnRef). + wterl:connection_close(ConnRef). -ifdef(TEST).