add transactions

This commit is contained in:
Dima Aleksandrov 2015-04-01 00:15:20 +02:00
parent 0c98a25ade
commit 8515231647
4 changed files with 183 additions and 18 deletions

1
.gitignore vendored
View file

@ -5,3 +5,4 @@ c_src/*.o
deps/ deps/
priv/ priv/
*~ *~
.rebar

View file

@ -41,6 +41,8 @@
static ErlNifResourceType *lmdb_RESOURCE; static ErlNifResourceType *lmdb_RESOURCE;
struct lmdb { struct lmdb {
MDB_env *env; MDB_env *env;
MDB_txn *txn;
MDB_cursor *cursor;
MDB_dbi dbi; MDB_dbi dbi;
}; };
@ -74,6 +76,9 @@ static ERL_NIF_TERM ATOM_MAP_RESIZED;
static ERL_NIF_TERM ATOM_INCOMPATIBLE; static ERL_NIF_TERM ATOM_INCOMPATIBLE;
static ERL_NIF_TERM ATOM_BAD_RSLOT; static ERL_NIF_TERM ATOM_BAD_RSLOT;
static ERL_NIF_TERM ATOM_TXN_STARTED;
static ERL_NIF_TERM ATOM_TXN_NOT_STARTED;
#define CHECK(expr, label) \ #define CHECK(expr, label) \
if (MDB_SUCCESS != (ret = (expr))) { \ if (MDB_SUCCESS != (ret = (expr))) { \
DPRINTF("CHECK(\"%s\") failed \"%s\" at %s:%d in %s()\n", \ DPRINTF("CHECK(\"%s\") failed \"%s\" at %s:%d in %s()\n", \
@ -216,6 +221,9 @@ ASYNC_NIF_DECL(
CHECK(mdb_open(txn, NULL, 0, &(handle->dbi)), err1); CHECK(mdb_open(txn, NULL, 0, &(handle->dbi)), err1);
CHECK(mdb_txn_commit(txn), err1); CHECK(mdb_txn_commit(txn), err1);
handle->txn = NULL;
handle->cursor = NULL;
ERL_NIF_TERM term = enif_make_resource(env, handle); ERL_NIF_TERM term = enif_make_resource(env, handle);
enif_release_resource(handle); enif_release_resource(handle);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term)); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
@ -320,7 +328,11 @@ ASYNC_NIF_DECL(
mkey.mv_data = key.data; mkey.mv_data = key.data;
mdata.mv_size = val.size; mdata.mv_size = val.size;
mdata.mv_data = val.data; mdata.mv_data = val.data;
if(args->handle->txn == NULL) {
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2);
} else {
txn = args->handle->txn;
}
ret = mdb_put(txn, args->handle->dbi, &mkey, &mdata, MDB_NOOVERWRITE); ret = mdb_put(txn, args->handle->dbi, &mkey, &mdata, MDB_NOOVERWRITE);
if (MDB_KEYEXIST == ret) { if (MDB_KEYEXIST == ret) {
@ -330,6 +342,7 @@ ASYNC_NIF_DECL(
if (ret != 0) if (ret != 0)
FAIL_ERR(ret, err1); FAIL_ERR(ret, err1);
if(args->handle->txn == NULL)
CHECK(mdb_txn_commit(txn), err1); CHECK(mdb_txn_commit(txn), err1);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -345,7 +358,6 @@ ASYNC_NIF_DECL(
enif_release_resource((void*)args->handle); enif_release_resource((void*)args->handle);
}); });
/** /**
* Update and existin value indexed by key. * Update and existin value indexed by key.
* *
@ -399,8 +411,15 @@ ASYNC_NIF_DECL(
mdata.mv_size = val.size; mdata.mv_size = val.size;
mdata.mv_data = val.data; mdata.mv_data = val.data;
if(args->handle->txn == NULL) {
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2);
} else {
txn = args->handle->txn;
}
CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1); CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1);
if(args->handle->txn == NULL)
CHECK(mdb_txn_commit(txn), err1); CHECK(mdb_txn_commit(txn), err1);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -461,9 +480,14 @@ ASYNC_NIF_DECL(
mkey.mv_size = key.size; mkey.mv_size = key.size;
mkey.mv_data = key.data; mkey.mv_data = key.data;
if(args->handle->txn == NULL) {
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err); CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err);
} else {
txn = args->handle->txn;
}
ret = mdb_get(txn, args->handle->dbi, &mkey, &mdata); ret = mdb_get(txn, args->handle->dbi, &mkey, &mdata);
if(args->handle->txn == NULL)
mdb_txn_abort(txn); mdb_txn_abort(txn);
if (MDB_NOTFOUND == ret) { if (MDB_NOTFOUND == ret) {
ASYNC_NIF_REPLY(ATOM_NOT_FOUND); ASYNC_NIF_REPLY(ATOM_NOT_FOUND);
@ -532,15 +556,21 @@ ASYNC_NIF_DECL(
mkey.mv_size = key.size; mkey.mv_size = key.size;
mkey.mv_data = key.data; mkey.mv_data = key.data;
if(args->handle->txn == NULL) {
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err); CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err);
} else {
txn = args->handle->txn;
}
ret = mdb_del(txn, args->handle->dbi, &mkey, NULL); ret = mdb_del(txn, args->handle->dbi, &mkey, NULL);
if(MDB_NOTFOUND == ret) { if(MDB_NOTFOUND == ret) {
if(args->handle->txn == NULL)
mdb_txn_abort(txn); mdb_txn_abort(txn);
ASYNC_NIF_REPLY(ATOM_NOT_FOUND); ASYNC_NIF_REPLY(ATOM_NOT_FOUND);
return; return;
} }
if(args->handle->txn == NULL)
CHECK(mdb_txn_commit(txn), err); CHECK(mdb_txn_commit(txn), err);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -600,7 +630,109 @@ ASYNC_NIF_DECL(
enif_release_resource((void*)args->handle); enif_release_resource((void*)args->handle);
}); });
ASYNC_NIF_DECL(
lmdb_txn_begin,
{ // struct
struct lmdb *handle;
},
{ // pre
if (!(argc == 1 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
ERL_NIF_TERM err;
int ret;
if(args->handle->txn == NULL) {
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, &(args->handle->txn)), err2);
ASYNC_NIF_REPLY(ATOM_OK);
} else
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_TXN_STARTED));
return;
err2:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
ASYNC_NIF_DECL(
lmdb_txn_commit,
{ // struct
struct lmdb *handle;
},
{ // pre
if (!(argc == 1 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
ERL_NIF_TERM err;
int ret;
if(args->handle->txn != NULL) {
CHECK(mdb_txn_commit(args->handle->txn), err2);
args->handle->txn = NULL;
ASYNC_NIF_REPLY(ATOM_OK);
} else
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_TXN_NOT_STARTED));
return;
err2:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
ASYNC_NIF_DECL(
lmdb_txn_abort,
{ // struct
struct lmdb *handle;
},
{ // pre
if (!(argc == 1 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
if(args->handle->txn != NULL) {
mdb_txn_abort(args->handle->txn);
args->handle->txn = NULL;
ASYNC_NIF_REPLY(ATOM_OK);
} else
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_TXN_NOT_STARTED));
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{ {
@ -641,6 +773,9 @@ static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM_INCOMPATIBLE = enif_make_atom(env, "incompatible"); ATOM_INCOMPATIBLE = enif_make_atom(env, "incompatible");
ATOM_BAD_RSLOT = enif_make_atom(env, "bad_rslot"); ATOM_BAD_RSLOT = enif_make_atom(env, "bad_rslot");
ATOM_TXN_STARTED = enif_make_atom(env, "txn_started");
ATOM_TXN_NOT_STARTED = enif_make_atom(env, "txn_not_started");
lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource", lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource",
NULL, flags, NULL); NULL, flags, NULL);
return (0); return (0);
@ -681,7 +816,15 @@ static ErlNifFunc nif_funcs [] = {
{"get", 3, lmdb_get}, {"get", 3, lmdb_get},
{"del", 3, lmdb_del}, {"del", 3, lmdb_del},
{"update", 4, lmdb_update}, {"update", 4, lmdb_update},
{"drop", 2, lmdb_drop} {"drop", 2, lmdb_drop},
{"txn_begin", 2, lmdb_txn_begin},
{"txn_commit", 2, lmdb_txn_commit},
{"txn_abort", 2, lmdb_txn_abort}/*,
{"cursor_open", 2, lmdb_cursor_open},
{"cursor_close", 2, lmdb_cursor_close} */
}; };
/* driver entry point */ /* driver entry point */

View file

@ -1,7 +1,7 @@
%% -*- erlang -*- %% -*- erlang -*-
%% ex: ft=erlang ts=4 sw=4 et %% ex: ft=erlang ts=4 sw=4 et
{require_otp_vsn, "R1[567]"}. {require_otp_vsn, "R1[56]|1[78]"}.
{cover_enabled, true}. {cover_enabled, true}.
@ -33,7 +33,7 @@
]}. ]}.
{port_env, [ {port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -fPIC -march=native -mtune=native -Wall -Wextra -Werror"} {"DRV_CFLAGS", "$DRV_CFLAGS -O3 -fPIC -march=native -mtune=native -Wall -Wextra"}
]}. ]}.
% for debugging use % for debugging use

View file

@ -32,7 +32,7 @@
%% EXPORTS %% EXPORTS
%%==================================================================== %%====================================================================
-export([ -export([
%open/1, open/1,
open/2, open/2,
open/3, open/3,
@ -40,6 +40,9 @@
put/3, put/3,
get/2, get/2,
txn_begin/1,
txn_commit/1,
txn_abort/1,
del/2, del/2,
update/3, upd/3, update/3, upd/3,
@ -80,8 +83,8 @@
%% @doc Create a new MDB database %% @doc Create a new MDB database
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%open(DirName) -> open(DirName) ->
% open(DirName, ?MDB_MAP_SIZE). open(DirName, ?MDB_MAP_SIZE).
open(DirName, MapSize) open(DirName, MapSize)
when is_integer(MapSize) when is_integer(MapSize)
andalso MapSize > 0 -> andalso MapSize > 0 ->
@ -116,6 +119,24 @@ get(Handle, Key)
get(_AsyncRef, _Handle, _Key) -> get(_AsyncRef, _Handle, _Key) ->
?NOT_LOADED. ?NOT_LOADED.
txn_begin(Handle) ->
?ASYNC_NIF_CALL(fun txn_begin/2, [Handle]).
txn_begin(_AsyncRef, _Handle) ->
?NOT_LOADED.
txn_commit(Handle) ->
?ASYNC_NIF_CALL(fun txn_commit/2, [Handle]).
txn_commit(_AsyncRef, _Handle) ->
?NOT_LOADED.
txn_abort(Handle) ->
?ASYNC_NIF_CALL(fun txn_abort/2, [Handle]).
txn_abort(_AsyncRef, _Handle) ->
?NOT_LOADED.
del(Handle, Key) del(Handle, Key)
when is_binary(Key) -> when is_binary(Key) ->
?ASYNC_NIF_CALL(fun del/3, [Handle, Key]). ?ASYNC_NIF_CALL(fun del/3, [Handle, Key]).