WIP: slowly adding more LMDB API on the C/NIF side

This commit is contained in:
Gregory Burd 2013-05-21 07:32:34 -04:00
parent 47a17d15f1
commit c49213cbe6
3 changed files with 383 additions and 112 deletions

View file

@ -49,8 +49,8 @@ extern "C" {
} while (0) } while (0)
#endif #endif
#ifndef __UNUSED #ifndef UNUSED
#define __UNUSED(v) ((void)(v)) #define UNUSED(v) ((void)(v))
#endif #endif

View file

@ -2,7 +2,7 @@
* This file is part of LMDB - Erlang Lightning MDB API * This file is part of LMDB - Erlang Lightning MDB API
* *
* Copyright (c) 2012 by Aleph Archives. All rights reserved. * Copyright (c) 2012 by Aleph Archives. All rights reserved.
%% Copyright (c) 2013 by Basho Technologies, Inc. All rights reserved. * Copyright (c) 2013 by Basho Technologies, Inc. All rights reserved.
* *
* ------------------------------------------------------------------------- * -------------------------------------------------------------------------
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -38,23 +38,41 @@
#include "stats.h" #include "stats.h"
#include "lmdb.h" #include "lmdb.h"
STAT_DECL(lmdb_get, 1000); STAT_DECL(lmdb_env_get, 1000);
STAT_DECL(lmdb_put, 1000); STAT_DECL(lmdb_env_put, 1000);
STAT_DECL(lmdb_del, 1000); STAT_DECL(lmdb_env_del, 1000);
STAT_DECL(lmdb_upd, 1000); STAT_DECL(lmdb_env_upd, 1000);
static ErlNifResourceType *lmdb_RESOURCE; static ErlNifResourceType *lmdb_env_RESOURCE;
struct lmdb { struct lmdb_env {
MDB_env *env; MDB_env *env;
MDB_dbi dbi; STAT_DEF(lmdb_env_get);
STAT_DEF(lmdb_get); STAT_DEF(lmdb_env_put);
STAT_DEF(lmdb_put); STAT_DEF(lmdb_env_del);
STAT_DEF(lmdb_del); STAT_DEF(lmdb_env_upd);
STAT_DEF(lmdb_upd);
}; };
static ErlNifResourceType *lmdb_dbi_RESOURCE;
struct lmdb_dbi {
MDB_dbi dbi;
};
static ErlNifResourceType *lmdb_txn_RESOURCE;
struct lmdb_txn {
MDB_txn *txn;
};
static ErlNifResourceType *lmdb_cursor_RESOURCE;
struct lmdb_cursor {
MDB_cursor *cursor;
};
KHASH_MAP_INIT_PTR(envs, struct lmdb_env*);
struct lmdb_priv_data { struct lmdb_priv_data {
void *async_nif_priv; // Note: must be first element in struct void *async_nif_priv; // Note: must be first element in struct
khash_t(envs) *envs; // TODO: could just be a list
ErlNifMutex *envs_mutex;
}; };
/* Global init for async_nif. */ /* Global init for async_nif. */
@ -175,6 +193,190 @@ __strerror_term(ErlNifEnv* env, int err)
enif_make_string(env, mdb_strerror(err), ERL_NIF_LATIN1))); enif_make_string(env, mdb_strerror(err), ERL_NIF_LATIN1)));
} }
/**
* Opens an MDB environment
*
* argv[0] path to directory for the database files
* argv[1] flags
* argv[2] file mode
* argv[3] mapsize
* argv[4] maxreaders
* argv[5] maxdbs
*/
ASYNC_NIF_DECL(
lmdb_env_open_nif,
{ // struct
char dirname[MAXPATHLEN];
unsigned int flags;
mdb_mod_t mode;
size_t mapsize;
unsigned int maxreaders;
MDB_dbi maxdbs;
struct wterl_priv_data *priv;
},
{ // pre
if (!(argc == 6 &&
enif_is_list(env, argv[0]) &&
enif_is_number(env, argv[1]) &&
enif_is_number(env, argv[2]) &&
enif_is_number(env, argv[3]) &&
enif_is_number(env, argv[4]) &&
enif_is_number(env, argv[5]))) {
ASYNC_NIF_RETURN_BADARG();
}
if (enif_get_string(env, argv[0], args->dirname, MAXPATHLEN, ERL_NIF_LATIN1) <= 0)
ASYNC_NIF_RETURN_BADARG();
enif_get_uint32(env, argv[1], &(args->flags));
enif_get_int32(env, argv[2], &(args->mode));
#if (__SIZEOF_SIZE_T__ == 8)
enif_get_int64(env, argv[3], &(args->mapsize));
#else if (__SIZEOF_SIZE_T__ == 4)
enif_get_int32(env, argv[3], &(args->mapsize));
#endif
enif_get_int32(env, argv[4], &(args->maxreaders));
enif_get_int32(env, argv[5], &(args->maxdbs));
args->priv = (struct lmdb_priv_data *)enif_priv_data(env);
},
{ // work
int ret;
ERL_NIF_TERM err;
struct lmdb_env *handle;
khash_t(envs) *h;
khiter_t itr;
int itr_status;
if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL)
FAIL_ERR(ENOMEM, err2);
STAT_INIT(handle, lmdb_env_get);
STAT_INIT(handle, lmdb_env_put);
STAT_INIT(handle, lmdb_env_upd);
STAT_INIT(handle, lmdb_env_del);
CHECK(mdb_env_create(&(handle->env)), err1);
if (mdb_env_set_mapsize(handle->env, args->mapsize)) {
err = enif_make_badarg(handle->env);
goto err1;
}
if (mdb_env_set_maxreaders(handle->env, args->maxreaders)) {
err = enif_make_badarg(handle->env);
goto err1;
}
if (mdb_env_set_maxdbs(handle->env, args->maxdbs)) {
err = enif_make_badarg(handle->env);
goto err1;
}
h = args->priv->envs;
itr = kh_put(envs, h, handle, &itr_status);
kh_value(h, itr) = handle;
ERL_NIF_TERM term = enif_make_resource(env, handle);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
enif_release_resource(handle);
return;
err1:
mdb_env_close(handle->env);
err2:
enif_release_resource(handle);
ASYNC_NIF_REPLY(err);
return;
},
{ // post
});
/**
* Copy an MDB environment to the specified path.
*
* argv[0] an environment handle
* argv[1] destination path
*/
ASYNC_NIF_DECL(
lmdb_copy_nif,
{ // struct
struct lmdb_env *handle;
char dirname[MAXPATHLEN];
},
{ // pre
if (!(argc == 2 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle) &&
enif_is_list(env, arg[1])) {
ASYNC_NIF_RETURN_BADARG();
}
if (enif_get_string(env, argv[1], args->dirname, MAXPATHLEN,
ERL_NIF_LATIN1) <= 0)
ASYNC_NIF_RETURN_BADARG();
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
ERL_NIF_TERM err;
int ret;
CHECK(mdb_env_copy(args->handle->env, args->dirname, err));
ASYNC_NIF_REPLY(ATOM_OK);
return;
err:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
}).
/**
* ??
*
* argv[0] ??
*/
ASYNC_NIF_DECL(
lmdb_??_nif,
{ // struct
struct lmdb *handle;
},
{ // pre
if (!(argc == 1 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle))) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
ERL_NIF_TERM err;
int ret;
CHECK(??, err);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
return;
err:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
}).
/** /**
* Opens a MDB database. * Opens a MDB database.
* *
@ -195,6 +397,7 @@ ASYNC_NIF_DECL(
ErlNifUInt64 envflags; ErlNifUInt64 envflags;
}, },
{ // pre { // pre
if (!(argc == 3 && if (!(argc == 3 &&
enif_is_list(env, argv[0]) && enif_is_list(env, argv[0]) &&
enif_is_number(env, argv[1]) && enif_is_number(env, argv[1]) &&
@ -216,6 +419,7 @@ ASYNC_NIF_DECL(
if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL) if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL)
FAIL_ERR(ENOMEM, err3); FAIL_ERR(ENOMEM, err3);
enif_release_resource(handle);
STAT_INIT(handle, lmdb_get); STAT_INIT(handle, lmdb_get);
STAT_INIT(handle, lmdb_put); STAT_INIT(handle, lmdb_put);
@ -235,7 +439,6 @@ ASYNC_NIF_DECL(
CHECK(mdb_txn_commit(txn), err1); CHECK(mdb_txn_commit(txn), err1);
ERL_NIF_TERM term = enif_make_resource(env, handle); ERL_NIF_TERM term = enif_make_resource(env, handle);
enif_release_resource(handle);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term)); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
return; return;
@ -244,6 +447,7 @@ ASYNC_NIF_DECL(
err2: err2:
mdb_env_close(handle->env); mdb_env_close(handle->env);
err3: err3:
enif_release_resource(handle);
ASYNC_NIF_REPLY(err); ASYNC_NIF_REPLY(err);
return; return;
}, },
@ -642,23 +846,38 @@ ASYNC_NIF_DECL(
}); });
/**
static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) * Called as this driver is loaded by the Erlang BEAM runtime triggered by the
* module's on_load directive.
*
* env the NIF environment
* priv_data used to hold the state for this NIF rather than global variables
* load_info an Erlang term passed in with this call
*/
static int
lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{ {
__UNUSED(load_info); UNUSED(load_info);
int err;
char msg[1024];
ErlNifResourceFlags flags;
struct lmdb_priv_data *priv;
ErlNifResourceFlags flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER; flags = ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER;
priv = enif_alloc(sizeof(struct lmdb_priv_data));
struct lmdb_priv_data *priv = enif_alloc(sizeof(struct lmdb_priv_data));
if (!priv) if (!priv)
return ENOMEM; FAIL_ERR(ENOMEM, err1);
memset(priv, 0, sizeof(struct lmdb_priv_data)); memset(priv, 0, sizeof(struct lmdb_priv_data));
priv->envs_mutex = enif_mutex_create(NULL);
priv->envs = kh_init(envs);
if (!priv->envs)
FAIL_ERR(ENOMEM, err2);
/* Note: !!! the first element of our priv_data struct *must* be the /* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv); ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv);
if (!priv) if (!priv)
return ENOMEM; FAIL_ERR(ENOMEM, err3);
*priv_data = priv; *priv_data = priv;
ATOM_ERROR = enif_make_atom(env, "error"); ATOM_ERROR = enif_make_atom(env, "error");
@ -684,68 +903,111 @@ static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource", lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource",
NULL, flags, NULL); NULL, flags, NULL);
fprintf(stderr, "NIF on_load complete (lmdb version: %s)", MDB_VERSION_STRING);
fflush(stderr);
return (0); return (0);
}
static int lmdb_reload(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM info) err3:
{ kh_destroy(envs, priv->envs);
__UNUSED(env); err2:
__UNUSED(priv_data); enif_mutex_destroy(priv->conns_mutex);
__UNUSED(info); enif_free(priv);
return (0); // TODO: err1:
return (ENOMEM);
} }
static int lmdb_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv, ERL_NIF_TERM load_info) /**
* TODO:
*/
static int
lmdb_reload(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM info)
{ {
__UNUSED(env); UNUSED(env);
__UNUSED(priv_data); UNUSED(priv_data);
__UNUSED(old_priv); UNUSED(info);
__UNUSED(load_info); return (0); // TODO: implement
}
/**
* TODO:
*/
static int
lmdb_upgrade(ErlNifEnv* env, void** priv_data, void** old_priv, ERL_NIF_TERM load_info)
{
UNUSED(env);
UNUSED(priv_data);
UNUSED(old_priv);
UNUSED(load_info);
ASYNC_NIF_UPGRADE(lmdb, env); ASYNC_NIF_UPGRADE(lmdb, env);
return (0); // TODO: return (0); // TODO:
} }
static void lmdb_unload(ErlNifEnv* env, void* priv_data) /**
* TODO:
*/
static void
lmdb_unload(ErlNifEnv* env, void* priv_data)
{ {
struct lmdb_priv_data *priv = (struct lmdb_priv_data *)priv_data; struct lmdb_priv_data *priv = (struct lmdb_priv_data *)priv_data;
khash_t(envs) *h;
khiter_t itr_envs;
struct lmdb_env *env;
enif_mutex_lock(priv->envs_mutex);
h = priv->envs;
for (itr_envs = kh_begin(h); itr_envs != kh_end(h); ++itr_envs) {
if (kh_exist(h, itr_envs)) {
env = kh_val(h, itr_envs);
if (env) {
mdb_env_close(env);
kh_del(envs, h, itr_envs);
enif_free(env);
kh_value(h, itr_envs) = NULL;
}
}
}
kh_destroy(envs, h);
ASYNC_NIF_UNLOAD(lmdb, env, priv->async_nif_priv); ASYNC_NIF_UNLOAD(lmdb, env, priv->async_nif_priv);
enif_mutex_unlock(priv->envs_mutex);
enif_mutex_destroy(priv->envs_mutex);
enif_free(priv); enif_free(priv);
return; return;
} }
static ErlNifFunc nif_funcs [] = { static ErlNifFunc nif_funcs [] = {
{"env_open" 4, lmdb_env_open}, // [(Ref), Path, Bitmask, Mode] {"env_open_nif", 7, lmdb_env_open}, // [(Ref), Path, Bitmask, Mode]
{"copy", 3, lmdb_copy}, // [(Ref), Env, Path] {"copy_nif", 3, lmdb_copy}, // [(Ref), Env, Path]
{"stat", 2, lmdb_stat}, // [(Ref), Env] {"stat_nif", 2, lmdb_stat}, // [(Ref), Env]
{"sync", 3, lmdb_sync}, // [(Ref), Env, Force] {"sync_nif", 3, lmdb_sync}, // [(Ref), Env, Force]
{"env_close", 2, lmdb_env_close}, // [(Ref), Env] {"env_close_nif", 2, lmdb_env_close}, // [(Ref), Env]
{"path", 2, lmdb_path}, // [(Ref), Env] {"path_nif", 2, lmdb_path}, // [(Ref), Env]
{"set_maxdbs", 3, lmdb_set_maxdbs}, // [(Ref), Env, Dbs] {"set_maxdbs_nif", 3, lmdb_set_maxdbs}, // [(Ref), Env, Dbs]
{"txn_begin", 4, lmdb_txn_begin}, // [(Ref), Env, Parent, Bitmask]); {"txn_begin_nif", 4, lmdb_txn_begin}, // [(Ref), Env, Parent, Bitmask]);
{"txn_commit", 2, lmdb_txn_commit}, // [(Ref), Txn] {"txn_commit_nif", 2, lmdb_txn_commit}, // [(Ref), Txn]
{"txn_abort", 2, lmdb_txn_abort}, // [(Ref), Txn] {"txn_abort_nif", 2, lmdb_txn_abort}, // [(Ref), Txn]
{"txn_reset", 2, lmdb_txn_reset}, // [(Ref), Txn] {"txn_reset_nif", 2, lmdb_txn_reset}, // [(Ref), Txn]
{"txn_renew", 2, lmdb_txn_renew}, // [(Ref), Txn] {"txn_renew_nif", 2, lmdb_txn_renew}, // [(Ref), Txn]
{"dbi_open", 5, lmdb_dbi_open}, // [(Ref), Txn, Path, Size, Bitmask] {"dbi_open_nif", 5, lmdb_dbi_open}, // [(Ref), Txn, Path, Size, Bitmask]
{"dbi_close", 2, lmdb_dbi_close}, // [(Ref), Env, Dbi] {"dbi_close_nif", 2, lmdb_dbi_close}, // [(Ref), Env, Dbi]
{"drop", 4, lmdb_drop}, // [(Ref), Txn, Dbi, Delete] {"drop_nif", 4, lmdb_drop}, // [(Ref), Txn, Dbi, Delete]
{"get", 3, lmdb_get}, // [(Ref), Txn, Dbi, Key] {"get_nif", 3, lmdb_get}, // [(Ref), Txn, Dbi, Key]
{"put", 4, lmdb_put}, // [(Ref), Txn, Dbi, Key, Value, Bitmask] {"put_nif", 4, lmdb_put}, // [(Ref), Txn, Dbi, Key, Value, Bitmask]
{"del", 5, lmdb_del}, // [(Ref), Txn, Dbi, Key, Value] {"del_nif", 5, lmdb_del}, // [(Ref), Txn, Dbi, Key, Value]
{"cursor_open", 3, lmdb_cursor_open}, // [(Ref), Txn, Dbi] {"cursor_open_nif", 3, lmdb_cursor_open}, // [(Ref), Txn, Dbi]
{"cursor_close", 2, lmdb_cursor_close}, // [(Ref), Cursor] {"cursor_close_nif", 2, lmdb_cursor_close}, // [(Ref), Cursor]
{"cursor_renew", 3, lmdb_cursor_renew}, // [(Ref), Txn, Cursor] {"cursor_renew_nif", 3, lmdb_cursor_renew}, // [(Ref), Txn, Cursor]
{"cursor_txn", 2, lmdb_cursor_txn}, // [(Ref), Cursor] {"cursor_txn_nif", 2, lmdb_cursor_txn}, // [(Ref), Cursor]
{"cursor_dbi", 2, lmdb_cursor_dbi}, // [(Ref), Cursor] {"cursor_dbi_nif", 2, lmdb_cursor_dbi}, // [(Ref), Cursor]
{"cursor_get", 5, lmdb_cursor_get}, // [(Ref), Cursor, Key, DupValue, CursorOp] {"cursor_get_nif", 5, lmdb_cursor_get}, // [(Ref), Cursor, Key, DupValue, CursorOp]
{"cursor_put", 5, lmdb_cursor_put}, // [(Ref), Cursor, Key, Value, Options] {"cursor_put_nif", 5, lmdb_cursor_put}, // [(Ref), Cursor, Key, Value, Options]
{"cursor_del", 3, lmdb_cursor_del}, // [(Ref), Cursor, Bitmask] {"cursor_del_nif", 3, lmdb_cursor_del}, // [(Ref), Cursor, Bitmask]
{"cursor_count", 2, lmdb_cursor_count}, // [(Ref), Cursor] {"cursor_count_nif", 2, lmdb_cursor_count}, // [(Ref), Cursor]
{"cmp", 5, lmdb_cmp}, // [(Ref), Txn, Dbi, A, B] {"cmp_nif", 5, lmdb_cmp}, // [(Ref), Txn, Dbi, A, B]
{"dup_cmp", 5, lmdb_dup_cmp}, // [(Ref), Txn, Dbi, A, B] {"dup_cmp_nif", 5, lmdb_dup_cmp}, // [(Ref), Txn, Dbi, A, B]
{"version", 1, lmdb_version} // [(Ref)] {"version_nif", 1, lmdb_version} // [(Ref)]
}; };
/* driver entry point */ /* driver entry point */

View file

@ -33,7 +33,7 @@
%%==================================================================== %%====================================================================
%% Public API: %% Public API:
-export([env_open/3, -export([env_open/3, env_open/6,
env_close/1, env_close/1,
copy/2, copy/2,
stat/1, stat/1,
@ -131,7 +131,7 @@
prev | prev_dup | prev_multiple | prev_nodup | prev | prev_dup | prev_multiple | prev_nodup |
set | set_key | set_range. set | set_key | set_range.
-type path() :: string(). -type path() :: string().
-type mode() :: string(). -type mode() :: non_neg_integer().
-type byte_size() :: non_neg_integer() | { non_neg_integer(), -type byte_size() :: non_neg_integer() | { non_neg_integer(),
b|bytes|'GB'|'GiB'|'TB'|'TiB'|'PB'|'PiB' }. b|bytes|'GB'|'GiB'|'TB'|'TiB'|'PB'|'PiB' }.
@ -198,6 +198,9 @@
%% %%
%% mode The UNIX permissions to set on created files. This parameter %% mode The UNIX permissions to set on created files. This parameter
%% is ignored on Windows. %% is ignored on Windows.
%% mapsize
%% maxreaders
%% maxdbs
%% %%
%% Possible errors are: %% Possible errors are:
%% MDB_VERSION_MISMATCH - the version of the MDB library doesn't match the %% MDB_VERSION_MISMATCH - the version of the MDB library doesn't match the
@ -210,7 +213,11 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec env_open(path(), options(), mode()) -> {ok, env()} | {error, term()}. -spec env_open(path(), options(), mode()) -> {ok, env()} | {error, term()}.
-spec env_open(path(), options(), mode(), non_neg_integer(),
non_neg_integer(), non_neg_integer()) -> {ok, env()} | {error, term()}.
env_open(Path, Options, Mode) -> env_open(Path, Options, Mode) ->
env_open(Path, Options, Mode, {2, 'GB'}, 1000, 100). % guess at default values
env_open(Path, Options, Mode, MapSize, MaxReaders, MaxDbs) ->
ValidOptions = [{ fixedmap, ?MDB_FIXEDMAP, []}, ValidOptions = [{ fixedmap, ?MDB_FIXEDMAP, []},
{ nosubdir, ?MDB_NOSUBDIR, []}, { nosubdir, ?MDB_NOSUBDIR, []},
{ rdonly, ?MDB_RDONLY, [writemap]}, { rdonly, ?MDB_RDONLY, [writemap]},
@ -223,12 +230,14 @@ env_open(Path, Options, Mode) ->
{ok, Bitmask} -> {ok, Bitmask} ->
%% Ensure directory exists %% Ensure directory exists
ok = filelib:ensure_dir(filename:join([Path, "x"])), ok = filelib:ensure_dir(filename:join([Path, "x"])),
?ASYNC_NIF_CALL(fun env_open_nif/4, [Path, Bitmask, Mode]); ?ASYNC_NIF_CALL(fun env_open_nif/7, [Path, Bitmask, Mode,
in_bytes(MapSize), MaxReaders,
MaxDbs]);
{error, _Reason}=Error -> {error, _Reason}=Error ->
Error Error
end. end.
env_open_nif(_AsyncRef, _Path, _Options, _Mode) -> env_open_nif(_AsyncRef, _Path, _Options, _Mode, _MapSize, _MaxReaders, _MaxDbs) ->
?NOT_LOADED. ?NOT_LOADED.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------