Compare commits
32 commits
merge-chan
...
master
Author | SHA1 | Date | |
---|---|---|---|
|
d0c587bdec | ||
|
1f7ca0c189 | ||
|
3d7a6e41c6 | ||
|
92c906f105 | ||
|
91d10a7d51 | ||
|
d7659ea8c1 | ||
|
30b89d7808 | ||
|
b649dc7d3b | ||
|
12e3012a05 | ||
|
9021359752 | ||
|
fed8c726e2 | ||
|
eed6dfdf3a | ||
|
5e0c4ff384 | ||
|
b70d9b8afa | ||
|
11c864d949 | ||
|
63eb48ec24 | ||
|
ea7c83b32c | ||
|
dfa71be3c5 | ||
|
72cf45ff3f | ||
|
7fc539984f | ||
|
542f70443a | ||
|
21017f2d95 | ||
|
ad64781df6 | ||
|
daa79ac912 | ||
|
05b81cdc3b | ||
|
7ff11a42a8 | ||
|
1d28a6f5f5 | ||
|
00f5549644 | ||
|
564b87a78a | ||
|
f32973b044 | ||
|
cfd8bc9bb1 | ||
|
9621cb0043 |
17 changed files with 760 additions and 489 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -5,8 +5,10 @@
|
|||
*.so
|
||||
*.beam
|
||||
*.orig
|
||||
deps
|
||||
logs
|
||||
doc
|
||||
include/BDBERL-MIB.hrl
|
||||
test/test.cover
|
||||
int_test/test.cover
|
||||
.eunit
|
||||
|
|
18
Makefile
18
Makefile
|
@ -1,25 +1,37 @@
|
|||
|
||||
ERL ?=erl
|
||||
CT_RUN ?=ct_run
|
||||
ERL_FLAGS ?=+A10
|
||||
REBAR_FLAGS :=
|
||||
|
||||
all: $(BDB_LOCAL_LIB)
|
||||
all: deps compile
|
||||
|
||||
deps:
|
||||
$(REBAR) get-deps
|
||||
|
||||
compile: $(BDB_LOCAL_LIB)
|
||||
ERL_FLAGS=$(ERL_FLAGS) $(REBAR) $(REBAR_FLAGS) compile
|
||||
|
||||
test: tests
|
||||
|
||||
tests:
|
||||
@ $(REBAR) $(REBAR_FLAGS) eunit ct
|
||||
@ $(REBAR) $(REBAR_FLAGS) eunit app=bdberl
|
||||
@ $(REBAR) $(REBAR_FLAGS) ct app=bdberl
|
||||
|
||||
thrash-test:
|
||||
@ $(CT_RUN) -pa test/ -suite thrash_SUITE
|
||||
|
||||
clean:
|
||||
$(REBAR) $(REBAR_FLAGS) clean
|
||||
-rm test/*.beam
|
||||
|
||||
distclean: clean
|
||||
-rm -rf $(BDB_LOCAL_DIST)
|
||||
$(REBAR) delete-deps
|
||||
-make -C c_src clean
|
||||
-rm -rf c_src/sources
|
||||
-rm -rf priv
|
||||
-rm -rf logs
|
||||
|
||||
include rebar.mk
|
||||
|
||||
.EXPORT_ALL_VARIABLES:
|
||||
|
|
1
README
1
README
|
@ -5,3 +5,4 @@ Authors:
|
|||
Phil Toland <phil.toland@gmail.com>
|
||||
Jon Meredith <jon@jonmeredith.com>
|
||||
Sergey Yelin <elinsn@gmail.com>
|
||||
Greg Burd <greg@basho.com> <gregburd@me.com>
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
# This Makefile builds the dependency (libdb) needed by bdberl_drv.so
|
||||
|
||||
ERL ?=erl
|
||||
ERL_FLAGS ?=+A10
|
||||
ERL ?= erl
|
||||
ERL_FLAGS ?= +A10
|
||||
TAR ?= tar
|
||||
GUNZIP ?= gunzip
|
||||
CURL ?= curl
|
||||
BDB_VER := 5.2.36
|
||||
BDB_DIR := $(CURDIR)/db-$(BDB_VER)
|
||||
BDB_DIST := db-$(BDB_VER).tar.gz
|
||||
BDB_DIST_URL := http://download.oracle.com/berkeley-db/$(BDB_DIST)
|
||||
BDB_DIST := db-$(BDB_VER)
|
||||
#BDB_DIST_URL := http://download.oracle.com/berkeley-db/$(BDB_DIST).tar.gz
|
||||
BDB_DIST_URL := https://github.com/downloads/gburd/libdb/$(BDB_DIST).tar.gz
|
||||
|
||||
SYSTEM_DIR := $(CURDIR)/system
|
||||
LIB_DIR := $(SYSTEM_DIR)/lib
|
||||
|
@ -17,7 +19,6 @@ INC_DIR := $(SYSTEM_DIR)/include
|
|||
db: $(LIB_DIR)/libdb.a
|
||||
|
||||
$(LIB_DIR)/libdb.a: $(BDB_DIST)
|
||||
$(GUNZIP) -c db-$(BDB_VER).tar.gz | $(TAR) xf -
|
||||
@for I in patches/*.patch; do \
|
||||
(patch -p0 < $${I} || echo "Skipping patch"); \
|
||||
done
|
||||
|
@ -28,6 +29,6 @@ clean:
|
|||
@rm -rf ./*.o $(SYSTEM_DIR) $(BDB_DIR)
|
||||
|
||||
$(BDB_DIST):
|
||||
$(REBAR_FETCH) $(BDB_DIST_URL)
|
||||
$(CURL) -L $(BDB_DIST_URL) | $(GUNZIP) | $(TAR) xf -
|
||||
|
||||
.EXPORT_ALL_VARIABLES:
|
||||
|
|
|
@ -113,6 +113,7 @@ static void do_async_txnop(void* arg);
|
|||
static void do_async_cursor_put(void* arg);
|
||||
static void do_async_cursor_get(void* arg);
|
||||
static void do_async_cursor_del(void* arg);
|
||||
static void do_async_cursor_count(void* arg);
|
||||
static void do_async_cursor_cnp(void* arg);
|
||||
static void do_async_truncate(void* arg);
|
||||
static void do_sync_data_dirs_info(PortData *p);
|
||||
|
@ -129,13 +130,14 @@ static int del_portref(int dbref, ErlDrvPort port);
|
|||
static int alloc_dbref();
|
||||
static void abort_txn(PortData* d);
|
||||
|
||||
static void* zalloc(unsigned int size);
|
||||
static void* driver_calloc(unsigned int size);
|
||||
|
||||
static void* deadlock_check(void* arg);
|
||||
static void* checkpointer(void* arg);
|
||||
|
||||
static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg);
|
||||
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg);
|
||||
static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* msg);
|
||||
static void send_log_message(ErlDrvTermData* msg, int elements);
|
||||
|
||||
/**
|
||||
|
@ -198,7 +200,7 @@ static unsigned int G_CHECKPOINT_ACTIVE = 1;
|
|||
static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between checkpoints */
|
||||
|
||||
/**
|
||||
* Pipe to used to wake up the various monitors. Instead of just sleeping
|
||||
* Pipe is used to wake up the various monitors. Instead of just sleeping
|
||||
* they wait for an exceptional condition on the read fd of the pipe. When it is time to
|
||||
* shutdown, the driver closes the write fd and waits for the threads to be joined.
|
||||
*/
|
||||
|
@ -227,36 +229,20 @@ static TPool* G_TPOOL_GENERAL = NULL;
|
|||
static TPool* G_TPOOL_TXNS = NULL;
|
||||
|
||||
|
||||
/**
|
||||
* Helpful macros
|
||||
*/
|
||||
#ifdef DEBUG
|
||||
# define DBG(...) fprintf(stderr, __VA_ARGS__)
|
||||
# define DBGCMD(P, ...) bdberl_dbgcmd(P, __VA_ARGS__)
|
||||
# define DBGCMDRC(P, ...) bdberl_dbgcmdrc(P, __VA_ARGS__)
|
||||
static void bdberl_dbgcmd(PortData *d, const char *fmt, ...);
|
||||
static void bdberl_dbgcmdrc(PortData *d, int rc);
|
||||
#else
|
||||
# define DBG(arg1,...)
|
||||
# define DBGCMD(d, fmt, ...)
|
||||
# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error
|
||||
#endif
|
||||
|
||||
|
||||
#define LOCK_DATABASES(P) \
|
||||
do \
|
||||
{ \
|
||||
DBG("threadid %p port %p: locking G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: locking G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
erl_drv_mutex_lock(G_DATABASES_MUTEX); \
|
||||
DBG("threadid %p port %p: locked G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: locked G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
} while(0)
|
||||
|
||||
#define UNLOCK_DATABASES(P) \
|
||||
do \
|
||||
{ \
|
||||
DBG("threadid %p port %p: unlocking G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: unlocking G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
erl_drv_mutex_unlock(G_DATABASES_MUTEX); \
|
||||
DBG("threadid %p port %p: unlocked G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: unlocked G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
} while (0)
|
||||
|
||||
|
||||
|
@ -268,7 +254,7 @@ static void bdberl_dbgcmdrc(PortData *d, int rc);
|
|||
|
||||
DRIVER_INIT(bdberl_drv)
|
||||
{
|
||||
DBG("DRIVER INIT\r\n");
|
||||
DBG("DRIVER INIT");
|
||||
// Setup flags we'll use to init the environment
|
||||
int flags =
|
||||
DB_INIT_LOCK | /* Enable support for locking */
|
||||
|
@ -277,6 +263,9 @@ DRIVER_INIT(bdberl_drv)
|
|||
DB_RECOVER | /* Enable support for recovering from failures */
|
||||
DB_CREATE | /* Create files as necessary */
|
||||
DB_REGISTER | /* Run recovery if needed */
|
||||
DB_FAILCHK | /* Release any database reads locks held by the
|
||||
thread of control that exited and, if needed,
|
||||
abort unresolved transaction. */
|
||||
DB_USE_ENVIRON | /* Use DB_HOME environment variable */
|
||||
DB_THREAD; /* Make the environment free-threaded */
|
||||
|
||||
|
@ -292,22 +281,46 @@ DRIVER_INIT(bdberl_drv)
|
|||
// specify where the working directory is
|
||||
DBG("db_env_create(%p, 0)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0);
|
||||
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
if (G_DB_ENV_ERROR != 0)
|
||||
{
|
||||
G_DB_ENV = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
// DB should use the safe allocation routines provided by the Erlang VM
|
||||
DBG("G_DB_ENV->set_alloc(%p, ...)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->set_alloc(G_DB_ENV, driver_alloc, driver_realloc, driver_free);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
|
||||
// Inform DB of the number of threads that will be operating on the DB Environment
|
||||
unsigned int nthreads = G_NUM_GENERAL_THREADS + G_NUM_TXN_THREADS;
|
||||
DBG("G_DB_ENV->set_thread_count(%p, %d, ...)", &G_DB_ENV, nthreads);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->set_thread_count(G_DB_ENV, nthreads);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
|
||||
DBG("G_DB_ENV->set_thread_id(%p, ...)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->set_thread_id(G_DB_ENV, &bdberl_tpool_thread_id);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
|
||||
DBG("G_DB_ENV->set_thread_id_string(%p, ...)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->set_thread_id_string(G_DB_ENV, &bdberl_tpool_thread_id_string);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
|
||||
DBG("G_DB_ENV->set_is_alive(%p, ...)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->set_isalive(G_DB_ENV, &bdberl_tpool_thread_is_alive);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
|
||||
// Open the DB Environment
|
||||
DBG("G_DB_ENV->open(%p, 0, %08X, 0)", &G_DB_ENV, flags);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0);
|
||||
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
if (G_DB_ENV_ERROR != 0)
|
||||
{
|
||||
// Something bad happened while initializing BDB; in this situation we
|
||||
// cleanup and set the environment to zero. Attempts to open ports will
|
||||
// fail and the user will have to sort out how to resolve the issue.
|
||||
DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV);
|
||||
DBG("G_DB_ENV->close(%p, 0);", &G_DB_ENV);
|
||||
G_DB_ENV->close(G_DB_ENV, 0);
|
||||
G_DB_ENV = 0;
|
||||
}
|
||||
|
@ -341,7 +354,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
}
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Ignoring \"BDBERL_PAGE_SIZE\" value %u - not power of 2\r\n",
|
||||
fprintf(stderr, "Ignoring \"BDBERL_PAGE_SIZE\" value %u - not power of 2",
|
||||
page_size);
|
||||
}
|
||||
}
|
||||
|
@ -385,7 +398,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
}
|
||||
else
|
||||
{
|
||||
DBG("DRIVER INIT FAILED - %s\r\n", db_strerror(G_DB_ENV_ERROR));
|
||||
DBG("DRIVER INIT FAILED - %s\n", db_strerror(G_DB_ENV_ERROR));
|
||||
}
|
||||
|
||||
return &bdberl_drv_entry;
|
||||
|
@ -393,8 +406,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
|
||||
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||
{
|
||||
DBG("threadid %p port %p: BDB DRIVER STARTING\r\n",
|
||||
erl_drv_thread_self(), port);
|
||||
DBG("threadid %p port %p: BDB DRIVER STARTING\n", erl_drv_thread_self(), port);
|
||||
|
||||
// Make sure we have a functional environment -- if we don't,
|
||||
// bail...
|
||||
|
@ -422,7 +434,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
|||
// Make sure port is running in binary mode
|
||||
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
|
||||
|
||||
DBGCMD(d, "BDB DRIVER STARTED");
|
||||
DBGCMD(d, "BDB DRIVER STARTED\n");
|
||||
|
||||
return (ErlDrvData)d;
|
||||
}
|
||||
|
@ -431,7 +443,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
{
|
||||
PortData* d = (PortData*)handle;
|
||||
|
||||
DBG("Stopping port %p\r\n", d->port);
|
||||
DBG("Stopping port %p\n", d->port);
|
||||
|
||||
// Grab the port lock, in case we have an async job running
|
||||
erl_drv_mutex_lock(d->port_lock);
|
||||
|
@ -440,13 +452,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
// block until the job has either been removed or has run
|
||||
if (d->async_job)
|
||||
{
|
||||
DBGCMD(d, "Stopping port %p - cancelling async job %p\r\n", d->port, d->async_job);
|
||||
DBGCMD(d, "Stopping port %p - cancelling async job %p\n", d->port, d->async_job);
|
||||
|
||||
// Drop the lock prior to starting the wait for the async process
|
||||
erl_drv_mutex_unlock(d->port_lock);
|
||||
|
||||
bdberl_tpool_cancel(d->async_pool, d->async_job);
|
||||
DBGCMD(d, "Canceled async job for port: %p\r\n", d->port);
|
||||
DBGCMD(d, "Canceled async job for port: %p\n", d->port);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -458,7 +470,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
erl_drv_mutex_destroy(d->port_lock);
|
||||
|
||||
// If a cursor is open, close it
|
||||
DBG("Stopping port %p - cleaning up cursors (%p) and transactions (%p)\r\n", d->port,
|
||||
DBG("Stopping port %p - cleaning up cursors (%p) and transactions (%p)\n", d->port,
|
||||
d->cursor, d->txn);
|
||||
|
||||
if (d->cursor)
|
||||
|
@ -470,13 +482,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
abort_txn(d);
|
||||
|
||||
// Close all the databases we previously opened
|
||||
DBG("Stopping port %p - closing all dbrefs\r\n", d->port);
|
||||
DBG("Stopping port %p - closing all dbrefs\n", d->port);
|
||||
while (d->dbrefs)
|
||||
{
|
||||
int dbref = d->dbrefs->dbref;
|
||||
if (close_database(dbref, 0, d) != ERROR_NONE)
|
||||
{
|
||||
DBG("Stopping port %p could not close dbref %d\r\n", d->port, dbref);
|
||||
DBG("Stopping port %p could not close dbref %d\n", d->port, dbref);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -485,7 +497,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
// unregister if it's already initialized to this port.
|
||||
if (G_LOG_PORT == d->port)
|
||||
{
|
||||
DBG("Stopping port %p - removing logging port\r\n", d->port);
|
||||
DBG("Stopping port %p - removing logging port\n", d->port);
|
||||
|
||||
WRITE_LOCK(G_LOG_RWLOCK);
|
||||
|
||||
|
@ -496,11 +508,12 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
// Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD!
|
||||
G_DB_ENV->set_msgcall(G_DB_ENV, 0);
|
||||
G_DB_ENV->set_errcall(G_DB_ENV, 0);
|
||||
G_DB_ENV->set_event_notify(G_DB_ENV, 0);
|
||||
|
||||
WRITE_UNLOCK(G_LOG_RWLOCK);
|
||||
}
|
||||
|
||||
DBG("Stopped port: %p\r\n", d->port);
|
||||
DBG("Stopped port: %p\n", d->port);
|
||||
|
||||
// Release the port instance data
|
||||
driver_free(d->work_buffer);
|
||||
|
@ -509,7 +522,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
|
||||
static void bdberl_drv_finish()
|
||||
{
|
||||
DBG("BDB DRIVER FINISHING\r\n");
|
||||
DBG("BDB DRIVER FINISHING");
|
||||
// Stop the thread pools
|
||||
if (G_TPOOL_GENERAL != NULL)
|
||||
{
|
||||
|
@ -588,7 +601,7 @@ static void bdberl_drv_finish()
|
|||
G_LOG_RWLOCK = NULL;
|
||||
}
|
||||
|
||||
DBG("BDB DRIVER FINISHED\r\n");
|
||||
DBG("BDB DRIVER FINISHED");
|
||||
}
|
||||
|
||||
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||
|
@ -787,8 +800,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
|
||||
// Inbuf is <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>,
|
||||
|
||||
// If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc
|
||||
// until it is large enough
|
||||
// If the working buffer is large enough, copy the data to put/get into it.
|
||||
// Otherwise, realloc until it is large enough
|
||||
if (d->work_buffer_sz < inbuf_sz)
|
||||
{
|
||||
d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz);
|
||||
|
@ -842,13 +855,26 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
// Let caller know operation is in progress
|
||||
RETURN_INT(0, outbuf);
|
||||
}
|
||||
case CMD_CURSOR_COUNT:
|
||||
{
|
||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||
FAIL_IF_NO_CURSOR(d, outbuf);
|
||||
|
||||
// Schedule the operation
|
||||
d->async_op = cmd;
|
||||
bdberl_general_tpool_run(&do_async_cursor_count, d, 0, &d->async_job);
|
||||
|
||||
// Let caller know operation is in progress
|
||||
// Outbuf is: <<0:32>>
|
||||
RETURN_INT(0, outbuf);
|
||||
}
|
||||
case CMD_CURSOR_CLOSE:
|
||||
{
|
||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||
FAIL_IF_NO_CURSOR(d, outbuf);
|
||||
|
||||
// It's possible to get a deadlock when closing a cursor -- in that situation we also
|
||||
// need to go ahead and abort the txn
|
||||
// It's possible to get a deadlock when closing a cursor,
|
||||
// in that situation we also need to go ahead and abort the txn.
|
||||
int rc = d->cursor->close(d->cursor);
|
||||
if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK))
|
||||
{
|
||||
|
@ -916,6 +942,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
|
||||
G_DB_ENV->set_msgcall(G_DB_ENV, &bdb_msgcall);
|
||||
G_DB_ENV->set_errcall(G_DB_ENV, &bdb_errcall);
|
||||
G_DB_ENV->set_event_notify(G_DB_ENV, &bdb_eventcall);
|
||||
|
||||
WRITE_UNLOCK(G_LOG_RWLOCK);
|
||||
}
|
||||
|
@ -1005,16 +1032,16 @@ static int check_non_neg_env(char *env, unsigned int *val_ptr)
|
|||
long long val = strtoll(val_str, NULL, 0);
|
||||
if (val == 0 && errno == EINVAL)
|
||||
{
|
||||
fprintf(stderr, "Ignoring \"%s\" value \"%s\" - invalid value\r\n", env, val_str);
|
||||
fprintf(stderr, "Ignoring \"%s\" value \"%s\" - invalid value\n", env, val_str);
|
||||
return 0;
|
||||
}
|
||||
if (val <= 0 || val > UINT_MAX)
|
||||
{
|
||||
fprintf(stderr, "Ignoring \"%s\" value \"%lld\" - out of range\r\n", env, val);
|
||||
fprintf(stderr, "Ignoring \"%s\" value \"%lld\" - out of range\n", env, val);
|
||||
return 0;
|
||||
}
|
||||
unsigned int uival = (unsigned int) val;
|
||||
DBG("Using \"%s\" value %u\r\n", env, uival);
|
||||
DBG("Using \"%s\" value %u\n", env, uival);
|
||||
*val_ptr = uival;
|
||||
return 1;
|
||||
}
|
||||
|
@ -1040,7 +1067,7 @@ static int check_pos_env(char *env, unsigned int *val_ptr)
|
|||
}
|
||||
else
|
||||
{
|
||||
fprintf(stderr, "Ignoring \"%s\" value \"%u\" - out of range\r\n", env, *val_ptr);
|
||||
fprintf(stderr, "Ignoring \"%s\" value \"%u\" - out of range\n", env, *val_ptr);
|
||||
*val_ptr = original_val;
|
||||
return 0;
|
||||
}
|
||||
|
@ -1131,9 +1158,9 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
|
||||
// Create the DB handle
|
||||
DB* db = NULL;
|
||||
DBGCMD(data, "db_create(&db, %p, 0);", G_DB_ENV);
|
||||
DBGCMD(data, "db_create(&db, %p, 0);\n", G_DB_ENV);
|
||||
int rc = db_create(&db, G_DB_ENV, 0);
|
||||
DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
|
||||
DBGCMD(data, " = %s (%d) db = %p\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
|
||||
if (rc != 0)
|
||||
{
|
||||
// Failure while creating the database handle -- drop our lock and return
|
||||
|
@ -1147,12 +1174,14 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
{
|
||||
if (db->set_pagesize(db, G_PAGE_SIZE) != 0)
|
||||
{
|
||||
bdb_errcall(G_DB_ENV, "", "Failed to set page size.");
|
||||
bdb_errcall(G_DB_ENV, "\n", "Failed to set page size.");
|
||||
}
|
||||
}
|
||||
|
||||
flags |= DB_AUTO_COMMIT;
|
||||
|
||||
// Attempt to open our database
|
||||
DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);", db, name, type, flags);
|
||||
DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);\n", db, name, type, flags);
|
||||
rc = db->open(db, 0, name, 0, type, flags, 0);
|
||||
DBGCMDRC(data, rc);
|
||||
if (rc != 0)
|
||||
|
@ -1168,7 +1197,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
assert(db != NULL);
|
||||
G_DATABASES[dbref].db = db;
|
||||
G_DATABASES[dbref].name = strdup(name);
|
||||
G_DATABASES[dbref].ports = zalloc(sizeof(PortList));
|
||||
G_DATABASES[dbref].ports = driver_calloc(sizeof(PortList));
|
||||
G_DATABASES[dbref].ports->port = data->port;
|
||||
|
||||
// Make entry in hash table of names
|
||||
|
@ -1206,7 +1235,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
|||
if (database->ports == 0)
|
||||
{
|
||||
// Close out the BDB handle
|
||||
DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref);
|
||||
DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)\n", database->db, flags, dbref);
|
||||
rc = database->db->close(database->db, flags);
|
||||
DBGCMDRC(data, rc);
|
||||
|
||||
|
@ -1242,7 +1271,7 @@ static void check_all_databases_closed()
|
|||
Database* database = &G_DATABASES[dbref];
|
||||
if (database->ports != NULL)
|
||||
{
|
||||
fprintf(stderr, "BDBERL: Ports still open on '%s' dbref %d\r\n",
|
||||
fprintf(stderr, "BDBERL: Ports still open on '%s' dbref %d\n",
|
||||
database->name ? database->name : "no name", dbref);
|
||||
}
|
||||
|
||||
|
@ -1251,7 +1280,7 @@ static void check_all_databases_closed()
|
|||
int flags = 0;
|
||||
DBG("final db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref);
|
||||
rc = database->db->close(database->db, flags);
|
||||
DBG(" = %s (%d)\r\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
|
||||
DBG(" = %s (%d)\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1264,7 +1293,7 @@ static void abort_txn(PortData* d)
|
|||
{
|
||||
if (d->txn)
|
||||
{
|
||||
DBGCMD(d, "d->txn->abort(%p)", d->txn);
|
||||
DBGCMD(d, "d->txn->abort(%p)\n", d->txn);
|
||||
int rc = d->txn->abort(d->txn);
|
||||
DBGCMDRC(d, rc);
|
||||
d->txn = NULL;
|
||||
|
@ -1284,7 +1313,7 @@ static int delete_database(const char* name, PortData *data)
|
|||
}
|
||||
|
||||
// Good, database doesn't seem to be open -- attempt the delete
|
||||
DBG("Attempting to delete database: %s\r\n", name);
|
||||
DBG("Attempting to delete database: %s\n", name);
|
||||
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
|
||||
UNLOCK_DATABASES(data->port);
|
||||
|
||||
|
@ -1460,18 +1489,8 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path)
|
|||
}
|
||||
|
||||
|
||||
void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
||||
static void send_error_response(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
||||
{
|
||||
// TODO: May need to tag the messages a bit more explicitly so that if another async
|
||||
// job runs to completion before the message gets delivered we don't mis-interpret this
|
||||
// response code.
|
||||
if (rc == 0)
|
||||
{
|
||||
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
// See if this is a standard errno that we have an erlang code for
|
||||
char *error = bdberl_rc_to_atom_str(rc);
|
||||
if (error != NULL)
|
||||
|
@ -1491,6 +1510,22 @@ void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
|||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
|
||||
{
|
||||
// TODO: May need to tag the messages a bit more explicitly so that if another async
|
||||
// job runs to completion before the message gets delivered we don't mis-interpret this
|
||||
// response code.
|
||||
if (rc == 0)
|
||||
{
|
||||
ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
send_error_response(port, pid, rc);
|
||||
}
|
||||
}
|
||||
|
||||
void bdberl_async_cleanup_and_send_rc(PortData* d, int rc)
|
||||
|
@ -1507,6 +1542,32 @@ void bdberl_async_cleanup_and_send_rc(PortData* d, int rc)
|
|||
bdberl_send_rc(port, pid, rc);
|
||||
}
|
||||
|
||||
static void async_cleanup_and_send_uint32(PortData* d, int rc, unsigned int value)
|
||||
{
|
||||
// Save the port and pid references -- we need copies independent from the PortData
|
||||
// structure. Once we release the port_lock after clearing the cmd, it's possible that
|
||||
// the port could go away without waiting on us to finish. This is acceptable, but we need
|
||||
// to be certain that there is no overlap of data between the two threads. driver_send_term
|
||||
// is safe to use from a thread, even if the port you're sending from has already expired.
|
||||
ErlDrvPort port = d->port;
|
||||
ErlDrvTermData pid = d->port_owner;
|
||||
|
||||
bdberl_async_cleanup(d);
|
||||
|
||||
// Notify port of result
|
||||
if (rc == 0)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
|
||||
ERL_DRV_UINT, value,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
send_error_response(port, pid, rc);
|
||||
}
|
||||
}
|
||||
|
||||
static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
|
||||
{
|
||||
// Save the port and pid references -- we need copies independent from the PortData
|
||||
|
@ -1535,24 +1596,7 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
|
|||
}
|
||||
else
|
||||
{
|
||||
// See if this is a standard errno that we have an erlang code for
|
||||
char *error = bdberl_rc_to_atom_str(rc);
|
||||
if (error != NULL)
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom(error),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
else
|
||||
{
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
|
||||
ERL_DRV_ATOM, driver_mk_atom("unknown"),
|
||||
ERL_DRV_INT, rc,
|
||||
ERL_DRV_TUPLE, 2,
|
||||
ERL_DRV_TUPLE, 2};
|
||||
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
|
||||
}
|
||||
send_error_response(port, pid, rc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1587,14 +1631,14 @@ static void do_async_put(void* arg)
|
|||
int rc;
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.", buf_crc32, calc_crc32);
|
||||
DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.\n", buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn
|
||||
// is NULL, the put will still be atomic
|
||||
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)",
|
||||
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)\n",
|
||||
db, d->txn, &key, &value, flags, dbref, key.data, key.size, value.data, value.size);
|
||||
rc = db->put(db, d->txn, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
@ -1654,7 +1698,7 @@ static void do_async_get(void* arg)
|
|||
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.\n",
|
||||
buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
|
@ -1670,7 +1714,7 @@ static void do_async_get(void* arg)
|
|||
async_cleanup_and_send_kv(d, rc, &key, &value);
|
||||
|
||||
// Finally, clean up value buffer (driver_send_term made a copy)
|
||||
free(value.data);
|
||||
driver_free(value.data);
|
||||
}
|
||||
|
||||
static void do_async_del(void* arg)
|
||||
|
@ -1713,15 +1757,15 @@ static void do_async_txnop(void* arg)
|
|||
int rc = 0;
|
||||
if (d->async_op == CMD_TXN_BEGIN)
|
||||
{
|
||||
DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)", G_DB_ENV, d->txn, d->async_flags);
|
||||
DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)\n", G_DB_ENV, d->txn, d->async_flags);
|
||||
rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags);
|
||||
DBGCMD(d, "rc = %s (%d) d->txn = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, d->txn);
|
||||
DBGCMD(d, "rc = %s (%d) d->txn = %p\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, d->txn);
|
||||
|
||||
}
|
||||
else if (d->async_op == CMD_TXN_COMMIT)
|
||||
{
|
||||
assert(d->txn != NULL);
|
||||
DBGCMD(d, "d->txn->txn_commit(%p, %08X)", d->txn, d->async_flags);
|
||||
DBGCMD(d, "d->txn->txn_commit(%p, %08X)\n", d->txn, d->async_flags);
|
||||
rc = d->txn->commit(d->txn, d->async_flags);
|
||||
DBGCMDRC(d, rc);
|
||||
d->txn = 0;
|
||||
|
@ -1740,7 +1784,7 @@ static void do_async_cursor_put(void* arg)
|
|||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_put/2 not yet implemented..."); /* TODO: implement this. */
|
||||
DBGCMD(d, "cursor_put/2 not yet implemented...\n"); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
}
|
||||
|
||||
|
@ -1768,7 +1812,7 @@ static void do_async_cursor_get(void* arg)
|
|||
value.flags = DB_DBT_MALLOC;
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags);
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X\n);", d->cursor, &key, &value, flags);
|
||||
int rc = d->cursor->get(d->cursor, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
|
@ -1781,7 +1825,7 @@ static void do_async_cursor_get(void* arg)
|
|||
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.\n",
|
||||
buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
|
@ -1800,7 +1844,7 @@ static void do_async_cursor_get(void* arg)
|
|||
async_cleanup_and_send_kv(d, rc, &key, &value);
|
||||
|
||||
// Finally, clean up value buffer (driver_send_term made a copy)
|
||||
free(value.data);
|
||||
driver_free(value.data);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1808,11 +1852,28 @@ static void do_async_cursor_del(void* arg)
|
|||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */
|
||||
DBGCMD(d, "cursor_del/2 not yet implemented...\n"); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
}
|
||||
|
||||
|
||||
static void do_async_cursor_count(void* arg)
|
||||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
|
||||
// Place to store the record count.
|
||||
db_recno_t count = 0;
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->count(%p, %p, %08X);\n", d->cursor, &count, 0);
|
||||
int rc = d->cursor->count(d->cursor, &count, 0);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
async_cleanup_and_send_uint32(d, rc, count);
|
||||
}
|
||||
|
||||
|
||||
static void do_async_cursor_cnp(void* arg)
|
||||
{
|
||||
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
||||
|
@ -1838,7 +1899,7 @@ static void do_async_cursor_cnp(void* arg)
|
|||
}
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags);
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);\n", d->cursor, &key, &value, flags);
|
||||
int rc = d->cursor->get(d->cursor, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
|
@ -1879,7 +1940,7 @@ static void do_async_truncate(void* arg)
|
|||
|
||||
if (d->async_dbref == -1)
|
||||
{
|
||||
DBG("Truncating all open databases...\r\n");
|
||||
DBG("Truncating all open databases...");
|
||||
|
||||
// Iterate over the whole database list skipping null entries
|
||||
int i = 0; // I hate C
|
||||
|
@ -1891,10 +1952,9 @@ static void do_async_truncate(void* arg)
|
|||
DB* db = database->db;
|
||||
u_int32_t count = 0;
|
||||
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, i);
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d\n", db, d->txn, &count, i);
|
||||
rc = db->truncate(db, d->txn, &count, 0);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d",
|
||||
rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
|
@ -1907,9 +1967,9 @@ static void do_async_truncate(void* arg)
|
|||
{
|
||||
DB* db = G_DATABASES[d->async_dbref].db;
|
||||
u_int32_t count = 0;
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref);
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d\n", db, d->txn, &count, d->async_dbref);
|
||||
rc = db->truncate(db, d->txn, &count, 0);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
|
||||
DBGCMD(d, "rc = %s (%d) count=%d\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
|
||||
rc, count);
|
||||
}
|
||||
|
||||
|
@ -2043,18 +2103,16 @@ static void do_sync_driver_info(PortData *d)
|
|||
}
|
||||
|
||||
|
||||
static void* zalloc(unsigned int size)
|
||||
static void* driver_calloc(unsigned int size)
|
||||
{
|
||||
void* res = driver_alloc(size);
|
||||
memset(res, '\0', size);
|
||||
return res;
|
||||
}
|
||||
|
||||
#define zfree(p) driver_free(p)
|
||||
|
||||
static int add_portref(int dbref, ErlDrvPort port)
|
||||
{
|
||||
DBG("Adding port %p to dbref %d\r\n", port, dbref);
|
||||
DBG("Adding port %p to dbref %d\n", port, dbref);
|
||||
PortList* current = G_DATABASES[dbref].ports;
|
||||
if (current)
|
||||
{
|
||||
|
@ -2072,7 +2130,7 @@ static int add_portref(int dbref, ErlDrvPort port)
|
|||
} while (current != 0);
|
||||
|
||||
// At the end of the list -- allocate a new entry for this port
|
||||
current = (PortList*)zalloc(sizeof(PortList));
|
||||
current = (PortList*)driver_calloc(sizeof(PortList));
|
||||
current->port = port;
|
||||
last->next = current;
|
||||
return 1;
|
||||
|
@ -2080,7 +2138,7 @@ static int add_portref(int dbref, ErlDrvPort port)
|
|||
else
|
||||
{
|
||||
// Current was initially NULL, so alloc the first one and add it.
|
||||
current = zalloc(sizeof(PortList));
|
||||
current = driver_calloc(sizeof(PortList));
|
||||
current->port = port;
|
||||
G_DATABASES[dbref].ports = current;
|
||||
return 1;
|
||||
|
@ -2089,7 +2147,7 @@ static int add_portref(int dbref, ErlDrvPort port)
|
|||
|
||||
static int del_portref(int dbref, ErlDrvPort port)
|
||||
{
|
||||
DBG("Deleting port %p from dbref %d\r\n", port, dbref);
|
||||
DBG("Deleting port %p from dbref %d\n", port, dbref);
|
||||
PortList* current = G_DATABASES[dbref].ports;
|
||||
PortList* last = 0;
|
||||
assert(current != NULL);
|
||||
|
@ -2108,7 +2166,7 @@ static int del_portref(int dbref, ErlDrvPort port)
|
|||
}
|
||||
|
||||
// Delete this entry
|
||||
zfree(current);
|
||||
driver_free(current);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -2125,7 +2183,7 @@ static int del_portref(int dbref, ErlDrvPort port)
|
|||
*/
|
||||
static int add_dbref(PortData* data, int dbref)
|
||||
{
|
||||
DBG("Adding dbref %d to port %p\r\n", dbref, data->port);
|
||||
DBG("Adding dbref %d to port %p\n", dbref, data->port);
|
||||
DbRefList* current = data->dbrefs;
|
||||
if (current)
|
||||
{
|
||||
|
@ -2142,7 +2200,7 @@ static int add_dbref(PortData* data, int dbref)
|
|||
} while (current != 0);
|
||||
|
||||
// At the end of the list -- allocate a new entry
|
||||
current = zalloc(sizeof(DbRefList));
|
||||
current = driver_calloc(sizeof(DbRefList));
|
||||
current->dbref = dbref;
|
||||
last->next = current;
|
||||
return 1;
|
||||
|
@ -2150,7 +2208,7 @@ static int add_dbref(PortData* data, int dbref)
|
|||
else
|
||||
{
|
||||
// Current was initially NULL, so alloc the first one
|
||||
current = zalloc(sizeof(DbRefList));
|
||||
current = driver_calloc(sizeof(DbRefList));
|
||||
current->dbref = dbref;
|
||||
data->dbrefs = current;
|
||||
return 1;
|
||||
|
@ -2162,7 +2220,7 @@ static int add_dbref(PortData* data, int dbref)
|
|||
*/
|
||||
static int del_dbref(PortData* data, int dbref)
|
||||
{
|
||||
DBG("Deleting dbref %d from port %p\r\n", dbref, data->port);
|
||||
DBG("Deleting dbref %d from port %p\n", dbref, data->port);
|
||||
|
||||
DbRefList* current = data->dbrefs;
|
||||
DbRefList* last = 0;
|
||||
|
@ -2183,7 +2241,7 @@ static int del_dbref(PortData* data, int dbref)
|
|||
}
|
||||
|
||||
// Delete this entry
|
||||
zfree(current);
|
||||
driver_free(current);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -2314,7 +2372,7 @@ static void* deadlock_check(void* arg)
|
|||
}
|
||||
if (count > 0)
|
||||
{
|
||||
DBG("Rejected deadlocks: %d\r\n", count);
|
||||
DBG("Rejected deadlocks: %d\n", count);
|
||||
}
|
||||
|
||||
if (G_DEADLOCK_CHECK_INTERVAL > 0)
|
||||
|
@ -2323,7 +2381,7 @@ static void* deadlock_check(void* arg)
|
|||
}
|
||||
}
|
||||
|
||||
DBG("Deadlock checker exiting.\r\n");
|
||||
DBG("Deadlock checker exiting.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -2392,6 +2450,7 @@ static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg
|
|||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB error call: %s\n", msg);
|
||||
}
|
||||
|
||||
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
|
||||
|
@ -2400,6 +2459,35 @@ static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
|
|||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB message call: %s\n", msg);
|
||||
}
|
||||
|
||||
static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* info)
|
||||
{
|
||||
switch(type)
|
||||
{
|
||||
case DB_EVENT_PANIC:
|
||||
{
|
||||
const char *msg = "panic";
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_event_notify"),
|
||||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
// TODO clearly something should be done to shut things down cleanly and restart (how?)
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB panic event\n");
|
||||
break;
|
||||
}
|
||||
case DB_EVENT_WRITE_FAILED:
|
||||
{
|
||||
const char *msg = "write failed";
|
||||
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_event_notify"),
|
||||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB write failed event\n");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void send_log_message(ErlDrvTermData* msg, int elements)
|
||||
|
@ -2413,7 +2501,7 @@ static void send_log_message(ErlDrvTermData* msg, int elements)
|
|||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
static void bdberl_dbgcmd(PortData *d, const char *fmt, ...)
|
||||
void bdberl_dbg(const char *fmt, ...)
|
||||
{
|
||||
char buf[1024];
|
||||
|
||||
|
@ -2422,12 +2510,27 @@ static void bdberl_dbgcmd(PortData *d, const char *fmt, ...)
|
|||
vsnprintf(buf, sizeof(buf), fmt, ap);
|
||||
va_end(ap);
|
||||
|
||||
(void)fprintf(stderr, "threadid %p port %p: %s\r\n", erl_drv_thread_self(), d->port, buf);
|
||||
(void)fprintf(stderr, "%s", buf);
|
||||
(void)fflush(stderr);
|
||||
}
|
||||
|
||||
static void bdberl_dbgcmdrc(PortData *d, int rc)
|
||||
void bdberl_dbgcmd(PortData *d, const char *fmt, ...)
|
||||
{
|
||||
(void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\r\n",
|
||||
char buf[1024];
|
||||
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
vsnprintf(buf, sizeof(buf), fmt, ap);
|
||||
va_end(ap);
|
||||
|
||||
(void)fprintf(stderr, "threadid %p port %p: %s", erl_drv_thread_self(), d->port, buf);
|
||||
(void)fflush(stderr);
|
||||
}
|
||||
|
||||
void bdberl_dbgcmdrc(PortData *d, int rc)
|
||||
{
|
||||
(void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\n",
|
||||
erl_drv_thread_self(), d->port, rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
|
||||
(void)fflush(stderr);
|
||||
}
|
||||
#endif // DEBUG
|
||||
|
|
|
@ -53,9 +53,6 @@
|
|||
#define CMD_CURSOR_NEXT 12
|
||||
#define CMD_CURSOR_PREV 13
|
||||
#define CMD_CURSOR_CLOSE 14
|
||||
#define CMD_CURSOR_GET 35 /* TODO: renumber these next 3 and match them with bdberl.hrl */
|
||||
#define CMD_CURSOR_PUT 36
|
||||
#define CMD_CURSOR_DEL 37
|
||||
#define CMD_PUT_COMMIT 15
|
||||
#define CMD_REMOVE_DB 16
|
||||
#define CMD_TRUNCATE 17
|
||||
|
@ -76,6 +73,10 @@
|
|||
#define CMD_DATA_DIRS_INFO 32
|
||||
#define CMD_LOG_DIR_INFO 33
|
||||
#define CMD_DRIVER_INFO 34
|
||||
#define CMD_CURSOR_GET 35
|
||||
#define CMD_CURSOR_PUT 36
|
||||
#define CMD_CURSOR_DEL 37
|
||||
#define CMD_CURSOR_COUNT 38
|
||||
|
||||
/**
|
||||
* Command status values
|
||||
|
@ -249,4 +250,21 @@ void bdberl_txn_tpool_run(TPoolJobFunc main_fn, PortData* d, TPoolJobFunc cance
|
|||
RETURN_INT(0, outbuf); \
|
||||
}}
|
||||
|
||||
|
||||
/**
|
||||
* Helpful macros
|
||||
*/
|
||||
#ifdef DEBUG
|
||||
# define DBG(...) bdberl_dbg(__VA_ARGS__)
|
||||
# define DBGCMD(P, ...) bdberl_dbgcmd(P, __VA_ARGS__)
|
||||
# define DBGCMDRC(P, ...) bdberl_dbgcmdrc(P, __VA_ARGS__)
|
||||
extern void bdberl_dbg(const char * fmt, ...);
|
||||
extern void bdberl_dbgcmd(PortData *d, const char *fmt, ...);
|
||||
extern void bdberl_dbgcmdrc(PortData *d, int rc);
|
||||
#else
|
||||
# define DBG(arg1,...)
|
||||
# define DBGCMD(d, fmt, ...)
|
||||
# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error
|
||||
#endif
|
||||
|
||||
#endif //_BDBERL_DRV
|
||||
|
|
|
@ -607,7 +607,7 @@ static void do_async_stat(void* arg)
|
|||
// Finally, clean up value buffer (driver_send_term made a copy)
|
||||
if (NULL != sp)
|
||||
{
|
||||
free(sp);
|
||||
driver_free(sp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -630,7 +630,7 @@ static void do_async_lock_stat(void* arg)
|
|||
// Finally, clean up lock stats
|
||||
if (NULL != lsp)
|
||||
{
|
||||
free(lsp);
|
||||
driver_free(lsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -653,7 +653,7 @@ static void do_async_log_stat(void* arg)
|
|||
// Finally, clean up stats
|
||||
if (NULL != lsp)
|
||||
{
|
||||
free(lsp);
|
||||
driver_free(lsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -677,11 +677,11 @@ static void do_async_memp_stat(void* arg)
|
|||
// Finally, clean up stats
|
||||
if (NULL != gsp)
|
||||
{
|
||||
free(gsp);
|
||||
driver_free(gsp);
|
||||
}
|
||||
if (NULL != fsp)
|
||||
{
|
||||
free(fsp);
|
||||
driver_free(fsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -704,7 +704,7 @@ static void do_async_mutex_stat(void* arg)
|
|||
// Finally, clean up stats
|
||||
if (NULL != msp)
|
||||
{
|
||||
free(msp);
|
||||
driver_free(msp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -728,7 +728,7 @@ static void do_async_txn_stat(void* arg)
|
|||
// Finally, clean up stats
|
||||
if (NULL != tsp)
|
||||
{
|
||||
free(tsp);
|
||||
driver_free(tsp);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,11 +25,17 @@
|
|||
* THE SOFTWARE.
|
||||
*
|
||||
* ------------------------------------------------------------------- */
|
||||
|
||||
#include <db.h>
|
||||
#include "bdberl_drv.h"
|
||||
#include "bdberl_tpool.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <errno.h>
|
||||
#include <pthread.h>
|
||||
|
||||
static void* bdberl_tpool_main(void* tpool);
|
||||
static TPoolJob* next_job(TPool* tpool);
|
||||
|
@ -37,8 +43,9 @@ static int remove_pending_job(TPool* tpool, TPoolJob* job);
|
|||
static void cleanup_job(TPool* tpool, TPoolJob* job);
|
||||
static int is_active_job(TPool* tpool, TPoolJob* job);
|
||||
|
||||
#define LOCK(t) erl_drv_mutex_lock(tpool->lock)
|
||||
#define UNLOCK(t) erl_drv_mutex_unlock(tpool->lock)
|
||||
#define LOCK(tpool) erl_drv_mutex_lock(tpool->lock)
|
||||
#define UNLOCK(tpool) erl_drv_mutex_unlock(tpool->lock)
|
||||
|
||||
|
||||
TPool* bdberl_tpool_start(unsigned int thread_count)
|
||||
{
|
||||
|
@ -56,10 +63,13 @@ TPool* bdberl_tpool_start(unsigned int thread_count)
|
|||
int i;
|
||||
for (i = 0; i < thread_count; i++)
|
||||
{
|
||||
int rc = erl_drv_thread_create("bdberl_tpool_thread", &(tpool->threads[i]), &bdberl_tpool_main, (void*)tpool, 0);
|
||||
if (0 != rc) {
|
||||
// TODO: Figure out good way to deal with errors in this situation (should be rare, but still...)
|
||||
erl_drv_thread_create("bdberl_tpool_thread", &(tpool->threads[i]), &bdberl_tpool_main, (void*)tpool, 0);
|
||||
fprintf(stderr, "Failed to spawn an erlang thread for the BDB thread pools! %s\n", erl_errno_id(rc));
|
||||
fflush(stderr);
|
||||
}
|
||||
}
|
||||
|
||||
return tpool;
|
||||
}
|
||||
|
||||
|
@ -356,3 +366,38 @@ void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
|||
*active_count_ptr = tpool->active_job_count;
|
||||
UNLOCK(tpool);
|
||||
}
|
||||
|
||||
// Returns a unique identifier pair for the current thread of control
|
||||
void bdberl_tpool_thread_id(DB_ENV *env, pid_t *pid, db_threadid_t *tid)
|
||||
{
|
||||
if (pid)
|
||||
*pid = getpid();
|
||||
if (tid)
|
||||
*tid = (db_threadid_t)pthread_self();
|
||||
}
|
||||
|
||||
char *bdberl_tpool_thread_id_string(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, char *buf)
|
||||
{
|
||||
snprintf(buf, DB_THREADID_STRLEN, "%d/%p", (unsigned int)pid, (void *)tid);
|
||||
return buf;
|
||||
}
|
||||
|
||||
// Returns non-zero if the thread of control, identified by the pid and tid arguments,
|
||||
// is still running.
|
||||
// If DB_MUTEX_PROCESS_ONLY is set in flags then return only if the process (pid) is
|
||||
// alive, ignore the thread ID.
|
||||
int bdberl_tpool_thread_is_alive(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, u_int32_t flags)
|
||||
{
|
||||
int alive = 0;
|
||||
|
||||
if (kill(pid, 0) != ESRCH)
|
||||
{
|
||||
if (flags & DB_MUTEX_PROCESS_ONLY)
|
||||
alive = 1;
|
||||
else
|
||||
if (pthread_kill(tid, 0) != ESRCH)
|
||||
alive = 1;
|
||||
}
|
||||
DBG("bdberl_tpool_thread_is_alive(%08X, %08X, %d) = %d\n", pid, tid, flags, alive);
|
||||
return alive;
|
||||
}
|
||||
|
|
|
@ -89,4 +89,10 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job);
|
|||
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
||||
unsigned int *active_count_ptr);
|
||||
|
||||
void bdberl_tpool_thread_id(DB_ENV *env, pid_t *pid, db_threadid_t *tid);
|
||||
|
||||
char *bdberl_tpool_thread_id_string(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, char *buf);
|
||||
|
||||
int bdberl_tpool_thread_is_alive(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, u_int32_t flags);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
cd $1/build_unix && \
|
||||
../dist/configure --disable-shared --enable-static --with-pic \
|
||||
--disable-heap --disable-queue \
|
||||
--disable-partition --disable-replication \
|
||||
--enable-o_direct \
|
||||
--disable-heap --disable-queue --disable-replication \
|
||||
--enable-o_direct --enable-o_direct \
|
||||
--enable-debug --enable-diagnostics \
|
||||
--enable-dtrace \
|
||||
--prefix=$2
|
||||
|
|
|
@ -2,6 +2,6 @@
|
|||
[{description,"This is an Erlang port driver that allows Erlang programs to store data in BerkleyDB."},
|
||||
{vsn,"5.2.28"},
|
||||
{registered,[]},
|
||||
{applications,[kernel,stdlib]},
|
||||
{applications,[kernel,stdlib,lager]},
|
||||
{env,[]},
|
||||
{modules,[bdberl,bdberl_logger]}]}.
|
||||
|
|
|
@ -46,9 +46,6 @@
|
|||
-define(CMD_CURSOR_NEXT, 12).
|
||||
-define(CMD_CURSOR_PREV, 13).
|
||||
-define(CMD_CURSOR_CLOSE, 14).
|
||||
-define(CMD_CURSOR_GET, 35). %% TODO: renumber these 3 and match them to bdberl_drv.h
|
||||
-define(CMD_CURSOR_PUT, 36).
|
||||
-define(CMD_CURSOR_DEL, 37).
|
||||
-define(CMD_PUT_COMMIT, 15).
|
||||
-define(CMD_REMOVE_DB, 16).
|
||||
-define(CMD_TRUNCATE, 17).
|
||||
|
@ -69,6 +66,10 @@
|
|||
-define(CMD_DATA_DIRS_INFO, 32).
|
||||
-define(CMD_LOG_DIR_INFO, 33).
|
||||
-define(CMD_DRIVER_INFO, 34).
|
||||
-define(CMD_CURSOR_GET, 35).
|
||||
-define(CMD_CURSOR_PUT, 36).
|
||||
-define(CMD_CURSOR_DEL, 37).
|
||||
-define(CMD_CURSOR_COUNT, 38).
|
||||
|
||||
-define(DB_TYPE_BTREE, 1).
|
||||
-define(DB_TYPE_HASH, 2).
|
||||
|
@ -384,3 +385,28 @@
|
|||
-define(DB_UPDATE_SECONDARY, 29).
|
||||
-define(DB_SET_LTE, 30).
|
||||
-define(DB_GET_BOTH_LTE, 31).
|
||||
|
||||
%% DB Event notification types.
|
||||
-define(DB_EVENT_PANIC, 0).
|
||||
-define(DB_EVENT_REG_ALIVE, 1).
|
||||
-define(DB_EVENT_REG_PANIC, 2).
|
||||
-define(DB_EVENT_REP_CLIENT, 3).
|
||||
-define(DB_EVENT_REP_CONNECT_BROKEN, 4).
|
||||
-define(DB_EVENT_REP_CONNECT_ESTD, 5).
|
||||
-define(DB_EVENT_REP_CONNECT_TRY_FAILED, 6).
|
||||
-define(DB_EVENT_REP_DUPMASTER, 7).
|
||||
-define(DB_EVENT_REP_ELECTED, 8).
|
||||
-define(DB_EVENT_REP_ELECTION_FAILED, 9).
|
||||
-define(DB_EVENT_REP_INIT_DONE, 10).
|
||||
-define(DB_EVENT_REP_JOIN_FAILURE, 11).
|
||||
-define(DB_EVENT_REP_LOCAL_SITE_REMOVED, 12).
|
||||
-define(DB_EVENT_REP_MASTER, 13).
|
||||
-define(DB_EVENT_REP_MASTER_FAILURE, 14).
|
||||
-define(DB_EVENT_REP_NEWMASTER, 15).
|
||||
-define(DB_EVENT_REP_PERM_FAILED, 16).
|
||||
-define(DB_EVENT_REP_SITE_ADDED, 17).
|
||||
-define(DB_EVENT_REP_SITE_REMOVED, 18).
|
||||
-define(DB_EVENT_REP_STARTUPDONE, 19).
|
||||
-define(DB_EVENT_REP_WOULD_ROLLBACK, 20).
|
||||
-define(DB_EVENT_WRITE_FAILED, 21).
|
||||
-define(DB_EVENT_NO_SUCH_EVENT, 0xffffffff).
|
||||
|
|
|
@ -4,8 +4,14 @@
|
|||
%% ex: ft=erlang ts=4 sw=4 et
|
||||
%%
|
||||
|
||||
{require_otp_vsn, "R13B04|R14"}.
|
||||
{cover_enabled, true}.
|
||||
{erl_opts, [warnings_as_errors]}.
|
||||
|
||||
{deps, [
|
||||
{lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
|
||||
]}.
|
||||
|
||||
{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}.
|
||||
|
||||
{port_envs, [
|
||||
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
|
||||
|
|
|
@ -5,7 +5,8 @@
|
|||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
stdlib
|
||||
stdlib,
|
||||
lager
|
||||
]},
|
||||
{env, []}
|
||||
]}.
|
115
src/bdberl.erl
115
src/bdberl.erl
|
@ -68,6 +68,7 @@
|
|||
delete_database/1,
|
||||
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
|
||||
cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3,
|
||||
cursor_count/0,
|
||||
driver_info/0,
|
||||
register_logger/0,
|
||||
stop/0]).
|
||||
|
@ -104,7 +105,7 @@
|
|||
%% @spec open(Name, Type) -> {ok, Db} | {error, Error}
|
||||
%% where
|
||||
%% Name = string()
|
||||
%% Type = btree | hash
|
||||
%% Type = btree | hash | queue
|
||||
%% Db = integer()
|
||||
%%
|
||||
%% @equiv open(Name, Type, [create])
|
||||
|
@ -194,8 +195,8 @@ open(Name, Type, Opts) ->
|
|||
end,
|
||||
Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])),
|
||||
Cmd = <<Flags:32/native, TypeCode:8/signed-native, (list_to_binary(Name))/bytes, 0:8/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd),
|
||||
recv_val(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd),
|
||||
recv_val(Result).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -256,8 +257,8 @@ close(Db) ->
|
|||
close(Db, Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Db:32/signed-native, Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -272,7 +273,7 @@ close(Db, Opts) ->
|
|||
-spec txn_begin() -> ok | db_error().
|
||||
|
||||
txn_begin() ->
|
||||
txn_begin([txn_snapshot]).
|
||||
txn_begin([]).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -364,8 +365,8 @@ txn_begin() ->
|
|||
txn_begin(Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -427,8 +428,8 @@ txn_commit() ->
|
|||
txn_commit(Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -571,7 +572,7 @@ transaction(Fun, Retries, TimeLeft, Opts) ->
|
|||
ok ->
|
||||
try Fun() of
|
||||
abort ->
|
||||
error_logger:info_msg("function requested abort"),
|
||||
lager:info("function requested abort"),
|
||||
ok = txn_abort(),
|
||||
{error, transaction_aborted};
|
||||
|
||||
|
@ -595,7 +596,7 @@ transaction(Fun, Retries, TimeLeft, Opts) ->
|
|||
transaction(Fun, R, T, Opts);
|
||||
|
||||
_ : Reason ->
|
||||
error_logger:info_msg("function threw non-lock error - ~p", [Reason]),
|
||||
lager:info("function threw non-lock error - ~p", [{Reason, erlang:get_stacktrace()}]),
|
||||
ok = txn_abort(),
|
||||
{error, {transaction_failed, Reason}}
|
||||
end;
|
||||
|
@ -833,7 +834,7 @@ put_commit_r(Db, Key, Value, Opts) ->
|
|||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec get(Db :: db(), Key :: db_key()) ->
|
||||
not_found | {ok, db_ret_value()} | db_error().
|
||||
not_found | {ok, db_ret_value()} | {error, db_error()}.
|
||||
|
||||
get(Db, Key) ->
|
||||
get(Db, Key, []).
|
||||
|
@ -884,7 +885,7 @@ get(Db, Key) ->
|
|||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec get(Db :: db(), Key :: db_key(), Opts :: db_flags()) ->
|
||||
not_found | {ok, db_ret_value()} | db_error().
|
||||
not_found | {ok, db_ret_value()} | {error, db_error()}.
|
||||
|
||||
get(Db, Key, Opts) ->
|
||||
{KeyLen, KeyBin} = to_binary(Key),
|
||||
|
@ -900,7 +901,7 @@ get(Db, Key, Opts) ->
|
|||
Crc ->
|
||||
{ok, binary_to_term(Payload)};
|
||||
CrcOther ->
|
||||
error_logger:warning_msg("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
|
||||
lager:warning("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
|
||||
{error, invalid_crc}
|
||||
end;
|
||||
not_found -> not_found;
|
||||
|
@ -963,8 +964,6 @@ get_r(Db, Key, Opts) ->
|
|||
end.
|
||||
|
||||
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Delete a value based on key.
|
||||
|
@ -977,7 +976,7 @@ get_r(Db, Key, Opts) ->
|
|||
%% This function will return `not_found' if the specified key is not in
|
||||
%% the database.
|
||||
%%
|
||||
%% @spec del(Db, Key) -> not_found | {ok, Value} | {error, Error}
|
||||
%% @spec del(Db, Key) -> ok | not_found | {error, Reason}
|
||||
%% where
|
||||
%% Db = integer()
|
||||
%% Key = term()
|
||||
|
@ -986,7 +985,7 @@ get_r(Db, Key, Opts) ->
|
|||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec del(Db :: db(), Key :: db_key()) ->
|
||||
not_found | ok | db_error().
|
||||
ok | not_found | {error, Reason :: db_error()}.
|
||||
|
||||
del(Db, Key) ->
|
||||
{KeyLen, KeyBin} = to_binary(Key),
|
||||
|
@ -995,7 +994,7 @@ del(Db, Key) ->
|
|||
case decode_rc(Result) of
|
||||
ok ->
|
||||
receive
|
||||
{ok, _, _} -> ok;
|
||||
ok -> ok;
|
||||
not_found -> not_found;
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
|
@ -1173,8 +1172,8 @@ truncate() ->
|
|||
|
||||
truncate(Db) ->
|
||||
Cmd = <<Db:32/signed-native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TRUNCATE, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TRUNCATE, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -1194,8 +1193,8 @@ truncate(Db) ->
|
|||
|
||||
cursor_open(Db) ->
|
||||
Cmd = <<Db:32/signed-native, 0:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -1373,7 +1372,12 @@ cursor_get(Key) ->
|
|||
not_found | {ok, db_key(), db_value()} | db_error().
|
||||
|
||||
cursor_get(Key, Opts) ->
|
||||
{KeyLen, KeyBin} = to_binary(Key),
|
||||
case Key of
|
||||
undefined ->
|
||||
{KeyLen, KeyBin} = {0, <<>>};
|
||||
_ ->
|
||||
{KeyLen, KeyBin} = to_binary(Key)
|
||||
end,
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>,
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_GET, Cmd),
|
||||
|
@ -1386,7 +1390,7 @@ cursor_get(Key, Opts) ->
|
|||
Crc ->
|
||||
{ok, binary_to_term(Payload)};
|
||||
CrcOther ->
|
||||
error_logger:warning_msg("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
|
||||
lager:warning("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
|
||||
{error, invalid_crc}
|
||||
end;
|
||||
not_found -> not_found;
|
||||
|
@ -1397,6 +1401,25 @@ cursor_get(Key, Opts) ->
|
|||
end.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Returns the count of duplicate records for the key to which the
|
||||
%% cursor currently refers.
|
||||
%%
|
||||
%% If this function fails for any reason, the state of the cursor will
|
||||
%% be unchanged.
|
||||
%%
|
||||
%% @spec cursor_count() -> {ok, Count} | {error, Error}
|
||||
%%
|
||||
%% @end
|
||||
%%--------------------------------------------------------------------
|
||||
-spec cursor_count() -> {ok, Count :: number()} | {error, db_error()}.
|
||||
|
||||
cursor_count() ->
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_COUNT, <<>>),
|
||||
recv_val(Result).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Closes the cursor.
|
||||
|
@ -1418,8 +1441,8 @@ cursor_get(Key, Opts) ->
|
|||
-spec cursor_close() -> ok | db_error().
|
||||
|
||||
cursor_close() ->
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -1445,8 +1468,8 @@ cursor_close() ->
|
|||
|
||||
delete_database(Filename) ->
|
||||
Cmd = <<(list_to_binary(Filename))/binary, 0:8>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -1634,8 +1657,8 @@ get_txn_timeout() ->
|
|||
stat(Db, Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Db:32/signed-native, Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd),
|
||||
recv_val(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd),
|
||||
recv_val(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -1739,8 +1762,8 @@ stat_print(Db) ->
|
|||
lock_stat(Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOCK_STAT, Cmd),
|
||||
recv_val(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOCK_STAT, Cmd),
|
||||
recv_val(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -1835,8 +1858,8 @@ lock_stat_print() ->
|
|||
log_stat(Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOG_STAT, Cmd),
|
||||
recv_val(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOG_STAT, Cmd),
|
||||
recv_val(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -2016,8 +2039,8 @@ memp_stat_print() ->
|
|||
mutex_stat(Opts) ->
|
||||
Flags = process_flags(Opts),
|
||||
Cmd = <<Flags:32/native>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT, Cmd),
|
||||
recv_val(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT, Cmd),
|
||||
recv_val(Result).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -2232,8 +2255,9 @@ env_stat_print() ->
|
|||
{ok, [{atom(), number()}]} | db_error().
|
||||
|
||||
driver_info() ->
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DRIVER_INFO, <<>>),
|
||||
recv_val(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DRIVER_INFO, <<>>),
|
||||
recv_val(Result).
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -2251,6 +2275,7 @@ register_logger() ->
|
|||
[] = erlang:port_control(get_port(), ?CMD_REGISTER_LOGGER, <<>>),
|
||||
ok.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% @doc
|
||||
%% Stop bdberl - stops the bdberl_logger process so that when
|
||||
|
@ -2422,15 +2447,15 @@ do_put(Action, Db, Key, Value, Opts) ->
|
|||
Flags = process_flags(Opts),
|
||||
Cmd = <<Db:32/signed-native, Flags:32/native, KeyLen:32/native, KeyBin/bytes,
|
||||
FinalValBinLen:32/native, FinalValBin/bytes>>,
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), Action, Cmd),
|
||||
recv_ok(Rc).
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), Action, Cmd),
|
||||
recv_ok(Result).
|
||||
|
||||
%%
|
||||
%% Move the cursor in a given direction. Invoked by cursor_next/prev/current.
|
||||
%%
|
||||
do_cursor_move(Direction) ->
|
||||
<<Rc:32/signed-native>> = erlang:port_control(get_port(), Direction, <<>>),
|
||||
case decode_rc(Rc) of
|
||||
<<Result:32/signed-native>> = erlang:port_control(get_port(), Direction, <<>>),
|
||||
case decode_rc(Result) of
|
||||
ok ->
|
||||
receive
|
||||
{ok, KeyBin, ValueBin} ->
|
||||
|
@ -2439,7 +2464,7 @@ do_cursor_move(Direction) ->
|
|||
Crc ->
|
||||
{ok, binary_to_term(KeyBin), binary_to_term(Payload)};
|
||||
CrcOther ->
|
||||
error_logger:warning_msg("Invalid CRC on cursor: ~p ~p\n", [Crc, CrcOther]),
|
||||
lager:warning("Invalid CRC on cursor: ~p ~p\n", [Crc, CrcOther]),
|
||||
{error, invalid_crc}
|
||||
end;
|
||||
not_found ->
|
||||
|
|
|
@ -67,7 +67,7 @@ init([]) ->
|
|||
true ->
|
||||
load_mibs(['BDBERL-MIB']);
|
||||
false ->
|
||||
error_logger:warning_msg("SNMP is not running; bdberl stats will not be published.\n")
|
||||
lager:warning("SNMP is not running; bdberl stats will not be published.\n")
|
||||
end,
|
||||
|
||||
{ok, #state{}}.
|
||||
|
@ -79,11 +79,11 @@ handle_cast(_Msg, State) ->
|
|||
{stop, unsupportedOperation, State}.
|
||||
|
||||
handle_info({bdb_error_log, Msg}, State) ->
|
||||
error_logger:error_msg("BDB Error: ~s\n", [Msg]),
|
||||
lager:error("BDB: ~s\n", [Msg]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({bdb_info_log, Msg}, State) ->
|
||||
error_logger:info_msg("BDB Info: ~s\n", [Msg]),
|
||||
lager:info("BDB: ~s\n", [Msg]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) ->
|
||||
|
@ -98,7 +98,7 @@ handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_info({bdb_checkpoint_stats, _CheckpointSecs, _ArchiveSecs, CheckpointRc, ArchiveRc}, State) ->
|
||||
error_logger:error_msg("BDB Checkpoint error: ~w ~w\n", [CheckpointRc, ArchiveRc]),
|
||||
lager:error("BDB Checkpoint: ~w ~w\n", [CheckpointRc, ArchiveRc]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) ->
|
||||
|
@ -112,11 +112,11 @@ handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) ->
|
|||
end,
|
||||
{noreply, State};
|
||||
handle_info({bdb_trickle_stats, _ElapsedSecs, _Pages, Rc}, State) ->
|
||||
error_logger:error_msg("BDB Trickle Write error: ~w\n", [Rc]),
|
||||
lager:error("BDB Trickle Write: ~w\n", [Rc]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(Msg, State) ->
|
||||
io:format("Unexpected message: ~p\n", [Msg]),
|
||||
lager:info("Unexpected message: ~p\n", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -45,7 +45,8 @@ all() ->
|
|||
get_should_return_a_value_when_getting_a_valid_record,
|
||||
put_should_succeed_with_manual_transaction,
|
||||
put_should_rollback_with_failed_manual_transaction,
|
||||
% del_should_remove_a_value, %TODO: why is this disabled
|
||||
del_should_remove_a_value,
|
||||
aborted_del_should_not_remove_a_value,
|
||||
transaction_should_commit_on_success,
|
||||
transaction_should_abort_on_exception,
|
||||
transaction_should_abort_on_user_abort,
|
||||
|
@ -54,6 +55,7 @@ all() ->
|
|||
update_should_accept_args_for_fun,
|
||||
port_should_return_transaction_timeouts,
|
||||
cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open,
|
||||
cursor_should_return_count,
|
||||
put_commit_should_end_txn,
|
||||
data_dir_should_be_priv_dir,
|
||||
delete_should_remove_file,
|
||||
|
@ -141,6 +143,16 @@ del_should_remove_a_value(Config) ->
|
|||
ok = bdberl:del(Db, mykey),
|
||||
not_found = bdberl:get(Db, mykey).
|
||||
|
||||
aborted_del_should_not_remove_a_value(Config) ->
|
||||
Db = ?config(db, Config),
|
||||
ok = bdberl:put(Db, mykey, avalue),
|
||||
{ok, avalue} = bdberl:get(Db, mykey),
|
||||
ok = bdberl:txn_begin(),
|
||||
ok = bdberl:del(Db, mykey),
|
||||
not_found = bdberl:get(Db, mykey),
|
||||
ok = bdberl:txn_abort(),
|
||||
{ok, avalue} = bdberl:get(Db, mykey).
|
||||
|
||||
transaction_should_commit_on_success(Config) ->
|
||||
Db = ?config(db, Config),
|
||||
F = fun() -> bdberl:put(Db, mykey, avalue) end,
|
||||
|
@ -259,6 +271,20 @@ cursor_get_should_pos(Config) ->
|
|||
|
||||
ok = bdberl:cursor_close().
|
||||
|
||||
cursor_should_return_count(Config) ->
|
||||
Db = ?config(db, Config),
|
||||
|
||||
%% Store some sample values in the db
|
||||
ok = bdberl:put(Db, key, value1),
|
||||
% ok = bdberl:put(Db, key, value2),
|
||||
% ok = bdberl:put(Db, key, value3),
|
||||
% ok = bdberl:put(Db, key, value4),
|
||||
ok = bdberl:cursor_open(Db),
|
||||
{ok, _} = bdberl:cursor_get(key),
|
||||
%% Validate the count of duplicate values
|
||||
{ok, 1} = bdberl:cursor_count(),
|
||||
ok = bdberl:cursor_close().
|
||||
|
||||
cursor_should_fail_if_not_open(_Config) ->
|
||||
{error, no_cursor} = bdberl:cursor_next(),
|
||||
{error, no_cursor} = bdberl:cursor_prev(),
|
||||
|
@ -413,5 +439,3 @@ start_after_stop_should_be_safe(_Config) ->
|
|||
end,
|
||||
true = lists:keymember(bdberl_logger, 1, supervisor:which_children(kernel_safe_sup)),
|
||||
ok.
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue