Re-worked code to use async_nif among other things, more to come.

Gregory Burd 2013-05-19 00:19:01 -04:00
parent 2945d0096f
commit f2c5ff30e7
24 changed files with 4452 additions and 1483 deletions

Makefile

@ -5,9 +5,9 @@ MODULE = emdb
DIALYZER = dialyzer
REBAR = rebar
.PHONY: compile clean
all: ebin priv compile
ebin:
@mkdir -p $@
@ -15,10 +15,51 @@ ebin:
priv:
@mkdir -p $@
compile:
@$(REBAR) compile
clean:
@$(REBAR) clean
@rm -f *~ */*~ erl_crash.dump
@rm -rf ebin priv
xref:
@$(REBAR) xref skip_deps=true
test: eunit
eunit: compile-for-eunit
@$(REBAR) eunit skip_deps=true
eqc: compile-for-eqc
@$(REBAR) eqc skip_deps=true
proper: compile-for-proper
@echo "rebar does not implement a 'proper' command" && false
triq: compile-for-triq
@$(REBAR) triq skip_deps=true
compile-for-eunit:
@$(REBAR) compile eunit compile_only=true
compile-for-eqc:
@$(REBAR) -D QC -D QC_EQC compile eqc compile_only=true
compile-for-eqcmini:
@$(REBAR) -D QC -D QC_EQCMINI compile eqc compile_only=true
compile-for-proper:
@$(REBAR) -D QC -D QC_PROPER compile eqc compile_only=true
compile-for-triq:
@$(REBAR) -D QC -D QC_TRIQ compile triq compile_only=true
plt: compile
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib
analyze: compile
@$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin
repl:
@$(ERL) -pa ebin -pz deps/lager/ebin

README.md

@ -1,4 +1,4 @@
EMDB
====

EMDB is a NIF library for the [Memory-Mapped Database](http://highlandsun.com/hyc/mdb/), a.k.a. MDB. The main purpose of this package is to provide a **very fast** Riak [backend](http://wiki.basho.com/Storage-Backends.html).

But this module could also be used as a general key-value store to replace:
@ -26,65 +26,86 @@ But this module could also be used as a general key-value store to replace:
* `drop/1`: deletes all key-value pairs in the database.

Usage
-----

$ make
$ ./start.sh

%% create a new database
1> {ok, Handle} = emdb:open("/tmp/emdb1").

%% insert the key <<"a">> with value <<"1">>
2> ok = emdb:put(Handle, <<"a">>, <<"1">>).

%% try to re-insert the same key <<"a">>
3> key_exist = emdb:put(Handle, <<"a">>, <<"2">>).

%% add a new key-value pair
4> ok = emdb:put(Handle, <<"b">>, <<"2">>).

%% search a non-existing key <<"c">>
5> none = emdb:get(Handle, <<"c">>).

%% retrieve the value for key <<"b">>
6> {ok, <<"2">>} = emdb:get(Handle, <<"b">>).

%% retrieve the value for key <<"a">>
7> {ok, <<"1">>} = emdb:get(Handle, <<"a">>).

%% delete key <<"b">>
8> ok = emdb:del(Handle, <<"b">>).

%% search a non-existing key <<"b">>
9> none = emdb:get(Handle, <<"b">>).

%% delete a non-existing key <<"z">>
10> none = emdb:del(Handle, <<"z">>).

%% ensure key <<"a">>'s value is still <<"1">>
11> {ok, <<"1">>} = emdb:get(Handle, <<"a">>).

%% update the value for key <<"a">>
12> ok = emdb:update(Handle, <<"a">>, <<"7">>).

%% check the new value for key <<"a">>
13> {ok, <<"7">>} = emdb:get(Handle, <<"a">>).

%% delete all key-value pairs in the database
14> ok = emdb:drop(Handle).

%% try to retrieve key <<"a">> value
15> none = emdb:get(Handle, <<"a">>).

%% close the database
16> ok = emdb:close(Handle).
...
17> q().

#### Note:
The code below creates a new database with an **80GB** MapSize, **avoids fsync**
after each commit (for maximum speed), and uses the experimental **MDB_FIXEDMAP**:

{ok, Handle} = emdb:open("/tmp/emdb2", 85899345920, ?MDB_NOSYNC bor ?MDB_FIXEDMAP).

Performance
-----------

For maximum speed, this library uses only binaries for both keys and values.

See the impressive [microbench](http://highlandsun.com/hyc/mdb/microbench/) against:

* Google's LevelDB (which is slower and can stall, unlike Basho's fork of LevelDB)
* SQLite3
* Kyoto TreeDB
* BerkeleyDB 5.x

MDB performs better on 64-bit architectures.

Supported Operating Systems
---------------------------

Should work on 32/64-bit architectures:
@ -93,14 +114,24 @@ Should work on 32/64-bit architectures:
* FreeBSD
* Windows

TODO
----

* Fold over keys and/or values
* Unit tests
* PropEr testing
* Bulk "writing"
* basho_bench driver
* EQC, PULSE testing
* Key expiry
* Atomic group commit (for 2i)

Volunteers are always welcome!

Status
------

#### Work in progress. Don't use it in production!

LICENSE
-------

EMDB is Copyright (C) 2012-2013 by Aleph Archives and Basho Technologies, Inc., and released under the [OpenLDAP](http://www.OpenLDAP.org/license.html) License.

c_src/async_nif.h (new file, 533 lines)

@ -0,0 +1,533 @@
/*
* async_nif: An async thread-pool layer for Erlang's NIF API
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __ASYNC_NIF_H__
#define __ASYNC_NIF_H__
#if defined(__cplusplus)
extern "C" {
#endif
#include <assert.h>
#include "fifo_q.h"
#include "stats.h"
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define ASYNC_NIF_MAX_QUEUED_REQS (1000 * ASYNC_NIF_MAX_WORKERS)
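/* With these defaults: at most 128 worker threads, at most 500 requests
waiting per work queue, and at most 128,000 (1000 * ASYNC_NIF_MAX_WORKERS)
request entries allocated overall. */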
STAT_DECL(qwait, 1000);
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
};
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
STAT_DEF(qwait);
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
FIFO_QUEUE_TYPE(reqs) reqs;
};
struct async_nif_worker_entry {
ErlNifTid tid;
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
};
struct async_nif_state {
STAT_DEF(qwait);
unsigned int shutdown;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
unsigned int num_queues;
unsigned int next_q;
FIFO_QUEUE_TYPE(reqs) recycled_reqs;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
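/* C99 flexible array member, sized to num_queues in async_nif_load(). */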
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
__UNUSED(worker_id); \
do work_block while(0); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \
__UNUSED(args); \
do post_block while(0); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
const char *affinity = NULL; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
req = async_nif_reuse_req(async_nif); \
if (!req) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
new_env = req->env; \
do pre_block while(0); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
return reply; \
}
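/* A sketch of declaring an async NIF with this macro (example_nif and its
* fields are illustrative; see c_src/emdb.c in this commit for real uses):
*
*   ASYNC_NIF_DECL(
*     example_nif,
*     { int n; },                          // request args struct
*     { args->n = 42; },                   // pre: runs on the Erlang scheduler
*     { ASYNC_NIF_REPLY(enif_make_int(env, args->n)); }, // work: worker thread
*     { });                                // post: cleanup after work completes
*/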
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
enif_mutex_destroy(name##_async_nif_coord); \
name##_async_nif_coord = NULL; \
} while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_RETURN_BADARG() do { \
async_nif_recycle_req(req, async_nif); \
return enif_make_badarg(env); \
} while(0);
#define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
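/* A reply sent with ASYNC_NIF_REPLY arrives in the calling process as a
{Ref, Msg} tuple, where Ref is the reference passed as argv[0]; callers
are expected to selectively receive on that Ref (see the sketch after
this header). */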
/**
* Return a request structure from the recycled req queue if one exists,
* otherwise create one.
*/
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
if (fifo_q_empty(reqs, async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = enif_alloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (!env) {
enif_free(req);
req = NULL;
} else {
req->env = env;
async_nif->num_reqs++;
}
}
}
} else {
req = fifo_q_get(reqs, async_nif->recycled_reqs);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
STAT_TICK(async_nif, qwait);
return req;
}
/**
* Store the request for future re-use.
*
* req a request entry with an ErlNifEnv* which will be cleared
* before reuse, but not until then.
* async_nif a handle to our state so that we can find and use the mutex
*/
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
STAT_TOCK(async_nif, qwait);
enif_mutex_lock(async_nif->recycled_req_mutex);
fifo_q_put(reqs, async_nif->recycled_reqs, req);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
/**
* A string hash function.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/
static inline unsigned int
async_nif_str_hash_func(const char *s)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* Enqueue a request for processing by a worker thread.
*
* Places the request into a work queue determined either by the
* provided affinity or by iterating through the available queues.
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
/* Identify the most appropriate worker for this request. */
unsigned int qid = 0;
struct async_nif_work_queue *q = NULL;
unsigned int n = async_nif->num_queues;
/* Either we're choosing a queue based on some affinity/hinted value, or we
need to select the next queue in the rotation and update that shared
global value (next_q is shared across worker threads). */
if (hint >= 0) {
qid = (unsigned int)hint;
} else {
qid = async_nif->next_q;
qid = (qid + 1) % async_nif->num_queues;
async_nif->next_q = qid;
}
/* Now we inspect and iterate across the set of queues, trying to select one
that isn't too full or too slow. */
do {
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Now that we hold the lock, check for shutdown. As long as we hold
this lock either a) we're shutting down so exit now or b) this queue
will be valid until we release the lock. */
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait);
if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
} else {
break;
}
// TODO: at some point add in work shedding/stealing
} while(n-- > 0);
/* We hold the queue's lock, and we've selected a reasonable queue for this
new request, so add the request. */
STAT_TICK(q, qwait);
fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd);
return reply;
}
/**
* The worker thread main loop: take requests off this thread's work queue
* and process them until shutdown is signaled.
*/
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL;
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
break;
}
if (fifo_q_empty(reqs, q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
STAT_RESET(q, qwait);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
} else {
assert(fifo_q_size(reqs, q->reqs) > 0);
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = fifo_q_get(reqs, q->reqs);
enif_mutex_unlock(q->reqs_mutex);
/* Ensure that there is at least one other worker thread watching this
queue. */
enif_cond_signal(q->reqs_cnd);
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
STAT_TOCK(q, qwait);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
enif_free(req->args);
req->args = NULL;
enif_clear_env(req->env);
async_nif_recycle_req(req, async_nif);
req = NULL;
}
}
enif_thread_exit(0);
return 0;
}
static void
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
__UNUSED(env);
STAT_PRINT(async_nif, qwait, "wterl");
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
lock all the worker queues, then set shutdown to true, then
unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting
down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */
for (i = 0; i < async_nif->num_workers; ++i) {
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
/* Free req structures sitting on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
enif_free_env(req->env);
enif_free(req);
});
fifo_q_free(reqs, async_nif->recycled_reqs);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
req = NULL;
fifo_q_foreach(reqs, q->reqs, req, {
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args);
enif_free_env(req->env);
enif_free(req->args);
enif_free(req);
});
fifo_q_free(reqs, q->reqs);
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
enif_free(async_nif);
}
static void *
async_nif_load()
{
static int has_init = 0;
unsigned int i, j, num_queues;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
/* Don't init more than once. */
if (has_init) return 0;
else has_init = 1;
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
/* Size the number of work queues according to schedulers. */
if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
num_queues = ASYNC_NIF_MAX_WORKERS / 2;
} else {
int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
if (remainder != 0)
num_queues = info.scheduler_threads - remainder;
else
num_queues = info.scheduler_threads;
if (num_queues < 2)
num_queues = 2;
}
/* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->num_workers = 2 * num_queues;
async_nif->next_q = 0;
async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
STAT_INIT(async_nif, qwait);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
STAT_INIT(q, qwait);
}
/* Setup the thread pool management. */
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */
for (i = 0; i < async_nif->num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1;
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd);
}
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_free(async_nif);
return NULL;
}
}
return async_nif;
}
static void
async_nif_upgrade(ErlNifEnv *env)
{
__UNUSED(env);
// TODO:
}
#if defined(__cplusplus)
}
#endif
#endif // __ASYNC_NIF_H__
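
The macros above imply a calling convention on the Erlang side: each exported NIF takes a fresh reference as its first argument, returns {ok, enqueued} (or an {error, Reason} tuple), and a worker thread later sends {Ref, Reply} to the calling process. A minimal sketch of such a wrapper (the name async_call/2 and the emdb stub arity are assumptions; the Erlang half of this change is not shown here):

    %% Hypothetical wrapper around the generated NIF stubs: enqueue the
    %% request, then selectively receive the worker's reply on the ref.
    async_call(Fun, Args) ->
        Ref = make_ref(),
        case erlang:apply(emdb, Fun, [Ref | Args]) of
            {ok, enqueued} ->
                receive
                    {Ref, Reply} -> Reply
                end;
            Error ->
                Error
        end.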

c_src/common.h (new file, 61 lines)

@ -0,0 +1,61 @@
/*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __COMMON_H__
#define __COMMON_H__
#if defined(__cplusplus)
extern "C" {
#endif
#ifdef DEBUG
#include <stdio.h>
#include <stdarg.h>
#ifndef DPRINTF
#define DPRINTF(fmt, ...) \
do { \
fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__); \
fflush(stderr); \
} while(0)
#endif
#ifndef DPUTS
#define DPUTS(arg) DPRINTF("%s", arg)
#endif
#else
#define DPRINTF(fmt, ...) ((void) 0)
#define DPUTS(arg) ((void) 0)
#endif
#ifndef COMPQUIET
#define COMPQUIET(n, v) do { \
(n) = (v); \
(n) = (n); \
} while (0)
#endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#if defined(__cplusplus)
}
#endif
#endif // __COMMON_H__

c_src/duration.h (new file, 98 lines)

@ -0,0 +1,98 @@
/*
* Copyright (C) 2013, all rights reserved by Gregory Burd <greg@burd.me>
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* version 2 (MPLv2). If a copy of the MPL was not distributed with this file,
* you can obtain one at: http://mozilla.org/MPL/2.0/
*
* NOTES:
* - on some platforms this will require -lrt
*/
#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <sys/timeb.h>
typedef enum { ns = 0, mcs, ms, s } time_scale;
struct scale_time {
const char *abbreviation;
const char *name;
uint64_t mul, div, overhead, ticks_per;
};
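/* The overhead and ticks_per fields are calibration constants used only by
the RDTSC-based cycle counting below, which is currently compiled out. */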
static const struct scale_time scale[] = {
{ "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL },
{ "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL },
{ "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL },
{ "sec", "second", 1LL, 1000000000LL, 10, 2300LL } };
static uint64_t ts(time_scale unit)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (((uint64_t)ts.tv_sec * scale[unit].mul) +
((uint64_t)ts.tv_nsec / scale[unit].div));
}
#if 0
//if defined(__i386__) || defined(__x86_64__)
/**
* cpu_clock_ticks()
*
* A measure provided by Intel x86 CPUs which provides the number of cycles
* (aka "ticks") executed as a counter using the RDTSC instruction.
*/
static inline uint64_t cpu_clock_ticks()
{
uint32_t lo, hi;
__asm__ __volatile__ (
"xorl %%eax, %%eax\n"
"cpuid\n"
"rdtsc\n"
: "=a" (lo), "=d" (hi)
:
: "%ebx", "%ecx" );
return (uint64_t)hi << 32 | lo;
}
/**
* elapsed_cpu_clock_ticks()
*
* An approximation of elapsed [ns, mcs, ms, s] from CPU clock ticks.
*/
static uint64_t elapsed_cpu_clock_ticks(uint64_t start, time_scale unit)
{
return (cpu_clock_ticks() - start - scale[unit].overhead) * scale[unit].ticks_per;
}
#endif
typedef struct {
uint64_t then;
time_scale unit;
} duration_t;
static inline uint64_t elapsed(duration_t *d)
{
uint64_t now = ts(d->unit);
uint64_t elapsed = now - d->then;
d->then = now;
return elapsed;
}
#define DURATION(name, resolution) duration_t name = \
{ts(resolution), resolution}
#define ELAPSED_DURING(result, resolution, block) \
do { \
DURATION(__x, resolution); \
do block while(0); \
*result = elapsed(&__x); \
} while(0);
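/* Usage sketch (names illustrative): time a block at millisecond resolution.
*
*   uint64_t took = 0;
*   ELAPSED_DURING(&took, ms, { do_expensive_work(); });
*/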
#define CYCLES_DURING(result, block) \
do { \
uint64_t __begin = cpu_clock_ticks(); \
do block while(0); \
*result = cpu_clock_ticks() - __begin; \
} while(0);

c_src/emdb.c (new file, 722 lines)

@ -0,0 +1,722 @@
/* -------------------------------------------------------------------------
* This file is part of EMDB - Erlang MDB API
*
* Copyright (c) 2012 by Aleph Archives. All rights reserved.
*
* -------------------------------------------------------------------------
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in the file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
* -------------------------------------------------------------------------*/
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/param.h>
#include <erl_nif.h>
#include <erl_driver.h>
#include "common.h"
#include "async_nif.h"
#include "stats.h"
#include "lmdb.h"
STAT_DECL(emdb_get, 1000);
STAT_DECL(emdb_put, 1000);
STAT_DECL(emdb_del, 1000);
STAT_DECL(emdb_upd, 1000);
static ErlNifResourceType *emdb_RESOURCE;
struct emdb {
MDB_env *env;
MDB_dbi dbi;
STAT_DEF(emdb_get);
STAT_DEF(emdb_put);
STAT_DEF(emdb_del);
STAT_DEF(emdb_upd);
};
struct emdb_priv_data {
void *async_nif_priv; // Note: must be first element in struct
};
/* Global init for async_nif. */
ASYNC_NIF_INIT(emdb);
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_NOT_FOUND;
static ERL_NIF_TERM ATOM_EXISTS;
static ERL_NIF_TERM ATOM_KEYEXIST;
static ERL_NIF_TERM ATOM_NOTFOUND;
static ERL_NIF_TERM ATOM_PAGE_NOTFOUND;
static ERL_NIF_TERM ATOM_CORRUPTED;
static ERL_NIF_TERM ATOM_PANIC;
static ERL_NIF_TERM ATOM_VERSION_MISMATCH;
static ERL_NIF_TERM ATOM_MAP_FULL;
static ERL_NIF_TERM ATOM_DBS_FULL;
static ERL_NIF_TERM ATOM_READERS_FULL;
static ERL_NIF_TERM ATOM_TLS_FULL;
static ERL_NIF_TERM ATOM_TXN_FULL;
static ERL_NIF_TERM ATOM_CURSOR_FULL;
static ERL_NIF_TERM ATOM_PAGE_FULL;
static ERL_NIF_TERM ATOM_MAP_RESIZED;
static ERL_NIF_TERM ATOM_INCOMPATIBLE;
static ERL_NIF_TERM ATOM_BAD_RSLOT;
#define CHECK(expr, label) \
if (MDB_SUCCESS != (ret = (expr))) { \
DPRINTF("CHECK(\"%s\") failed \"%s\" at %s:%d in %s()\n", \
#expr, mdb_strerror(ret), __FILE__, __LINE__, __func__);\
err = __strerror_term(env, ret); \
goto label; \
}
#define FAIL_ERR(e, label) \
do { \
err = __strerror_term(env, (e)); \
goto label; \
} while(0)
/**
* Convenience function to generate {error, {errno, Reason}}
*
* env NIF environment
* err number of last error
*/
static ERL_NIF_TERM
__strerror_term(ErlNifEnv* env, int err)
{
ERL_NIF_TERM term;
if (err >= MDB_KEYEXIST && err <= MDB_LAST_ERRCODE) {
switch (err) {
case MDB_KEYEXIST: /** key/data pair already exists */
term = ATOM_KEYEXIST;
break;
case MDB_NOTFOUND: /** key/data pair not found (EOF) */
term = ATOM_NOTFOUND;
break;
case MDB_PAGE_NOTFOUND: /** Requested page not found - this usually indicates corruption */
term = ATOM_PAGE_NOTFOUND;
break;
case MDB_CORRUPTED: /** Located page was wrong type */
term = ATOM_CORRUPTED;
break;
case MDB_PANIC : /** Update of meta page failed, probably I/O error */
term = ATOM_PANIC;
break;
case MDB_VERSION_MISMATCH: /** Environment version mismatch */
term = ATOM_VERSION_MISMATCH;
break;
case MDB_INVALID: /** File is not a valid MDB file */
term = enif_make_atom(env, "invalid");
break;
case MDB_MAP_FULL: /** Environment mapsize reached */
term = ATOM_MAP_FULL;
break;
case MDB_DBS_FULL: /** Environment maxdbs reached */
term = ATOM_DBS_FULL;
break;
case MDB_READERS_FULL: /** Environment maxreaders reached */
term = ATOM_READERS_FULL;
break;
case MDB_TLS_FULL: /** Too many TLS keys in use - Windows only */
term = ATOM_TLS_FULL;
break;
case MDB_TXN_FULL: /** Txn has too many dirty pages */
term = ATOM_TXN_FULL;
break;
case MDB_CURSOR_FULL: /** Cursor stack too deep - internal error */
term = ATOM_CURSOR_FULL;
break;
case MDB_PAGE_FULL: /** Page has not enough space - internal error */
term = ATOM_PAGE_FULL;
break;
case MDB_MAP_RESIZED: /** Database contents grew beyond environment mapsize */
term = ATOM_MAP_RESIZED;
break;
case MDB_INCOMPATIBLE: /** Database flags changed or would change */
term = ATOM_INCOMPATIBLE;
break;
case MDB_BAD_RSLOT: /** Invalid reuse of reader locktable slot */
term = ATOM_BAD_RSLOT;
break;
}
} else {
term = enif_make_atom(env, erl_errno_id(err));
}
/* We return the errno value as well as the message here because the error
message provided by strerror() may differ across platforms and/or may be
localized to any given language (i18n). Use the errno atom rather than
the message when matching in Erlang. You've been warned. */
return enif_make_tuple(env, 2, ATOM_ERROR,
enif_make_tuple(env, 2, term,
enif_make_string(env, mdb_strerror(err), ERL_NIF_LATIN1)));
}
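/* For example, MDB_MAP_FULL surfaces in Erlang as {error, {map_full, Msg}},
where Msg is the mdb_strerror() text for the platform/version in use. */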
/**
* Opens an MDB database.
*
* argv[0] path to directory for the database files
* argv[1] size of database
* argv[2] flags
*/
ASYNC_NIF_DECL(
emdb_open,
{ // struct
char dirname[MAXPATHLEN];
ErlNifUInt64 mapsize;
ErlNifUInt64 envflags;
},
{ // pre
if (!(argc == 3 &&
enif_is_list(env, argv[0]) &&
enif_is_number(env, argv[1]) &&
enif_is_number(env, argv[2]))) {
ASYNC_NIF_RETURN_BADARG();
}
if (enif_get_string(env, argv[0], args->dirname,
MAXPATHLEN, ERL_NIF_LATIN1) <= 0)
ASYNC_NIF_RETURN_BADARG();
enif_get_uint64(env, argv[1], &(args->mapsize));
enif_get_uint64(env, argv[2], &(args->envflags));
},
{ // work
ERL_NIF_TERM err;
MDB_txn *txn;
struct emdb *handle;
int ret;
if ((handle = enif_alloc_resource(emdb_RESOURCE, sizeof(struct emdb))) == NULL)
FAIL_ERR(ENOMEM, err3);
STAT_INIT(handle, emdb_get);
STAT_INIT(handle, emdb_put);
STAT_INIT(handle, emdb_upd);
STAT_INIT(handle, emdb_del);
CHECK(mdb_env_create(&(handle->env)), err2);
if (mdb_env_set_mapsize(handle->env, args->mapsize)) {
mdb_env_close(handle->env);
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
CHECK(mdb_env_open(handle->env, args->dirname, args->envflags, 0664), err2);
CHECK(mdb_txn_begin(handle->env, NULL, 0, &txn), err2);
CHECK(mdb_open(txn, NULL, 0, &(handle->dbi)), err1);
CHECK(mdb_txn_commit(txn), err1);
ERL_NIF_TERM term = enif_make_resource(env, handle);
enif_release_resource(handle);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, term));
return;
err1:
mdb_txn_abort(txn);
err2:
mdb_env_close(handle->env);
err3:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
});
/**
* Closes an MDB database.
*
* argv[0] reference to the MDB handle resource
*/
ASYNC_NIF_DECL(
emdb_close,
{ // struct
struct emdb *handle;
},
{ // pre
if (!(argc == 1 &&
enif_get_resource(env, argv[0], emdb_RESOURCE, (void**)&args->handle))) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
enif_keep_resource((void*)args->handle);
},
{ // work
STAT_PRINT(args->handle, emdb_get, "emdb");
STAT_PRINT(args->handle, emdb_put, "emdb");
STAT_PRINT(args->handle, emdb_del, "emdb");
STAT_PRINT(args->handle, emdb_upd, "emdb");
mdb_env_close(args->handle->env);
STAT_RESET(args->handle, emdb_get);
STAT_RESET(args->handle, emdb_put);
STAT_RESET(args->handle, emdb_del);
STAT_RESET(args->handle, emdb_upd);
args->handle->env = NULL;
ASYNC_NIF_REPLY(ATOM_OK);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
/**
* Store a value indexed by key.
*
* argv[0] reference to the MDB handle resource
* argv[1] key as an Erlang binary
* argv[2] value as an Erlang binary
*/
ASYNC_NIF_DECL(
emdb_put,
{ // struct
struct emdb *handle;
ERL_NIF_TERM key;
ERL_NIF_TERM val;
},
{ // pre
if (!(argc == 3 &&
enif_get_resource(env, argv[0], emdb_RESOURCE, (void**)&args->handle) &&
enif_is_binary(env, argv[1]) &&
enif_is_binary(env, argv[2]) )) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, emdb_put);
enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
},
{ // work
ERL_NIF_TERM err;
ErlNifBinary key;
ErlNifBinary val;
MDB_val mkey;
MDB_val mdata;
MDB_txn * txn;
int ret;
if (!enif_inspect_iolist_as_binary(env, args->key, &key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
if (!enif_inspect_iolist_as_binary(env, args->val, &val)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
mkey.mv_size = key.size;
mkey.mv_data = key.data;
mdata.mv_size = val.size;
mdata.mv_data = val.data;
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2);
ret = mdb_put(txn, args->handle->dbi, &mkey, &mdata, MDB_NOOVERWRITE);
if (MDB_KEYEXIST == ret) {
mdb_txn_abort(txn);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_ERROR, ATOM_EXISTS));
return;
}
if (ret != 0)
FAIL_ERR(ret, err1);
CHECK(mdb_txn_commit(txn), err1);
STAT_TOCK(args->handle, emdb_put);
ASYNC_NIF_REPLY(ATOM_OK);
return;
err1:
mdb_txn_abort(txn);
err2:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
/**
* Update an existing value indexed by key.
*
* argv[0] reference to the MDB handle resource
* argv[1] key as an Erlang binary
* argv[2] value as an Erlang binary
*/
ASYNC_NIF_DECL(
emdb_update,
{ // struct
struct emdb *handle;
ERL_NIF_TERM key;
ERL_NIF_TERM val;
},
{ // pre
if (!(argc == 3 &&
enif_get_resource(env, argv[0], emdb_RESOURCE, (void**)&args->handle) &&
enif_is_binary(env, argv[1]) &&
enif_is_binary(env, argv[2]) )) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, emdb_upd);
enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
},
{ // work
ERL_NIF_TERM err;
ErlNifBinary key;
ErlNifBinary val;
MDB_val mkey;
MDB_val mdata;
MDB_txn * txn;
int ret;
if (!enif_inspect_iolist_as_binary(env, args->key, &key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
if (!enif_inspect_iolist_as_binary(env, args->val, &val)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
mkey.mv_size = key.size;
mkey.mv_data = key.data;
mdata.mv_size = val.size;
mdata.mv_data = val.data;
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2);
CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1);
CHECK(mdb_txn_commit(txn), err1);
STAT_TOCK(args->handle, emdb_upd);
ASYNC_NIF_REPLY(ATOM_OK);
return;
err1:
mdb_txn_abort(txn);
err2:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
/**
* Retrieve the value associated with the key.
*
* argv[0] reference to the MDB handle resource
* argv[1] key as an Erlang binary
*/
ASYNC_NIF_DECL(
emdb_get,
{ // struct
struct emdb *handle;
ERL_NIF_TERM key;
},
{ // pre
if (!(argc == 2 &&
enif_get_resource(env, argv[0], emdb_RESOURCE, (void**)&args->handle) &&
enif_is_binary(env, argv[1]) )) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, emdb_get);
enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
},
{ // work
ERL_NIF_TERM err;
ErlNifBinary key;
ERL_NIF_TERM val;
unsigned char *bin;
MDB_val mkey;
MDB_val mdata;
MDB_txn * txn;
int ret;
if (!enif_inspect_iolist_as_binary(env, args->key, &key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
mkey.mv_size = key.size;
mkey.mv_data = key.data;
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, &txn), err);
ret = mdb_get(txn, args->handle->dbi, &mkey, &mdata);
if (MDB_NOTFOUND == ret) {
mdb_txn_abort(txn);
ASYNC_NIF_REPLY(ATOM_NOT_FOUND);
return;
}
if (ret != 0) {
mdb_txn_abort(txn);
FAIL_ERR(ret, err);
}
/* Copy the value out while the txn (and thus mdata.mv_data) is still valid. */
bin = enif_make_new_binary(env, mdata.mv_size, &val);
if (!bin) {
mdb_txn_abort(txn);
FAIL_ERR(ENOMEM, err);
}
memcpy(bin, mdata.mv_data, mdata.mv_size);
mdb_txn_abort(txn);
STAT_TOCK(args->handle, emdb_get);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, val));
return;
err:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});
/**
* Delete the value associated with the key.
*
* argv[0] reference to the MDB handle resource
* argv[1] key as an Erlang binary
*/
ASYNC_NIF_DECL(
emdb_del,
{ // struct
struct emdb *handle;
ERL_NIF_TERM key;
},
{ // pre
if (!(argc == 2 &&
enif_get_resource(env, argv[0], emdb_RESOURCE, (void**)&args->handle) &&
enif_is_binary(env, argv[1]) )) {
ASYNC_NIF_RETURN_BADARG();
}
if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, emdb_del);
enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
},
{ // work
ERL_NIF_TERM err;
ErlNifBinary key;
MDB_val mkey;
MDB_txn * txn;
int ret;
if (!enif_inspect_iolist_as_binary(env, args->key, &key)) {
ASYNC_NIF_REPLY(enif_make_badarg(env));
return;
}
mkey.mv_size = key.size;
mkey.mv_data = key.data;
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err);
ret = mdb_del(txn, args->handle->dbi, &mkey, NULL);
if (MDB_NOTFOUND == ret) {
mdb_txn_abort(txn);
ASYNC_NIF_REPLY(ATOM_NOT_FOUND);
return;
}
if (ret != 0) {
mdb_txn_abort(txn);
FAIL_ERR(ret, err);
}
CHECK(mdb_txn_commit(txn), err);
STAT_TOCK(args->handle, emdb_del);
ASYNC_NIF_REPLY(ATOM_OK);
return;
err:
ASYNC_NIF_REPLY(err);
return;
},
{ // post
enif_release_resource((void*)args->handle);
});