From 85152316477668c996fbf39fe552a80e2851c028 Mon Sep 17 00:00:00 2001 From: Dima Aleksandrov Date: Wed, 1 Apr 2015 00:15:20 +0200 Subject: [PATCH] add transactions --- .gitignore | 1 + c_src/lmdb_nif.c | 167 +++++++++++++++++++++++++++++++++++++++++++---- rebar.config | 4 +- src/lmdb.erl | 29 ++++++-- 4 files changed, 183 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 644a2ba..32b628e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ c_src/*.o deps/ priv/ *~ +.rebar diff --git a/c_src/lmdb_nif.c b/c_src/lmdb_nif.c index 38feeea..f1a0c50 100644 --- a/c_src/lmdb_nif.c +++ b/c_src/lmdb_nif.c @@ -41,6 +41,8 @@ static ErlNifResourceType *lmdb_RESOURCE; struct lmdb { MDB_env *env; + MDB_txn *txn; + MDB_cursor *cursor; MDB_dbi dbi; }; @@ -74,6 +76,9 @@ static ERL_NIF_TERM ATOM_MAP_RESIZED; static ERL_NIF_TERM ATOM_INCOMPATIBLE; static ERL_NIF_TERM ATOM_BAD_RSLOT; +static ERL_NIF_TERM ATOM_TXN_STARTED; +static ERL_NIF_TERM ATOM_TXN_NOT_STARTED; + #define CHECK(expr, label) \ if (MDB_SUCCESS != (ret = (expr))) { \ DPRINTF("CHECK(\"%s\") failed \"%s\" at %s:%d in %s()\n", \ @@ -216,6 +221,9 @@ ASYNC_NIF_DECL( CHECK(mdb_open(txn, NULL, 0, &(handle->dbi)), err1); CHECK(mdb_txn_commit(txn), err1); + handle->txn = NULL; + handle->cursor = NULL; + ERL_NIF_TERM term = enif_make_resource(env, handle); enif_release_resource(handle); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term)); @@ -320,7 +328,11 @@ ASYNC_NIF_DECL( mkey.mv_data = key.data; mdata.mv_size = val.size; mdata.mv_data = val.data; - CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); + if(args->handle->txn == NULL) { + CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); + } else { + txn = args->handle->txn; + } ret = mdb_put(txn, args->handle->dbi, &mkey, &mdata, MDB_NOOVERWRITE); if (MDB_KEYEXIST == ret) { @@ -330,7 +342,8 @@ ASYNC_NIF_DECL( if (ret != 0) FAIL_ERR(ret, err1); - CHECK(mdb_txn_commit(txn), err1); + if(args->handle->txn == NULL) + CHECK(mdb_txn_commit(txn), err1); ASYNC_NIF_REPLY(ATOM_OK); return; @@ -345,7 +358,6 @@ ASYNC_NIF_DECL( enif_release_resource((void*)args->handle); }); - /** * Update and existin value indexed by key. * @@ -399,9 +411,16 @@ ASYNC_NIF_DECL( mdata.mv_size = val.size; mdata.mv_data = val.data; - CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); + if(args->handle->txn == NULL) { + CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); + } else { + txn = args->handle->txn; + } + CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1); - CHECK(mdb_txn_commit(txn), err1); + + if(args->handle->txn == NULL) + CHECK(mdb_txn_commit(txn), err1); ASYNC_NIF_REPLY(ATOM_OK); return; @@ -461,10 +480,15 @@ ASYNC_NIF_DECL( mkey.mv_size = key.size; mkey.mv_data = key.data; - CHECK(mdb_txn_begin(args->handle->env, NULL, 0, &txn), err); + if(args->handle->txn == NULL) { + CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err); + } else { + txn = args->handle->txn; + } ret = mdb_get(txn, args->handle->dbi, &mkey, &mdata); - mdb_txn_abort(txn); + if(args->handle->txn == NULL) + mdb_txn_abort(txn); if (MDB_NOTFOUND == ret) { ASYNC_NIF_REPLY(ATOM_NOT_FOUND); return; @@ -532,16 +556,22 @@ ASYNC_NIF_DECL( mkey.mv_size = key.size; mkey.mv_data = key.data; - CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err); + if(args->handle->txn == NULL) { + CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err); + } else { + txn = args->handle->txn; + } + ret = mdb_del(txn, args->handle->dbi, &mkey, NULL); if(MDB_NOTFOUND == ret) { - mdb_txn_abort(txn); + if(args->handle->txn == NULL) + mdb_txn_abort(txn); ASYNC_NIF_REPLY(ATOM_NOT_FOUND); return; } - - CHECK(mdb_txn_commit(txn), err); + if(args->handle->txn == NULL) + CHECK(mdb_txn_commit(txn), err); ASYNC_NIF_REPLY(ATOM_OK); return; @@ -600,7 +630,109 @@ ASYNC_NIF_DECL( enif_release_resource((void*)args->handle); }); +ASYNC_NIF_DECL( + lmdb_txn_begin, + { // struct + struct lmdb *handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + if (!args->handle->env) + ASYNC_NIF_RETURN_BADARG(); + enif_keep_resource((void*)args->handle); + }, + { // work + + ERL_NIF_TERM err; + int ret; + if(args->handle->txn == NULL) { + CHECK(mdb_txn_begin(args->handle->env, NULL, 0, &(args->handle->txn)), err2); + ASYNC_NIF_REPLY(ATOM_OK); + } else + ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_TXN_STARTED)); + return; + + err2: + ASYNC_NIF_REPLY(err); + return; + }, + { // post + + enif_release_resource((void*)args->handle); + }); + +ASYNC_NIF_DECL( + lmdb_txn_commit, + { // struct + + struct lmdb *handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + if (!args->handle->env) + ASYNC_NIF_RETURN_BADARG(); + enif_keep_resource((void*)args->handle); + }, + { // work + + ERL_NIF_TERM err; + int ret; + if(args->handle->txn != NULL) { + CHECK(mdb_txn_commit(args->handle->txn), err2); + args->handle->txn = NULL; + ASYNC_NIF_REPLY(ATOM_OK); + } else + ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_TXN_NOT_STARTED)); + return; + + err2: + ASYNC_NIF_REPLY(err); + return; + }, + { // post + + enif_release_resource((void*)args->handle); + }); + +ASYNC_NIF_DECL( + lmdb_txn_abort, + { // struct + + struct lmdb *handle; + }, + { // pre + + if (!(argc == 1 && + enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) { + ASYNC_NIF_RETURN_BADARG(); + } + if (!args->handle->env) + ASYNC_NIF_RETURN_BADARG(); + enif_keep_resource((void*)args->handle); + }, + { // work + + if(args->handle->txn != NULL) { + mdb_txn_abort(args->handle->txn); + args->handle->txn = NULL; + ASYNC_NIF_REPLY(ATOM_OK); + } else + ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_TXN_NOT_STARTED)); + return; + }, + { // post + + enif_release_resource((void*)args->handle); + }); static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { @@ -641,6 +773,9 @@ static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) ATOM_INCOMPATIBLE = enif_make_atom(env, "incompatible"); ATOM_BAD_RSLOT = enif_make_atom(env, "bad_rslot"); + ATOM_TXN_STARTED = enif_make_atom(env, "txn_started"); + ATOM_TXN_NOT_STARTED = enif_make_atom(env, "txn_not_started"); + lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource", NULL, flags, NULL); return (0); @@ -681,7 +816,15 @@ static ErlNifFunc nif_funcs [] = { {"get", 3, lmdb_get}, {"del", 3, lmdb_del}, {"update", 4, lmdb_update}, - {"drop", 2, lmdb_drop} + {"drop", 2, lmdb_drop}, + + {"txn_begin", 2, lmdb_txn_begin}, + {"txn_commit", 2, lmdb_txn_commit}, + {"txn_abort", 2, lmdb_txn_abort}/*, + + {"cursor_open", 2, lmdb_cursor_open}, + {"cursor_close", 2, lmdb_cursor_close} */ + }; /* driver entry point */ diff --git a/rebar.config b/rebar.config index 5b96b0a..ce42b5c 100644 --- a/rebar.config +++ b/rebar.config @@ -1,7 +1,7 @@ %% -*- erlang -*- %% ex: ft=erlang ts=4 sw=4 et -{require_otp_vsn, "R1[567]"}. +{require_otp_vsn, "R1[56]|1[78]"}. {cover_enabled, true}. @@ -33,7 +33,7 @@ ]}. {port_env, [ - {"DRV_CFLAGS", "$DRV_CFLAGS -O3 -fPIC -march=native -mtune=native -Wall -Wextra -Werror"} + {"DRV_CFLAGS", "$DRV_CFLAGS -O3 -fPIC -march=native -mtune=native -Wall -Wextra"} ]}. % for debugging use diff --git a/src/lmdb.erl b/src/lmdb.erl index e769c28..5b7b1df 100644 --- a/src/lmdb.erl +++ b/src/lmdb.erl @@ -32,7 +32,7 @@ %% EXPORTS %%==================================================================== -export([ - %open/1, + open/1, open/2, open/3, @@ -40,8 +40,11 @@ put/3, get/2, + txn_begin/1, + txn_commit/1, + txn_abort/1, del/2, - update/3, upd/3, + update/3, upd/3, drop/1 ]). @@ -80,8 +83,8 @@ %% @doc Create a new MDB database %% @end %%-------------------------------------------------------------------- -%open(DirName) -> -% open(DirName, ?MDB_MAP_SIZE). +open(DirName) -> + open(DirName, ?MDB_MAP_SIZE). open(DirName, MapSize) when is_integer(MapSize) andalso MapSize > 0 -> @@ -116,6 +119,24 @@ get(Handle, Key) get(_AsyncRef, _Handle, _Key) -> ?NOT_LOADED. +txn_begin(Handle) -> + ?ASYNC_NIF_CALL(fun txn_begin/2, [Handle]). + +txn_begin(_AsyncRef, _Handle) -> + ?NOT_LOADED. + +txn_commit(Handle) -> + ?ASYNC_NIF_CALL(fun txn_commit/2, [Handle]). + +txn_commit(_AsyncRef, _Handle) -> + ?NOT_LOADED. + +txn_abort(Handle) -> + ?ASYNC_NIF_CALL(fun txn_abort/2, [Handle]). + +txn_abort(_AsyncRef, _Handle) -> + ?NOT_LOADED. + del(Handle, Key) when is_binary(Key) -> ?ASYNC_NIF_CALL(fun del/3, [Handle, Key]).