WIP: adding more of the API.

This commit is contained in:
Gregory Burd 2013-08-05 12:45:43 -04:00
parent 095031a23d
commit 6c1c27a51a
4 changed files with 237 additions and 140 deletions

View file

@ -2949,6 +2949,19 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
MDB_page *p, *q; MDB_page *p, *q;
int rc; int rc;
unsigned int psize; unsigned int psize;
#ifdef _WIN32
DWORD len;
OVERLAPPED ov;
memset(&ov, 0, sizeof(ov));
#define DO_PWRITE(rc, fd, ptr, size, len, pos) do { \
ov.Offset = pos; \
rc = WriteFile(fd, ptr, size, &len, &ov); } while(0)
#else
unsigned int len;
#define DO_PWRITE(rc, fd, ptr, size, len, pos) do { \
len = pwrite(fd, ptr, size, pos); \
rc = (len >= 0); } while(0)
#endif
DPUTS("writing new meta page"); DPUTS("writing new meta page");
@ -2974,18 +2987,13 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta)
q->mp_flags = P_META; q->mp_flags = P_META;
*(MDB_meta *)METADATA(q) = *meta; *(MDB_meta *)METADATA(q) = *meta;
#ifdef _WIN32 DO_PWRITE(rc, env->me_fd, p, psize * 2, len, 0);
{ if (!rc)
DWORD len; rc = ErrCode();
OVERLAPPED ov; else if (len == psize * 2)
memset(&ov, 0, sizeof(ov)); rc = MDB_SUCCESS;
rc = WriteFile(env->me_fd, p, psize * 2, &len, &ov); else
rc = rc ? (len == psize * 2 ? MDB_SUCCESS : EIO) : ErrCode(); rc = ENOSPC;
}
#else
rc = pwrite(env->me_fd, p, psize * 2, 0);
rc = (rc == (int)psize * 2) ? MDB_SUCCESS : rc < 0 ? ErrCode() : EIO;
#endif
free(p); free(p);
return rc; return rc;
} }
@ -3535,20 +3543,35 @@ mdb_hash_val(MDB_val *val, mdb_hash_t hval)
return hval; return hval;
} }
/** Hash the string and output the hash in hex. /** Hash the string and output the encoded hash.
* This uses modified RFC1924 Ascii85 encoding to accommodate systems with
* very short name limits. We don't care about the encoding being reversible,
* we just want to preserve as many bits of the input as possible in a
* small printable string.
* @param[in] str string to hash * @param[in] str string to hash
* @param[out] hexbuf an array of 17 chars to hold the hash * @param[out] encbuf an array of 11 chars to hold the hash
*/ */
const static char mdb_a85[]= "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~";
static void static void
mdb_hash_hex(MDB_val *val, char *hexbuf) mdb_pack85(unsigned long l, char *out)
{ {
int i; int i;
mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT); for (i=0; i<5; i++) {
for (i=0; i<8; i++) { *out++ = mdb_a85[l % 85];
hexbuf += sprintf(hexbuf, "%02x", (unsigned int)h & 0xff); l /= 85;
h >>= 8;
} }
} }
static void
mdb_hash_enc(MDB_val *val, char *encbuf)
{
mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT);
unsigned long *l = (unsigned long *)&h;
mdb_pack85(l[0], encbuf);
mdb_pack85(l[1], encbuf+5);
encbuf[10] = '\0';
}
#endif #endif
/** Open and/or initialize the lock region for the environment. /** Open and/or initialize the lock region for the environment.
@ -3661,7 +3684,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
DWORD nlow; DWORD nlow;
} idbuf; } idbuf;
MDB_val val; MDB_val val;
char hexbuf[17]; char encbuf[11];
if (!mdb_sec_inited) { if (!mdb_sec_inited) {
InitializeSecurityDescriptor(&mdb_null_sd, InitializeSecurityDescriptor(&mdb_null_sd,
@ -3678,9 +3701,9 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
idbuf.nlow = stbuf.nFileIndexLow; idbuf.nlow = stbuf.nFileIndexLow;
val.mv_data = &idbuf; val.mv_data = &idbuf;
val.mv_size = sizeof(idbuf); val.mv_size = sizeof(idbuf);
mdb_hash_hex(&val, hexbuf); mdb_hash_enc(&val, encbuf);
sprintf(env->me_txns->mti_rmname, "Global\\MDBr%s", hexbuf); sprintf(env->me_txns->mti_rmname, "Global\\MDBr%s", encbuf);
sprintf(env->me_txns->mti_wmname, "Global\\MDBw%s", hexbuf); sprintf(env->me_txns->mti_wmname, "Global\\MDBw%s", encbuf);
env->me_rmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_rmname); env->me_rmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_rmname);
if (!env->me_rmutex) goto fail_errno; if (!env->me_rmutex) goto fail_errno;
env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_wmname); env->me_wmutex = CreateMutex(&mdb_all_sa, FALSE, env->me_txns->mti_wmname);
@ -3692,16 +3715,22 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
ino_t ino; ino_t ino;
} idbuf; } idbuf;
MDB_val val; MDB_val val;
char hexbuf[17]; char encbuf[11];
#if defined(__NetBSD__)
#define MDB_SHORT_SEMNAMES 1 /* limited to 14 chars */
#endif
if (fstat(env->me_lfd, &stbuf)) goto fail_errno; if (fstat(env->me_lfd, &stbuf)) goto fail_errno;
idbuf.dev = stbuf.st_dev; idbuf.dev = stbuf.st_dev;
idbuf.ino = stbuf.st_ino; idbuf.ino = stbuf.st_ino;
val.mv_data = &idbuf; val.mv_data = &idbuf;
val.mv_size = sizeof(idbuf); val.mv_size = sizeof(idbuf);
mdb_hash_hex(&val, hexbuf); mdb_hash_enc(&val, encbuf);
sprintf(env->me_txns->mti_rmname, "/MDBr%s", hexbuf); #ifdef MDB_SHORT_SEMNAMES
sprintf(env->me_txns->mti_wmname, "/MDBw%s", hexbuf); encbuf[9] = '\0'; /* drop name from 15 chars to 14 chars */
#endif
sprintf(env->me_txns->mti_rmname, "/MDBr%s", encbuf);
sprintf(env->me_txns->mti_wmname, "/MDBw%s", encbuf);
/* Clean up after a previous run, if needed: Try to /* Clean up after a previous run, if needed: Try to
* remove both semaphores before doing anything else. * remove both semaphores before doing anything else.
*/ */
@ -3984,6 +4013,14 @@ mdb_env_copyfd(MDB_env *env, HANDLE fd)
int rc; int rc;
size_t wsize; size_t wsize;
char *ptr; char *ptr;
#ifdef _WIN32
DWORD len, w2;
#define DO_WRITE(rc, fd, ptr, w2, len) rc = WriteFile(fd, ptr, w2, &len, NULL)
#else
ssize_t len;
size_t w2;
#define DO_WRITE(rc, fd, ptr, w2, len) len = write(fd, ptr, w2); rc = (len >= 0)
#endif
/* Do the lock/unlock of the reader mutex before starting the /* Do the lock/unlock of the reader mutex before starting the
* write txn. Otherwise other read txns could block writers. * write txn. Otherwise other read txns could block writers.
@ -4007,52 +4044,50 @@ mdb_env_copyfd(MDB_env *env, HANDLE fd)
} }
wsize = env->me_psize * 2; wsize = env->me_psize * 2;
#ifdef _WIN32 ptr = env->me_map;
{ w2 = wsize;
DWORD len; while (w2 > 0) {
rc = WriteFile(fd, env->me_map, wsize, &len, NULL); DO_WRITE(rc, fd, ptr, w2, len);
rc = rc ? (len == wsize ? MDB_SUCCESS : EIO) : ErrCode(); if (!rc) {
rc = ErrCode();
break;
} else if (len > 0) {
rc = MDB_SUCCESS;
ptr += len;
w2 -= len;
continue;
} else {
/* Non-blocking or async handles are not supported */
rc = EIO;
break;
}
} }
#else
rc = write(fd, env->me_map, wsize);
rc = rc == (int)wsize ? MDB_SUCCESS : rc < 0 ? ErrCode() : EIO;
#endif
if (env->me_txns) if (env->me_txns)
UNLOCK_MUTEX_W(env); UNLOCK_MUTEX_W(env);
if (rc) if (rc)
goto leave; goto leave;
ptr = env->me_map + wsize;
wsize = txn->mt_next_pgno * env->me_psize - wsize; wsize = txn->mt_next_pgno * env->me_psize - wsize;
#ifdef _WIN32
while (wsize > 0) { while (wsize > 0) {
DWORD len, w2;
if (wsize > MAX_WRITE) if (wsize > MAX_WRITE)
w2 = MAX_WRITE; w2 = MAX_WRITE;
else else
w2 = wsize; w2 = wsize;
rc = WriteFile(fd, ptr, w2, &len, NULL); DO_WRITE(rc, fd, ptr, w2, len);
rc = rc ? (len == w2 ? MDB_SUCCESS : EIO) : ErrCode(); if (!rc) {
if (rc) break; rc = ErrCode();
wsize -= w2; break;
ptr += w2; } else if (len > 0) {
rc = MDB_SUCCESS;
ptr += len;
wsize -= len;
continue;
} else {
rc = EIO;
break;
} }
#else
while (wsize > 0) {
size_t w2;
ssize_t wres;
if (wsize > MAX_WRITE)
w2 = MAX_WRITE;
else
w2 = wsize;
wres = write(fd, ptr, w2);
rc = wres == (ssize_t)w2 ? MDB_SUCCESS : wres < 0 ? ErrCode() : EIO;
if (rc) break;
wsize -= wres;
ptr += wres;
} }
#endif
leave: leave:
mdb_txn_abort(txn); mdb_txn_abort(txn);
@ -7659,7 +7694,10 @@ mdb_env_info(MDB_env *env, MDB_envinfo *arg)
arg->me_mapaddr = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : 0; arg->me_mapaddr = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : 0;
arg->me_mapsize = env->me_mapsize; arg->me_mapsize = env->me_mapsize;
arg->me_maxreaders = env->me_maxreaders; arg->me_maxreaders = env->me_maxreaders;
arg->me_numreaders = env->me_numreaders; /* me_numreaders may be zero if this process never used any readers. Use
* the shared numreader count if it exists.
*/
arg->me_numreaders = env->me_txns ? env->me_txns->mti_numreaders : env->me_numreaders;
arg->me_last_pgno = env->me_metas[toggle]->mm_last_pg; arg->me_last_pgno = env->me_metas[toggle]->mm_last_pg;
arg->me_last_txnid = env->me_metas[toggle]->mm_txnid; arg->me_last_txnid = env->me_metas[toggle]->mm_txnid;
return MDB_SUCCESS; return MDB_SUCCESS;

View file

@ -36,6 +36,7 @@
#include "common.h" #include "common.h"
#include "async_nif.h" #include "async_nif.h"
#include "queue.h"
#include "lmdb.h" #include "lmdb.h"
@ -43,6 +44,7 @@ static ErlNifResourceType *lmdb_RESOURCE;
struct lmdb { struct lmdb {
MDB_env *env; MDB_env *env;
MDB_dbi dbi; MDB_dbi dbi;
STAILQ_ENTRY(lmdb) entries;
}; };
static ErlNifResourceType *lmdb_txn_RESOURCE; static ErlNifResourceType *lmdb_txn_RESOURCE;
@ -55,10 +57,9 @@ struct lmdb_cursor {
MDB_cursor *cursor; MDB_cursor *cursor;
}; };
KHASH_MAP_INIT_PTR(envs, struct lmdb_env*);
struct lmdb_priv_data { struct lmdb_priv_data {
void *async_nif_priv; // Note: must be first element in struct void *async_nif_priv; // Note: must be first element in struct
khash_t(envs) *envs; // TODO: could just be a list STAILQ_HEAD(envs, lmdb) envs;
ErlNifMutex *envs_mutex; ErlNifMutex *envs_mutex;
}; };
@ -66,27 +67,34 @@ struct lmdb_priv_data {
ASYNC_NIF_INIT(lmdb); ASYNC_NIF_INIT(lmdb);
/* Atoms (initialized in on_load) */ /* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_ERROR; static ERL_NIF_TERM ATOM_BAD_RSLOT;
static ERL_NIF_TERM ATOM_OK; static ERL_NIF_TERM ATOM_BRANCH_PAGES;
static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_EXISTS;
static ERL_NIF_TERM ATOM_KEYEXIST;
static ERL_NIF_TERM ATOM_NOTFOUND;
static ERL_NIF_TERM ATOM_PAGE_NOTFOUND;
static ERL_NIF_TERM ATOM_CORRUPTED; static ERL_NIF_TERM ATOM_CORRUPTED;
static ERL_NIF_TERM ATOM_PANIC; static ERL_NIF_TERM ATOM_CURSOR_FULL;
static ERL_NIF_TERM ATOM_VERSION_MISMATCH;
static ERL_NIF_TERM ATOM_KEYEXIST;
static ERL_NIF_TERM ATOM_MAP_FULL;
static ERL_NIF_TERM ATOM_DBS_FULL; static ERL_NIF_TERM ATOM_DBS_FULL;
static ERL_NIF_TERM ATOM_DEPTH;
static ERL_NIF_TERM ATOM_ENTRIES;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_EXISTS;
static ERL_NIF_TERM ATOM_INCOMPATIBLE;
static ERL_NIF_TERM ATOM_KEYEXIST;
static ERL_NIF_TERM ATOM_KEYEXIST;
static ERL_NIF_TERM ATOM_LEAF_PAGES;
static ERL_NIF_TERM ATOM_MAP_FULL;
static ERL_NIF_TERM ATOM_MAP_RESIZED;
static ERL_NIF_TERM ATOM_NOTFOUND;
static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_OVERFLOW_PAGES;
static ERL_NIF_TERM ATOM_PAGE_FULL;
static ERL_NIF_TERM ATOM_PAGE_NOTFOUND;
static ERL_NIF_TERM ATOM_PANIC;
static ERL_NIF_TERM ATOM_PSIZE;
static ERL_NIF_TERM ATOM_READERS_FULL; static ERL_NIF_TERM ATOM_READERS_FULL;
static ERL_NIF_TERM ATOM_TLS_FULL; static ERL_NIF_TERM ATOM_TLS_FULL;
static ERL_NIF_TERM ATOM_TRUE;
static ERL_NIF_TERM ATOM_TXN_FULL; static ERL_NIF_TERM ATOM_TXN_FULL;
static ERL_NIF_TERM ATOM_CURSOR_FULL; static ERL_NIF_TERM ATOM_VERSION_MISMATCH;
static ERL_NIF_TERM ATOM_PAGE_FULL;
static ERL_NIF_TERM ATOM_MAP_RESIZED;
static ERL_NIF_TERM ATOM_INCOMPATIBLE;
static ERL_NIF_TERM ATOM_BAD_RSLOT;
#define CHECK(expr, label) \ #define CHECK(expr, label) \
if (MDB_SUCCESS != (ret = (expr))) { \ if (MDB_SUCCESS != (ret = (expr))) { \
@ -191,16 +199,16 @@ __strerror_term(ErlNifEnv* env, int err)
* argv[5] maxdbs * argv[5] maxdbs
*/ */
ASYNC_NIF_DECL( ASYNC_NIF_DECL(
lmdb_env_open_nif, lmdb_env_open,
{ // struct { // struct
char dirname[MAXPATHLEN]; char dirname[MAXPATHLEN];
unsigned int flags; unsigned int flags;
mdb_mod_t mode; mdb_mode_t mode;
size_t mapsize; size_t mapsize;
unsigned int maxreaders; unsigned int maxreaders;
MDB_dbi maxdbs; MDB_dbi maxdbs;
struct wterl_priv_data *priv; struct lmdb_priv_data *priv;
}, },
{ // pre { // pre
@ -215,54 +223,46 @@ ASYNC_NIF_DECL(
} }
if (enif_get_string(env, argv[0], args->dirname, MAXPATHLEN, ERL_NIF_LATIN1) <= 0) if (enif_get_string(env, argv[0], args->dirname, MAXPATHLEN, ERL_NIF_LATIN1) <= 0)
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
enif_get_uint32(env, argv[1], &(args->flags)); enif_get_uint(env, argv[1], &(args->flags));
enif_get_int32(env, argv[2], &(args->mode)); enif_get_uint(env, argv[2], &(args->mode));
#if (__SIZEOF_SIZE_T__ == 8) #if (__SIZEOF_SIZE_T__ == 8)
enif_get_int64(env, argv[3], &(args->mapsize)); enif_get_uint64(env, argv[3], &(args->mapsize));
#else if (__SIZEOF_SIZE_T__ == 4) #elif (__SIZEOF_SIZE_T__ == 4)
enif_get_int32(env, argv[3], &(args->mapsize)); enif_get_int(env, argv[3], &(args->mapsize));
#endif #endif
enif_get_int32(env, argv[4], &(args->maxreaders)); enif_get_uint(env, argv[4], &(args->maxreaders));
enif_get_int32(env, argv[5], &(args->maxdbs)); enif_get_uint(env, argv[5], &(args->maxdbs));
args->priv = (struct lmdb_priv_data *)enif_priv_data(env); args->priv = (struct lmdb_priv_data *)enif_priv_data(env);
}, },
{ // work { // work
int ret; int ret;
ERL_NIF_TERM err; ERL_NIF_TERM err;
struct lmdb_env *handle; struct lmdb *handle;
khash_t(envs) *h;
khiter_t itr;
int itr_status;
if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL) if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL)
FAIL_ERR(ENOMEM, err2); FAIL_ERR(ENOMEM, err2);
STAT_INIT(handle, lmdb_env_get);
STAT_INIT(handle, lmdb_env_put);
STAT_INIT(handle, lmdb_env_upd);
STAT_INIT(handle, lmdb_env_del);
CHECK(mdb_env_create(&(handle->env)), err1); CHECK(mdb_env_create(&(handle->env)), err1);
if (mdb_env_set_mapsize(handle->env, args->mapsize)) { if (mdb_env_set_mapsize(handle->env, args->mapsize)) {
err = enif_make_badarg(handle->env); err = enif_make_badarg(env);
goto err1; goto err1;
} }
if (mdb_env_set_maxreaders(handle->env, args->maxreaders)) { if (mdb_env_set_maxreaders(handle->env, args->maxreaders)) {
err = enif_make_badarg(handle->env); err = enif_make_badarg(env);
goto err1; goto err1;
} }
if (mdb_env_set_maxdbs(handle->env, args->maxdbs)) { if (mdb_env_set_maxdbs(handle->env, args->maxdbs)) {
err = enif_make_badarg(handle->env); err = enif_make_badarg(env);
goto err1; goto err1;
} }
h = args->priv->envs; enif_mutex_lock(args->priv->envs_mutex);
itr = kh_put(envs, h, handle, &itr_status); STAILQ_INSERT_TAIL(&args->priv->envs, handle, entries);
kh_value(h, itr) = handle; enif_mutex_unlock(args->priv->envs_mutex);
ERL_NIF_TERM term = enif_make_resource(env, handle); ERL_NIF_TERM term = enif_make_resource(env, handle);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term)); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
@ -287,17 +287,17 @@ ASYNC_NIF_DECL(
* argv[1] destination path * argv[1] destination path
*/ */
ASYNC_NIF_DECL( ASYNC_NIF_DECL(
lmdb_copy_nif, lmdb_copy,
{ // struct { // struct
struct lmdb_env *handle; struct lmdb *handle;
char dirname[MAXPATHLEN]; char dirname[MAXPATHLEN];
}, },
{ // pre { // pre
if (!(argc == 2 && if (!(argc == 2 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle) && enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle) &&
enif_is_list(env, arg[1])) { enif_is_list(env, argv[1]))) {
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
} }
if (enif_get_string(env, argv[1], args->dirname, MAXPATHLEN, if (enif_get_string(env, argv[1], args->dirname, MAXPATHLEN,
@ -312,7 +312,7 @@ ASYNC_NIF_DECL(
ERL_NIF_TERM err; ERL_NIF_TERM err;
int ret; int ret;
CHECK(mdb_env_copy(args->handle->env, args->dirname, err)); CHECK(mdb_env_copy(args->handle->env, args->dirname), err);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -323,15 +323,15 @@ ASYNC_NIF_DECL(
{ // post { // post
enif_release_resource((void*)args->handle); enif_release_resource((void*)args->handle);
}). });
/** /**
* ?? * Return statistics about the MDB environment.
* *
* argv[0] ?? * argv[0] an environment handle
*/ */
ASYNC_NIF_DECL( ASYNC_NIF_DECL(
lmdb_??_nif, lmdb_stat,
{ // struct { // struct
struct lmdb *handle; struct lmdb *handle;
@ -349,20 +349,77 @@ ASYNC_NIF_DECL(
{ // work { // work
ERL_NIF_TERM err; ERL_NIF_TERM err;
ERL_NIF_TERM term;
MDB_stat stats;
int ret; int ret;
CHECK(??, err); CHECK(mdb_env_stat(args->handle->env, &stats), err1);
term = enif_make_tuple(env, 6,
enif_make_tuple(env, 2, ATOM_PSIZE, enif_make_uint(env, stats.ms_psize)),
enif_make_tuple(env, 2, ATOM_DEPTH, enif_make_uint(env, stats.ms_depth)),
enif_make_tuple(env, 2, ATOM_BRANCH_PAGES, enif_make_uint(env, stats.ms_branch_pages)),
enif_make_tuple(env, 2, ATOM_LEAF_PAGES, enif_make_uint(env, stats.ms_leaf_pages)),
enif_make_tuple(env, 2, ATOM_OVERFLOW_PAGES, enif_make_uint(env, stats.ms_overflow_pages)),
enif_make_tuple(env, 2, ATOM_ENTRIES, enif_make_uint(env, stats.ms_entries)));
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term)); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
return; return;
err: err1:
ASYNC_NIF_REPLY(err); ASYNC_NIF_REPLY(err);
return; return;
}, },
{ // post { // post
enif_release_resource((void*)args->handle); enif_release_resource((void*)args->handle);
}). });
/**
* Flush the data buffers to disk.
*
* argv[0] an environment handle
* argv[1] if true, force a synchronous flush. Otherwise if the environment
* has the ?MDB_NOSYNC flag set the flushes will be omitted, and
* with ?MDB_MAPASYNC they will be asynchronous.
*/
ASYNC_NIF_DECL(
lmdb_sync,
{ // struct
struct lmdb *handle;
int force;
},
{ // pre
if (!(argc == 2 &&
enif_get_resource(env, argv[0], lmdb_RESOURCE, (void**)&args->handle) &&
enif_is_atom(env, argv[1]))) {
ASYNC_NIF_RETURN_BADARG();
}
args->force = enif_is_identical(ATOM_TRUE, argv[1]) ? 1 : 0;
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
ERL_NIF_TERM err;
int ret;
CHECK(mdb_env_sync(args->handle->env, args->force), err1);
ASYNC_NIF_REPLY(enif_make_tuple(env, 1, ATOM_OK));
return;
err1:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
/** /**
* Opens a MDB database. * Opens a MDB database.
@ -825,7 +882,6 @@ lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{ {
UNUSED(load_info); UNUSED(load_info);
int err; int err;
char msg[1024];
ErlNifResourceFlags flags; ErlNifResourceFlags flags;
struct lmdb_priv_data *priv; struct lmdb_priv_data *priv;
@ -835,21 +891,20 @@ lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
FAIL_ERR(ENOMEM, err1); FAIL_ERR(ENOMEM, err1);
memset(priv, 0, sizeof(struct lmdb_priv_data)); memset(priv, 0, sizeof(struct lmdb_priv_data));
priv->envs_mutex = enif_mutex_create(NULL); priv->envs_mutex = enif_mutex_create(NULL);
priv->envs = kh_init(envs); STAILQ_INIT(&priv->envs);
if (!priv->envs)
FAIL_ERR(ENOMEM, err2);
/* Note: !!! the first element of our priv_data struct *must* be the /* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv); ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv);
if (!priv) if (!priv)
FAIL_ERR(ENOMEM, err3); FAIL_ERR(ENOMEM, err2);
*priv_data = priv; *priv_data = priv;
ATOM_ERROR = enif_make_atom(env, "error"); ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok"); ATOM_OK = enif_make_atom(env, "ok");
ATOM_NOT_FOUND = enif_make_atom(env, "not_found"); ATOM_NOT_FOUND = enif_make_atom(env, "not_found");
ATOM_EXISTS = enif_make_atom(env, "exists"); ATOM_EXISTS = enif_make_atom(env, "exists");
ATOM_TRUE = enif_make_atom(env, "true");
ATOM_KEYEXIST = enif_make_atom(env, "key_exist"); ATOM_KEYEXIST = enif_make_atom(env, "key_exist");
ATOM_NOTFOUND = enif_make_atom(env, "notfound"); ATOM_NOTFOUND = enif_make_atom(env, "notfound");
@ -867,18 +922,25 @@ lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ATOM_INCOMPATIBLE = enif_make_atom(env, "incompatible"); ATOM_INCOMPATIBLE = enif_make_atom(env, "incompatible");
ATOM_BAD_RSLOT = enif_make_atom(env, "bad_rslot"); ATOM_BAD_RSLOT = enif_make_atom(env, "bad_rslot");
ATOM_PSIZE = enif_make_atom(env, "psize");
ATOM_DEPTH = enif_make_atom(env, "depth");
ATOM_BRANCH_PAGES = enif_make_atom(env, "branch_pages");
ATOM_LEAF_PAGES = enif_make_atom(env, "leaf_pages");
ATOM_OVERFLOW_PAGES = enif_make_atom(env, "overflow_pages");
ATOM_ENTRIES = enif_make_atom(env, "entries");
lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource", lmdb_RESOURCE = enif_open_resource_type(env, NULL, "lmdb_resource",
NULL, flags, NULL); NULL, flags, NULL);
fprintf(stderr, "NIF on_load complete (lmdb version: %s)", MDB_VERSION_STRING); fprintf(stderr, "NIF on_load complete (lmdb version: %s)", MDB_VERSION_STRING);
fflush(stderr); fflush(stderr);
return (0); return (0);
err3:
kh_destroy(envs, priv->envs);
err2: err2:
enif_mutex_destroy(priv->conns_mutex); enif_mutex_destroy(priv->envs_mutex);
enif_free(priv); enif_free(priv);
err1: err1:
return (ENOMEM); return (err);
} }
/** /**
@ -916,24 +978,13 @@ static void
lmdb_unload(ErlNifEnv* env, void* priv_data) lmdb_unload(ErlNifEnv* env, void* priv_data)
{ {
struct lmdb_priv_data *priv = (struct lmdb_priv_data *)priv_data; struct lmdb_priv_data *priv = (struct lmdb_priv_data *)priv_data;
khash_t(envs) *h; struct lmdb *handle;
khiter_t itr_envs;
struct lmdb_env *env;
enif_mutex_lock(priv->envs_mutex); enif_mutex_lock(priv->envs_mutex);
h = priv->envs; STAILQ_FOREACH(handle, &priv->envs, entries) {
for (itr_envs = kh_begin(h); itr_envs != kh_end(h); ++itr_envs) { mdb_env_close(handle->env);
if (kh_exist(h, itr_envs)) { enif_free(handle);
env = kh_val(h, itr_envs);
if (env) {
mdb_env_close(env);
kh_del(envs, h, itr_envs);
enif_free(env);
kh_value(h, itr_envs) = NULL;
}
}
} }
kh_destroy(envs, h);
ASYNC_NIF_UNLOAD(lmdb, env, priv->async_nif_priv); ASYNC_NIF_UNLOAD(lmdb, env, priv->async_nif_priv);
enif_mutex_unlock(priv->envs_mutex); enif_mutex_unlock(priv->envs_mutex);
enif_mutex_destroy(priv->envs_mutex); enif_mutex_destroy(priv->envs_mutex);

View file

@ -6,6 +6,7 @@
{cover_enabled, true}. {cover_enabled, true}.
{erl_opts, [%{d,'DEBUG',true}, {erl_opts, [%{d,'DEBUG',true},
%native, {hipe, [o3,verbose]}, inline, {inline_size, 1024},
debug_info, debug_info,
warn_unused_vars, warn_unused_vars,
warn_export_all, warn_export_all,

View file

@ -27,11 +27,18 @@
{duration, 480}. {duration, 480}.
{concurrent, 32}. {concurrent, 32}.
{driver, basho_bench_driver_lmdb}. {driver, basho_bench_driver_lmdb}.
{report_interval, 1}. % sec, default was 10
{pb_timeout_general, 1000}. % ms, default was 60 sec
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000000}}}. {key_generator, {int_to_bin_littleendian,{uniform_int, 5000000000}}}.
{key_generator, {int_to_bin_littleendian,{pareto_int, 5000000000}}}.
{value_generator, {highly_compressible_bin, 2048}}. {value_generator, {highly_compressible_bin, 2048}}.
%{value_generator, {fixed_bin, 1024}}. %{value_generator, {fixed_bin, 1024}}.
{operations, [{get, 25}, {put, 70}, {delete, 5}]}. %{operations, [{get, 25}, {put, 70}, {delete, 5}]}.
%{operations, [{put, 1}]}. {operations, [{put, 1}]}.
{code_paths, ["../lmdb"]}. {code_paths, ["../lmdb"]}.
{lmdb_dir, "/home/gburd/ws/basho_bench/data"}. {lmdb_dir, "/home/gburd/ws/basho_bench/data"}.