Compare commits

..

32 commits

Author SHA1 Message Date
Gregory Burd
d0c587bdec Various fixes. 2011-12-05 12:16:25 -05:00
Gregory Burd
1f7ca0c189 Use proper string formats 2011-12-04 15:16:54 -05:00
Gregory Burd
3d7a6e41c6 Use a more portable method to determine if PID is alive or not. 2011-12-04 15:04:55 -05:00
Gregory Burd
92c906f105 Fix macros so that they use their arguments properly. 2011-12-04 14:47:32 -05:00
Gregory Burd
91d10a7d51 Error messages 2011-12-04 14:46:13 -05:00
Gregory Burd
d7659ea8c1 Add BDB failchk support which requires thread_id and is_alive (implement thread_id_string while we're at it). 2011-12-04 14:45:30 -05:00
Gregory Burd
30b89d7808 Change scope of debugging macros. 2011-12-04 14:42:30 -05:00
Gregory Burd
b649dc7d3b Whitespace and formatting only. 2011-12-04 14:40:54 -05:00
Gregory Burd
12e3012a05 Transmit BDB event notifications into Erlang port driver. 2011-12-03 10:23:40 -05:00
Gregory Burd
9021359752 Merge remote-tracking branch 'origin/master' 2011-12-03 09:51:44 -05:00
Gregory Burd
fed8c726e2 Ensure all databases are opened with auto commit enabled. 2011-12-03 09:45:43 -05:00
Gregory Burd
eed6dfdf3a A cursor get() of an undefined Key will position the cursor. 2011-12-03 09:44:59 -05:00
Gregory Burd
5e0c4ff384 Consistently use 'Result' rather than a mix of 'Rc' and 'Result' 2011-12-01 16:24:53 -05:00
Gregory Burd
b70d9b8afa Add support for cursor->count() 2011-12-01 14:38:22 -05:00
Gregory Burd
11c864d949 Whitespace and log message formatting fixes. 2011-12-01 14:34:16 -05:00
Gregory Burd
63eb48ec24 Delete just sends back ok for success 2011-12-01 08:13:45 -05:00
Gregory Burd
ea7c83b32c Enable the two delete test cases. Currently the tests will hang as they execute these tests. 2011-11-30 15:01:40 -05:00
Gregory Burd
dfa71be3c5 Merge branches 'lager-integration' and 'master' 2011-11-30 14:58:34 -05:00
Gregory Burd
72cf45ff3f Use lager for all log messages 2011-11-30 14:44:39 -05:00
Gregory Burd
7fc539984f Merge branch 'master', remote-tracking branch 'origin' 2011-11-29 13:42:21 -05:00
Gregory Burd
542f70443a Add me to the list. 2011-11-29 13:42:06 -05:00
Gregory Burd
21017f2d95 Merge branch 'master', remote-tracking branch 'origin' 2011-11-28 18:44:21 -05:00
Gregory Burd
ad64781df6 Auto-download of db package now works properly. 2011-11-28 18:39:53 -05:00
Gregory Burd
daa79ac912 Merge branch 'master' of git://github.com/jonmeredith/bdberl 2011-11-28 16:44:33 -05:00
Gregory Burd
05b81cdc3b Test bdb:del within a transaction, then abort. 2011-11-28 16:22:08 -05:00
Gregory Burd
7ff11a42a8 Consistently use driver_(alloc,realloc,free). 2011-11-28 16:02:00 -05:00
Gregory Burd
1d28a6f5f5 Add thrash test. 2011-11-28 15:59:57 -05:00
Gregory Burd
00f5549644 Cleanup 2011-11-28 14:08:27 -05:00
Gregory Burd
564b87a78a Whitespace 2011-11-28 13:56:28 -05:00
Gregory Burd
f32973b044 Grab a reduced tar-ball from GitHub gburd/libdb 2011-11-28 13:41:16 -05:00
Jon Meredith
cfd8bc9bb1 The value to binary line in do_put was accidentally duplicated making extra work on a put. 2009-06-29 10:12:42 -06:00
Jon Meredith
9621cb0043 Moved check_all_databases_closed inside check for G_DB_ENV != NULL, otherwise
fails on LOCK_DATABASE when G_DATABASES_MUTEX is uninitialized.
2009-06-29 10:11:41 -06:00
17 changed files with 760 additions and 489 deletions

2
.gitignore vendored
View file

@ -5,8 +5,10 @@
*.so *.so
*.beam *.beam
*.orig *.orig
deps
logs logs
doc doc
include/BDBERL-MIB.hrl
test/test.cover test/test.cover
int_test/test.cover int_test/test.cover
.eunit .eunit

View file

@ -1,25 +1,37 @@
ERL ?=erl ERL ?=erl
CT_RUN ?=ct_run
ERL_FLAGS ?=+A10 ERL_FLAGS ?=+A10
REBAR_FLAGS := REBAR_FLAGS :=
all: $(BDB_LOCAL_LIB) all: deps compile
deps:
$(REBAR) get-deps
compile: $(BDB_LOCAL_LIB)
ERL_FLAGS=$(ERL_FLAGS) $(REBAR) $(REBAR_FLAGS) compile ERL_FLAGS=$(ERL_FLAGS) $(REBAR) $(REBAR_FLAGS) compile
test: tests test: tests
tests: tests:
@ $(REBAR) $(REBAR_FLAGS) eunit ct @ $(REBAR) $(REBAR_FLAGS) eunit app=bdberl
@ $(REBAR) $(REBAR_FLAGS) ct app=bdberl
thrash-test:
@ $(CT_RUN) -pa test/ -suite thrash_SUITE
clean: clean:
$(REBAR) $(REBAR_FLAGS) clean $(REBAR) $(REBAR_FLAGS) clean
-rm test/*.beam -rm test/*.beam
distclean: clean distclean: clean
-rm -rf $(BDB_LOCAL_DIST) $(REBAR) delete-deps
-make -C c_src clean
-rm -rf c_src/sources -rm -rf c_src/sources
-rm -rf priv -rm -rf priv
-rm -rf logs -rm -rf logs
include rebar.mk include rebar.mk
.EXPORT_ALL_VARIABLES:

1
README
View file

@ -5,3 +5,4 @@ Authors:
Phil Toland <phil.toland@gmail.com> Phil Toland <phil.toland@gmail.com>
Jon Meredith <jon@jonmeredith.com> Jon Meredith <jon@jonmeredith.com>
Sergey Yelin <elinsn@gmail.com> Sergey Yelin <elinsn@gmail.com>
Greg Burd <greg@basho.com> <gregburd@me.com>

View file

@ -1,13 +1,15 @@
# This Makefile builds the dependency (libdb) needed by bdberl_drv.so # This Makefile builds the dependency (libdb) needed by bdberl_drv.so
ERL ?=erl ERL ?= erl
ERL_FLAGS ?=+A10 ERL_FLAGS ?= +A10
TAR ?= tar TAR ?= tar
GUNZIP ?= gunzip GUNZIP ?= gunzip
CURL ?= curl
BDB_VER := 5.2.36 BDB_VER := 5.2.36
BDB_DIR := $(CURDIR)/db-$(BDB_VER) BDB_DIR := $(CURDIR)/db-$(BDB_VER)
BDB_DIST := db-$(BDB_VER).tar.gz BDB_DIST := db-$(BDB_VER)
BDB_DIST_URL := http://download.oracle.com/berkeley-db/$(BDB_DIST) #BDB_DIST_URL := http://download.oracle.com/berkeley-db/$(BDB_DIST).tar.gz
BDB_DIST_URL := https://github.com/downloads/gburd/libdb/$(BDB_DIST).tar.gz
SYSTEM_DIR := $(CURDIR)/system SYSTEM_DIR := $(CURDIR)/system
LIB_DIR := $(SYSTEM_DIR)/lib LIB_DIR := $(SYSTEM_DIR)/lib
@ -17,7 +19,6 @@ INC_DIR := $(SYSTEM_DIR)/include
db: $(LIB_DIR)/libdb.a db: $(LIB_DIR)/libdb.a
$(LIB_DIR)/libdb.a: $(BDB_DIST) $(LIB_DIR)/libdb.a: $(BDB_DIST)
$(GUNZIP) -c db-$(BDB_VER).tar.gz | $(TAR) xf -
@for I in patches/*.patch; do \ @for I in patches/*.patch; do \
(patch -p0 < $${I} || echo "Skipping patch"); \ (patch -p0 < $${I} || echo "Skipping patch"); \
done done
@ -28,6 +29,6 @@ clean:
@rm -rf ./*.o $(SYSTEM_DIR) $(BDB_DIR) @rm -rf ./*.o $(SYSTEM_DIR) $(BDB_DIR)
$(BDB_DIST): $(BDB_DIST):
$(REBAR_FETCH) $(BDB_DIST_URL) $(CURL) -L $(BDB_DIST_URL) | $(GUNZIP) | $(TAR) xf -
.EXPORT_ALL_VARIABLES: .EXPORT_ALL_VARIABLES:

View file

@ -113,6 +113,7 @@ static void do_async_txnop(void* arg);
static void do_async_cursor_put(void* arg); static void do_async_cursor_put(void* arg);
static void do_async_cursor_get(void* arg); static void do_async_cursor_get(void* arg);
static void do_async_cursor_del(void* arg); static void do_async_cursor_del(void* arg);
static void do_async_cursor_count(void* arg);
static void do_async_cursor_cnp(void* arg); static void do_async_cursor_cnp(void* arg);
static void do_async_truncate(void* arg); static void do_async_truncate(void* arg);
static void do_sync_data_dirs_info(PortData *p); static void do_sync_data_dirs_info(PortData *p);
@ -129,13 +130,14 @@ static int del_portref(int dbref, ErlDrvPort port);
static int alloc_dbref(); static int alloc_dbref();
static void abort_txn(PortData* d); static void abort_txn(PortData* d);
static void* zalloc(unsigned int size); static void* driver_calloc(unsigned int size);
static void* deadlock_check(void* arg); static void* deadlock_check(void* arg);
static void* checkpointer(void* arg); static void* checkpointer(void* arg);
static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg); static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg);
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg); static void bdb_msgcall(const DB_ENV* dbenv, const char* msg);
static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* msg);
static void send_log_message(ErlDrvTermData* msg, int elements); static void send_log_message(ErlDrvTermData* msg, int elements);
/** /**
@ -198,7 +200,7 @@ static unsigned int G_CHECKPOINT_ACTIVE = 1;
static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between checkpoints */ static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between checkpoints */
/** /**
* Pipe to used to wake up the various monitors. Instead of just sleeping * Pipe is used to wake up the various monitors. Instead of just sleeping
* they wait for an exceptional condition on the read fd of the pipe. When it is time to * they wait for an exceptional condition on the read fd of the pipe. When it is time to
* shutdown, the driver closes the write fd and waits for the threads to be joined. * shutdown, the driver closes the write fd and waits for the threads to be joined.
*/ */
@ -227,36 +229,20 @@ static TPool* G_TPOOL_GENERAL = NULL;
static TPool* G_TPOOL_TXNS = NULL; static TPool* G_TPOOL_TXNS = NULL;
/**
* Helpful macros
*/
#ifdef DEBUG
# define DBG(...) fprintf(stderr, __VA_ARGS__)
# define DBGCMD(P, ...) bdberl_dbgcmd(P, __VA_ARGS__)
# define DBGCMDRC(P, ...) bdberl_dbgcmdrc(P, __VA_ARGS__)
static void bdberl_dbgcmd(PortData *d, const char *fmt, ...);
static void bdberl_dbgcmdrc(PortData *d, int rc);
#else
# define DBG(arg1,...)
# define DBGCMD(d, fmt, ...)
# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error
#endif
#define LOCK_DATABASES(P) \ #define LOCK_DATABASES(P) \
do \ do \
{ \ { \
DBG("threadid %p port %p: locking G_DATABASES\r\n", erl_drv_thread_self(), P); \ DBG("threadid %p port %p: locking G_DATABASES\n", erl_drv_thread_self(), P); \
erl_drv_mutex_lock(G_DATABASES_MUTEX); \ erl_drv_mutex_lock(G_DATABASES_MUTEX); \
DBG("threadid %p port %p: locked G_DATABASES\r\n", erl_drv_thread_self(), P); \ DBG("threadid %p port %p: locked G_DATABASES\n", erl_drv_thread_self(), P); \
} while(0) } while(0)
#define UNLOCK_DATABASES(P) \ #define UNLOCK_DATABASES(P) \
do \ do \
{ \ { \
DBG("threadid %p port %p: unlocking G_DATABASES\r\n", erl_drv_thread_self(), P); \ DBG("threadid %p port %p: unlocking G_DATABASES\n", erl_drv_thread_self(), P); \
erl_drv_mutex_unlock(G_DATABASES_MUTEX); \ erl_drv_mutex_unlock(G_DATABASES_MUTEX); \
DBG("threadid %p port %p: unlocked G_DATABASES\r\n", erl_drv_thread_self(), P); \ DBG("threadid %p port %p: unlocked G_DATABASES\n", erl_drv_thread_self(), P); \
} while (0) } while (0)
@ -268,7 +254,7 @@ static void bdberl_dbgcmdrc(PortData *d, int rc);
DRIVER_INIT(bdberl_drv) DRIVER_INIT(bdberl_drv)
{ {
DBG("DRIVER INIT\r\n"); DBG("DRIVER INIT");
// Setup flags we'll use to init the environment // Setup flags we'll use to init the environment
int flags = int flags =
DB_INIT_LOCK | /* Enable support for locking */ DB_INIT_LOCK | /* Enable support for locking */
@ -277,6 +263,9 @@ DRIVER_INIT(bdberl_drv)
DB_RECOVER | /* Enable support for recovering from failures */ DB_RECOVER | /* Enable support for recovering from failures */
DB_CREATE | /* Create files as necessary */ DB_CREATE | /* Create files as necessary */
DB_REGISTER | /* Run recovery if needed */ DB_REGISTER | /* Run recovery if needed */
DB_FAILCHK | /* Release any database reads locks held by the
thread of control that exited and, if needed,
abort unresolved transaction. */
DB_USE_ENVIRON | /* Use DB_HOME environment variable */ DB_USE_ENVIRON | /* Use DB_HOME environment variable */
DB_THREAD; /* Make the environment free-threaded */ DB_THREAD; /* Make the environment free-threaded */
@ -292,22 +281,46 @@ DRIVER_INIT(bdberl_drv)
// specify where the working directory is // specify where the working directory is
DBG("db_env_create(%p, 0)", &G_DB_ENV); DBG("db_env_create(%p, 0)", &G_DB_ENV);
G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0); G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0);
DBG(" = %d\r\n", G_DB_ENV_ERROR); DBG(" = %d\n", G_DB_ENV_ERROR);
if (G_DB_ENV_ERROR != 0) if (G_DB_ENV_ERROR != 0)
{ {
G_DB_ENV = 0; G_DB_ENV = 0;
} }
else else
{ {
// DB should use the safe allocation routines provided by the Erlang VM
DBG("G_DB_ENV->set_alloc(%p, ...)", &G_DB_ENV);
G_DB_ENV_ERROR = G_DB_ENV->set_alloc(G_DB_ENV, driver_alloc, driver_realloc, driver_free);
DBG(" = %d\n", G_DB_ENV_ERROR);
// Inform DB of the number of threads that will be operating on the DB Environment
unsigned int nthreads = G_NUM_GENERAL_THREADS + G_NUM_TXN_THREADS;
DBG("G_DB_ENV->set_thread_count(%p, %d, ...)", &G_DB_ENV, nthreads);
G_DB_ENV_ERROR = G_DB_ENV->set_thread_count(G_DB_ENV, nthreads);
DBG(" = %d\n", G_DB_ENV_ERROR);
DBG("G_DB_ENV->set_thread_id(%p, ...)", &G_DB_ENV);
G_DB_ENV_ERROR = G_DB_ENV->set_thread_id(G_DB_ENV, &bdberl_tpool_thread_id);
DBG(" = %d\n", G_DB_ENV_ERROR);
DBG("G_DB_ENV->set_thread_id_string(%p, ...)", &G_DB_ENV);
G_DB_ENV_ERROR = G_DB_ENV->set_thread_id_string(G_DB_ENV, &bdberl_tpool_thread_id_string);
DBG(" = %d\n", G_DB_ENV_ERROR);
DBG("G_DB_ENV->set_is_alive(%p, ...)", &G_DB_ENV);
G_DB_ENV_ERROR = G_DB_ENV->set_isalive(G_DB_ENV, &bdberl_tpool_thread_is_alive);
DBG(" = %d\n", G_DB_ENV_ERROR);
// Open the DB Environment
DBG("G_DB_ENV->open(%p, 0, %08X, 0)", &G_DB_ENV, flags); DBG("G_DB_ENV->open(%p, 0, %08X, 0)", &G_DB_ENV, flags);
G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0); G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0);
DBG(" = %d\r\n", G_DB_ENV_ERROR); DBG(" = %d\n", G_DB_ENV_ERROR);
if (G_DB_ENV_ERROR != 0) if (G_DB_ENV_ERROR != 0)
{ {
// Something bad happened while initializing BDB; in this situation we // Something bad happened while initializing BDB; in this situation we
// cleanup and set the environment to zero. Attempts to open ports will // cleanup and set the environment to zero. Attempts to open ports will
// fail and the user will have to sort out how to resolve the issue. // fail and the user will have to sort out how to resolve the issue.
DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV); DBG("G_DB_ENV->close(%p, 0);", &G_DB_ENV);
G_DB_ENV->close(G_DB_ENV, 0); G_DB_ENV->close(G_DB_ENV, 0);
G_DB_ENV = 0; G_DB_ENV = 0;
} }
@ -341,7 +354,7 @@ DRIVER_INIT(bdberl_drv)
} }
else else
{ {
fprintf(stderr, "Ignoring \"BDBERL_PAGE_SIZE\" value %u - not power of 2\r\n", fprintf(stderr, "Ignoring \"BDBERL_PAGE_SIZE\" value %u - not power of 2",
page_size); page_size);
} }
} }
@ -385,7 +398,7 @@ DRIVER_INIT(bdberl_drv)
} }
else else
{ {
DBG("DRIVER INIT FAILED - %s\r\n", db_strerror(G_DB_ENV_ERROR)); DBG("DRIVER INIT FAILED - %s\n", db_strerror(G_DB_ENV_ERROR));
} }
return &bdberl_drv_entry; return &bdberl_drv_entry;
@ -393,8 +406,7 @@ DRIVER_INIT(bdberl_drv)
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
{ {
DBG("threadid %p port %p: BDB DRIVER STARTING\r\n", DBG("threadid %p port %p: BDB DRIVER STARTING\n", erl_drv_thread_self(), port);
erl_drv_thread_self(), port);
// Make sure we have a functional environment -- if we don't, // Make sure we have a functional environment -- if we don't,
// bail... // bail...
@ -422,7 +434,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
// Make sure port is running in binary mode // Make sure port is running in binary mode
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY); set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
DBGCMD(d, "BDB DRIVER STARTED"); DBGCMD(d, "BDB DRIVER STARTED\n");
return (ErlDrvData)d; return (ErlDrvData)d;
} }
@ -431,7 +443,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
{ {
PortData* d = (PortData*)handle; PortData* d = (PortData*)handle;
DBG("Stopping port %p\r\n", d->port); DBG("Stopping port %p\n", d->port);
// Grab the port lock, in case we have an async job running // Grab the port lock, in case we have an async job running
erl_drv_mutex_lock(d->port_lock); erl_drv_mutex_lock(d->port_lock);
@ -440,13 +452,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
// block until the job has either been removed or has run // block until the job has either been removed or has run
if (d->async_job) if (d->async_job)
{ {
DBGCMD(d, "Stopping port %p - cancelling async job %p\r\n", d->port, d->async_job); DBGCMD(d, "Stopping port %p - cancelling async job %p\n", d->port, d->async_job);
// Drop the lock prior to starting the wait for the async process // Drop the lock prior to starting the wait for the async process
erl_drv_mutex_unlock(d->port_lock); erl_drv_mutex_unlock(d->port_lock);
bdberl_tpool_cancel(d->async_pool, d->async_job); bdberl_tpool_cancel(d->async_pool, d->async_job);
DBGCMD(d, "Canceled async job for port: %p\r\n", d->port); DBGCMD(d, "Canceled async job for port: %p\n", d->port);
} }
else else
{ {
@ -458,7 +470,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
erl_drv_mutex_destroy(d->port_lock); erl_drv_mutex_destroy(d->port_lock);
// If a cursor is open, close it // If a cursor is open, close it
DBG("Stopping port %p - cleaning up cursors (%p) and transactions (%p)\r\n", d->port, DBG("Stopping port %p - cleaning up cursors (%p) and transactions (%p)\n", d->port,
d->cursor, d->txn); d->cursor, d->txn);
if (d->cursor) if (d->cursor)
@ -470,13 +482,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
abort_txn(d); abort_txn(d);
// Close all the databases we previously opened // Close all the databases we previously opened
DBG("Stopping port %p - closing all dbrefs\r\n", d->port); DBG("Stopping port %p - closing all dbrefs\n", d->port);
while (d->dbrefs) while (d->dbrefs)
{ {
int dbref = d->dbrefs->dbref; int dbref = d->dbrefs->dbref;
if (close_database(dbref, 0, d) != ERROR_NONE) if (close_database(dbref, 0, d) != ERROR_NONE)
{ {
DBG("Stopping port %p could not close dbref %d\r\n", d->port, dbref); DBG("Stopping port %p could not close dbref %d\n", d->port, dbref);
} }
} }
@ -485,7 +497,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
// unregister if it's already initialized to this port. // unregister if it's already initialized to this port.
if (G_LOG_PORT == d->port) if (G_LOG_PORT == d->port)
{ {
DBG("Stopping port %p - removing logging port\r\n", d->port); DBG("Stopping port %p - removing logging port\n", d->port);
WRITE_LOCK(G_LOG_RWLOCK); WRITE_LOCK(G_LOG_RWLOCK);
@ -496,11 +508,12 @@ static void bdberl_drv_stop(ErlDrvData handle)
// Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD! // Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD!
G_DB_ENV->set_msgcall(G_DB_ENV, 0); G_DB_ENV->set_msgcall(G_DB_ENV, 0);
G_DB_ENV->set_errcall(G_DB_ENV, 0); G_DB_ENV->set_errcall(G_DB_ENV, 0);
G_DB_ENV->set_event_notify(G_DB_ENV, 0);
WRITE_UNLOCK(G_LOG_RWLOCK); WRITE_UNLOCK(G_LOG_RWLOCK);
} }
DBG("Stopped port: %p\r\n", d->port); DBG("Stopped port: %p\n", d->port);
// Release the port instance data // Release the port instance data
driver_free(d->work_buffer); driver_free(d->work_buffer);
@ -509,7 +522,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
static void bdberl_drv_finish() static void bdberl_drv_finish()
{ {
DBG("BDB DRIVER FINISHING\r\n"); DBG("BDB DRIVER FINISHING");
// Stop the thread pools // Stop the thread pools
if (G_TPOOL_GENERAL != NULL) if (G_TPOOL_GENERAL != NULL)
{ {
@ -588,7 +601,7 @@ static void bdberl_drv_finish()
G_LOG_RWLOCK = NULL; G_LOG_RWLOCK = NULL;
} }
DBG("BDB DRIVER FINISHED\r\n"); DBG("BDB DRIVER FINISHED");
} }
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
@ -787,8 +800,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Inbuf is <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>, // Inbuf is <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>,
// If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc // If the working buffer is large enough, copy the data to put/get into it.
// until it is large enough // Otherwise, realloc until it is large enough
if (d->work_buffer_sz < inbuf_sz) if (d->work_buffer_sz < inbuf_sz)
{ {
d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz);
@ -842,13 +855,26 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Let caller know operation is in progress // Let caller know operation is in progress
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
} }
case CMD_CURSOR_COUNT:
{
FAIL_IF_ASYNC_PENDING(d, outbuf);
FAIL_IF_NO_CURSOR(d, outbuf);
// Schedule the operation
d->async_op = cmd;
bdberl_general_tpool_run(&do_async_cursor_count, d, 0, &d->async_job);
// Let caller know operation is in progress
// Outbuf is: <<0:32>>
RETURN_INT(0, outbuf);
}
case CMD_CURSOR_CLOSE: case CMD_CURSOR_CLOSE:
{ {
FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_ASYNC_PENDING(d, outbuf);
FAIL_IF_NO_CURSOR(d, outbuf); FAIL_IF_NO_CURSOR(d, outbuf);
// It's possible to get a deadlock when closing a cursor -- in that situation we also // It's possible to get a deadlock when closing a cursor,
// need to go ahead and abort the txn // in that situation we also need to go ahead and abort the txn.
int rc = d->cursor->close(d->cursor); int rc = d->cursor->close(d->cursor);
if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK)) if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK))
{ {
@ -916,6 +942,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
G_DB_ENV->set_msgcall(G_DB_ENV, &bdb_msgcall); G_DB_ENV->set_msgcall(G_DB_ENV, &bdb_msgcall);
G_DB_ENV->set_errcall(G_DB_ENV, &bdb_errcall); G_DB_ENV->set_errcall(G_DB_ENV, &bdb_errcall);
G_DB_ENV->set_event_notify(G_DB_ENV, &bdb_eventcall);
WRITE_UNLOCK(G_LOG_RWLOCK); WRITE_UNLOCK(G_LOG_RWLOCK);
} }
@ -1005,16 +1032,16 @@ static int check_non_neg_env(char *env, unsigned int *val_ptr)
long long val = strtoll(val_str, NULL, 0); long long val = strtoll(val_str, NULL, 0);
if (val == 0 && errno == EINVAL) if (val == 0 && errno == EINVAL)
{ {
fprintf(stderr, "Ignoring \"%s\" value \"%s\" - invalid value\r\n", env, val_str); fprintf(stderr, "Ignoring \"%s\" value \"%s\" - invalid value\n", env, val_str);
return 0; return 0;
} }
if (val <= 0 || val > UINT_MAX) if (val <= 0 || val > UINT_MAX)
{ {
fprintf(stderr, "Ignoring \"%s\" value \"%lld\" - out of range\r\n", env, val); fprintf(stderr, "Ignoring \"%s\" value \"%lld\" - out of range\n", env, val);
return 0; return 0;
} }
unsigned int uival = (unsigned int) val; unsigned int uival = (unsigned int) val;
DBG("Using \"%s\" value %u\r\n", env, uival); DBG("Using \"%s\" value %u\n", env, uival);
*val_ptr = uival; *val_ptr = uival;
return 1; return 1;
} }
@ -1040,7 +1067,7 @@ static int check_pos_env(char *env, unsigned int *val_ptr)
} }
else else
{ {
fprintf(stderr, "Ignoring \"%s\" value \"%u\" - out of range\r\n", env, *val_ptr); fprintf(stderr, "Ignoring \"%s\" value \"%u\" - out of range\n", env, *val_ptr);
*val_ptr = original_val; *val_ptr = original_val;
return 0; return 0;
} }
@ -1131,9 +1158,9 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
// Create the DB handle // Create the DB handle
DB* db = NULL; DB* db = NULL;
DBGCMD(data, "db_create(&db, %p, 0);", G_DB_ENV); DBGCMD(data, "db_create(&db, %p, 0);\n", G_DB_ENV);
int rc = db_create(&db, G_DB_ENV, 0); int rc = db_create(&db, G_DB_ENV, 0);
DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db); DBGCMD(data, " = %s (%d) db = %p\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
if (rc != 0) if (rc != 0)
{ {
// Failure while creating the database handle -- drop our lock and return // Failure while creating the database handle -- drop our lock and return
@ -1147,12 +1174,14 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
{ {
if (db->set_pagesize(db, G_PAGE_SIZE) != 0) if (db->set_pagesize(db, G_PAGE_SIZE) != 0)
{ {
bdb_errcall(G_DB_ENV, "", "Failed to set page size."); bdb_errcall(G_DB_ENV, "\n", "Failed to set page size.");
} }
} }
flags |= DB_AUTO_COMMIT;
// Attempt to open our database // Attempt to open our database
DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);", db, name, type, flags); DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);\n", db, name, type, flags);
rc = db->open(db, 0, name, 0, type, flags, 0); rc = db->open(db, 0, name, 0, type, flags, 0);
DBGCMDRC(data, rc); DBGCMDRC(data, rc);
if (rc != 0) if (rc != 0)
@ -1168,7 +1197,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
assert(db != NULL); assert(db != NULL);
G_DATABASES[dbref].db = db; G_DATABASES[dbref].db = db;
G_DATABASES[dbref].name = strdup(name); G_DATABASES[dbref].name = strdup(name);
G_DATABASES[dbref].ports = zalloc(sizeof(PortList)); G_DATABASES[dbref].ports = driver_calloc(sizeof(PortList));
G_DATABASES[dbref].ports->port = data->port; G_DATABASES[dbref].ports->port = data->port;
// Make entry in hash table of names // Make entry in hash table of names
@ -1206,7 +1235,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
if (database->ports == 0) if (database->ports == 0)
{ {
// Close out the BDB handle // Close out the BDB handle
DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref); DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)\n", database->db, flags, dbref);
rc = database->db->close(database->db, flags); rc = database->db->close(database->db, flags);
DBGCMDRC(data, rc); DBGCMDRC(data, rc);
@ -1242,7 +1271,7 @@ static void check_all_databases_closed()
Database* database = &G_DATABASES[dbref]; Database* database = &G_DATABASES[dbref];
if (database->ports != NULL) if (database->ports != NULL)
{ {
fprintf(stderr, "BDBERL: Ports still open on '%s' dbref %d\r\n", fprintf(stderr, "BDBERL: Ports still open on '%s' dbref %d\n",
database->name ? database->name : "no name", dbref); database->name ? database->name : "no name", dbref);
} }
@ -1251,7 +1280,7 @@ static void check_all_databases_closed()
int flags = 0; int flags = 0;
DBG("final db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref); DBG("final db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref);
rc = database->db->close(database->db, flags); rc = database->db->close(database->db, flags);
DBG(" = %s (%d)\r\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc); DBG(" = %s (%d)\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
} }
} }
@ -1264,7 +1293,7 @@ static void abort_txn(PortData* d)
{ {
if (d->txn) if (d->txn)
{ {
DBGCMD(d, "d->txn->abort(%p)", d->txn); DBGCMD(d, "d->txn->abort(%p)\n", d->txn);
int rc = d->txn->abort(d->txn); int rc = d->txn->abort(d->txn);
DBGCMDRC(d, rc); DBGCMDRC(d, rc);
d->txn = NULL; d->txn = NULL;
@ -1284,7 +1313,7 @@ static int delete_database(const char* name, PortData *data)
} }
// Good, database doesn't seem to be open -- attempt the delete // Good, database doesn't seem to be open -- attempt the delete
DBG("Attempting to delete database: %s\r\n", name); DBG("Attempting to delete database: %s\n", name);
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT); int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
UNLOCK_DATABASES(data->port); UNLOCK_DATABASES(data->port);
@ -1460,6 +1489,29 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path)
} }
static void send_error_response(ErlDrvPort port, ErlDrvTermData pid, int rc)
{
// See if this is a standard errno that we have an erlang code for
char *error = bdberl_rc_to_atom_str(rc);
if (error != NULL)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom(error),
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom("unknown"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
}
void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
{ {
// TODO: May need to tag the messages a bit more explicitly so that if another async // TODO: May need to tag the messages a bit more explicitly so that if another async
@ -1472,24 +1524,7 @@ void bdberl_send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc)
} }
else else
{ {
// See if this is a standard errno that we have an erlang code for send_error_response(port, pid, rc);
char *error = bdberl_rc_to_atom_str(rc);
if (error != NULL)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom(error),
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom("unknown"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
} }
} }
@ -1507,6 +1542,32 @@ void bdberl_async_cleanup_and_send_rc(PortData* d, int rc)
bdberl_send_rc(port, pid, rc); bdberl_send_rc(port, pid, rc);
} }
static void async_cleanup_and_send_uint32(PortData* d, int rc, unsigned int value)
{
// Save the port and pid references -- we need copies independent from the PortData
// structure. Once we release the port_lock after clearing the cmd, it's possible that
// the port could go away without waiting on us to finish. This is acceptable, but we need
// to be certain that there is no overlap of data between the two threads. driver_send_term
// is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port;
ErlDrvTermData pid = d->port_owner;
bdberl_async_cleanup(d);
// Notify port of result
if (rc == 0)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
ERL_DRV_UINT, value,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
send_error_response(port, pid, rc);
}
}
static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
{ {
// Save the port and pid references -- we need copies independent from the PortData // Save the port and pid references -- we need copies independent from the PortData
@ -1535,24 +1596,7 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value)
} }
else else
{ {
// See if this is a standard errno that we have an erlang code for send_error_response(port, pid, rc);
char *error = bdberl_rc_to_atom_str(rc);
if (error != NULL)
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom(error),
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
else
{
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_ATOM, driver_mk_atom("unknown"),
ERL_DRV_INT, rc,
ERL_DRV_TUPLE, 2,
ERL_DRV_TUPLE, 2};
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
} }
} }
@ -1587,14 +1631,14 @@ static void do_async_put(void* arg)
int rc; int rc;
if (calc_crc32 != buf_crc32) if (calc_crc32 != buf_crc32)
{ {
DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.", buf_crc32, calc_crc32); DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.\n", buf_crc32, calc_crc32);
rc = ERROR_INVALID_VALUE; rc = ERROR_INVALID_VALUE;
} }
else else
{ {
// Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn // Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn
// is NULL, the put will still be atomic // is NULL, the put will still be atomic
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)", DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)\n",
db, d->txn, &key, &value, flags, dbref, key.data, key.size, value.data, value.size); db, d->txn, &key, &value, flags, dbref, key.data, key.size, value.data, value.size);
rc = db->put(db, d->txn, &key, &value, flags); rc = db->put(db, d->txn, &key, &value, flags);
DBGCMDRC(d, rc); DBGCMDRC(d, rc);
@ -1654,7 +1698,7 @@ static void do_async_get(void* arg)
if (calc_crc32 != buf_crc32) if (calc_crc32 != buf_crc32)
{ {
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.", DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.\n",
buf_crc32, calc_crc32); buf_crc32, calc_crc32);
rc = ERROR_INVALID_VALUE; rc = ERROR_INVALID_VALUE;
} }
@ -1670,7 +1714,7 @@ static void do_async_get(void* arg)
async_cleanup_and_send_kv(d, rc, &key, &value); async_cleanup_and_send_kv(d, rc, &key, &value);
// Finally, clean up value buffer (driver_send_term made a copy) // Finally, clean up value buffer (driver_send_term made a copy)
free(value.data); driver_free(value.data);
} }
static void do_async_del(void* arg) static void do_async_del(void* arg)
@ -1713,15 +1757,15 @@ static void do_async_txnop(void* arg)
int rc = 0; int rc = 0;
if (d->async_op == CMD_TXN_BEGIN) if (d->async_op == CMD_TXN_BEGIN)
{ {
DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)", G_DB_ENV, d->txn, d->async_flags); DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)\n", G_DB_ENV, d->txn, d->async_flags);
rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags); rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags);
DBGCMD(d, "rc = %s (%d) d->txn = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, d->txn); DBGCMD(d, "rc = %s (%d) d->txn = %p\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, d->txn);
} }
else if (d->async_op == CMD_TXN_COMMIT) else if (d->async_op == CMD_TXN_COMMIT)
{ {
assert(d->txn != NULL); assert(d->txn != NULL);
DBGCMD(d, "d->txn->txn_commit(%p, %08X)", d->txn, d->async_flags); DBGCMD(d, "d->txn->txn_commit(%p, %08X)\n", d->txn, d->async_flags);
rc = d->txn->commit(d->txn, d->async_flags); rc = d->txn->commit(d->txn, d->async_flags);
DBGCMDRC(d, rc); DBGCMDRC(d, rc);
d->txn = 0; d->txn = 0;
@ -1738,10 +1782,10 @@ static void do_async_txnop(void* arg)
static void do_async_cursor_put(void* arg) static void do_async_cursor_put(void* arg)
{ {
PortData* d = (PortData*)arg; PortData* d = (PortData*)arg;
assert(d->cursor != NULL); assert(d->cursor != NULL);
DBGCMD(d, "cursor_put/2 not yet implemented..."); /* TODO: implement this. */ DBGCMD(d, "cursor_put/2 not yet implemented...\n"); /* TODO: implement this. */
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE); bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
} }
@ -1768,7 +1812,7 @@ static void do_async_cursor_get(void* arg)
value.flags = DB_DBT_MALLOC; value.flags = DB_DBT_MALLOC;
// Execute the operation // Execute the operation
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags); DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X\n);", d->cursor, &key, &value, flags);
int rc = d->cursor->get(d->cursor, &key, &value, flags); int rc = d->cursor->get(d->cursor, &key, &value, flags);
DBGCMDRC(d, rc); DBGCMDRC(d, rc);
@ -1781,7 +1825,7 @@ static void do_async_cursor_get(void* arg)
if (calc_crc32 != buf_crc32) if (calc_crc32 != buf_crc32)
{ {
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.", DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.\n",
buf_crc32, calc_crc32); buf_crc32, calc_crc32);
rc = ERROR_INVALID_VALUE; rc = ERROR_INVALID_VALUE;
} }
@ -1800,16 +1844,33 @@ static void do_async_cursor_get(void* arg)
async_cleanup_and_send_kv(d, rc, &key, &value); async_cleanup_and_send_kv(d, rc, &key, &value);
// Finally, clean up value buffer (driver_send_term made a copy) // Finally, clean up value buffer (driver_send_term made a copy)
free(value.data); driver_free(value.data);
} }
static void do_async_cursor_del(void* arg) static void do_async_cursor_del(void* arg)
{ {
PortData* d = (PortData*)arg; PortData* d = (PortData*)arg;
assert(d->cursor != NULL); assert(d->cursor != NULL);
DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */ DBGCMD(d, "cursor_del/2 not yet implemented...\n"); /* TODO: implement this. */
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE); bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
}
static void do_async_cursor_count(void* arg)
{
PortData* d = (PortData*)arg;
assert(d->cursor != NULL);
// Place to store the record count.
db_recno_t count = 0;
// Execute the operation
DBGCMD(d, "d->cursor->count(%p, %p, %08X);\n", d->cursor, &count, 0);
int rc = d->cursor->count(d->cursor, &count, 0);
DBGCMDRC(d, rc);
async_cleanup_and_send_uint32(d, rc, count);
} }
@ -1838,7 +1899,7 @@ static void do_async_cursor_cnp(void* arg)
} }
// Execute the operation // Execute the operation
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags); DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);\n", d->cursor, &key, &value, flags);
int rc = d->cursor->get(d->cursor, &key, &value, flags); int rc = d->cursor->get(d->cursor, &key, &value, flags);
DBGCMDRC(d, rc); DBGCMDRC(d, rc);
@ -1879,7 +1940,7 @@ static void do_async_truncate(void* arg)
if (d->async_dbref == -1) if (d->async_dbref == -1)
{ {
DBG("Truncating all open databases...\r\n"); DBG("Truncating all open databases...");
// Iterate over the whole database list skipping null entries // Iterate over the whole database list skipping null entries
int i = 0; // I hate C int i = 0; // I hate C
@ -1891,10 +1952,9 @@ static void do_async_truncate(void* arg)
DB* db = database->db; DB* db = database->db;
u_int32_t count = 0; u_int32_t count = 0;
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, i); DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d\n", db, d->txn, &count, i);
rc = db->truncate(db, d->txn, &count, 0); rc = db->truncate(db, d->txn, &count, 0);
DBGCMD(d, "rc = %s (%d) count=%d", DBGCMD(d, "rc = %s (%d) count=%d\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
if (rc != 0) if (rc != 0)
{ {
@ -1907,9 +1967,9 @@ static void do_async_truncate(void* arg)
{ {
DB* db = G_DATABASES[d->async_dbref].db; DB* db = G_DATABASES[d->async_dbref].db;
u_int32_t count = 0; u_int32_t count = 0;
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref); DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d\n", db, d->txn, &count, d->async_dbref);
rc = db->truncate(db, d->txn, &count, 0); rc = db->truncate(db, d->txn, &count, 0);
DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), DBGCMD(d, "rc = %s (%d) count=%d\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
rc, count); rc, count);
} }
@ -2043,18 +2103,16 @@ static void do_sync_driver_info(PortData *d)
} }
static void* zalloc(unsigned int size) static void* driver_calloc(unsigned int size)
{ {
void* res = driver_alloc(size); void* res = driver_alloc(size);
memset(res, '\0', size); memset(res, '\0', size);
return res; return res;
} }
#define zfree(p) driver_free(p)
static int add_portref(int dbref, ErlDrvPort port) static int add_portref(int dbref, ErlDrvPort port)
{ {
DBG("Adding port %p to dbref %d\r\n", port, dbref); DBG("Adding port %p to dbref %d\n", port, dbref);
PortList* current = G_DATABASES[dbref].ports; PortList* current = G_DATABASES[dbref].ports;
if (current) if (current)
{ {
@ -2072,7 +2130,7 @@ static int add_portref(int dbref, ErlDrvPort port)
} while (current != 0); } while (current != 0);
// At the end of the list -- allocate a new entry for this port // At the end of the list -- allocate a new entry for this port
current = (PortList*)zalloc(sizeof(PortList)); current = (PortList*)driver_calloc(sizeof(PortList));
current->port = port; current->port = port;
last->next = current; last->next = current;
return 1; return 1;
@ -2080,7 +2138,7 @@ static int add_portref(int dbref, ErlDrvPort port)
else else
{ {
// Current was initially NULL, so alloc the first one and add it. // Current was initially NULL, so alloc the first one and add it.
current = zalloc(sizeof(PortList)); current = driver_calloc(sizeof(PortList));
current->port = port; current->port = port;
G_DATABASES[dbref].ports = current; G_DATABASES[dbref].ports = current;
return 1; return 1;
@ -2089,7 +2147,7 @@ static int add_portref(int dbref, ErlDrvPort port)
static int del_portref(int dbref, ErlDrvPort port) static int del_portref(int dbref, ErlDrvPort port)
{ {
DBG("Deleting port %p from dbref %d\r\n", port, dbref); DBG("Deleting port %p from dbref %d\n", port, dbref);
PortList* current = G_DATABASES[dbref].ports; PortList* current = G_DATABASES[dbref].ports;
PortList* last = 0; PortList* last = 0;
assert(current != NULL); assert(current != NULL);
@ -2108,7 +2166,7 @@ static int del_portref(int dbref, ErlDrvPort port)
} }
// Delete this entry // Delete this entry
zfree(current); driver_free(current);
return 1; return 1;
} }
@ -2125,7 +2183,7 @@ static int del_portref(int dbref, ErlDrvPort port)
*/ */
static int add_dbref(PortData* data, int dbref) static int add_dbref(PortData* data, int dbref)
{ {
DBG("Adding dbref %d to port %p\r\n", dbref, data->port); DBG("Adding dbref %d to port %p\n", dbref, data->port);
DbRefList* current = data->dbrefs; DbRefList* current = data->dbrefs;
if (current) if (current)
{ {
@ -2142,7 +2200,7 @@ static int add_dbref(PortData* data, int dbref)
} while (current != 0); } while (current != 0);
// At the end of the list -- allocate a new entry // At the end of the list -- allocate a new entry
current = zalloc(sizeof(DbRefList)); current = driver_calloc(sizeof(DbRefList));
current->dbref = dbref; current->dbref = dbref;
last->next = current; last->next = current;
return 1; return 1;
@ -2150,7 +2208,7 @@ static int add_dbref(PortData* data, int dbref)
else else
{ {
// Current was initially NULL, so alloc the first one // Current was initially NULL, so alloc the first one
current = zalloc(sizeof(DbRefList)); current = driver_calloc(sizeof(DbRefList));
current->dbref = dbref; current->dbref = dbref;
data->dbrefs = current; data->dbrefs = current;
return 1; return 1;
@ -2162,7 +2220,7 @@ static int add_dbref(PortData* data, int dbref)
*/ */
static int del_dbref(PortData* data, int dbref) static int del_dbref(PortData* data, int dbref)
{ {
DBG("Deleting dbref %d from port %p\r\n", dbref, data->port); DBG("Deleting dbref %d from port %p\n", dbref, data->port);
DbRefList* current = data->dbrefs; DbRefList* current = data->dbrefs;
DbRefList* last = 0; DbRefList* last = 0;
@ -2183,7 +2241,7 @@ static int del_dbref(PortData* data, int dbref)
} }
// Delete this entry // Delete this entry
zfree(current); driver_free(current);
return 1; return 1;
} }
@ -2314,7 +2372,7 @@ static void* deadlock_check(void* arg)
} }
if (count > 0) if (count > 0)
{ {
DBG("Rejected deadlocks: %d\r\n", count); DBG("Rejected deadlocks: %d\n", count);
} }
if (G_DEADLOCK_CHECK_INTERVAL > 0) if (G_DEADLOCK_CHECK_INTERVAL > 0)
@ -2323,7 +2381,7 @@ static void* deadlock_check(void* arg)
} }
} }
DBG("Deadlock checker exiting.\r\n"); DBG("Deadlock checker exiting.");
return 0; return 0;
} }
@ -2392,6 +2450,7 @@ static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg), ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
ERL_DRV_TUPLE, 2}; ERL_DRV_TUPLE, 2};
send_log_message(response, sizeof(response)); send_log_message(response, sizeof(response));
DBG("BDB error call: %s\n", msg);
} }
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg) static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
@ -2400,6 +2459,35 @@ static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg), ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
ERL_DRV_TUPLE, 2}; ERL_DRV_TUPLE, 2};
send_log_message(response, sizeof(response)); send_log_message(response, sizeof(response));
DBG("BDB message call: %s\n", msg);
}
static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* info)
{
switch(type)
{
case DB_EVENT_PANIC:
{
const char *msg = "panic";
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_event_notify"),
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
ERL_DRV_TUPLE, 2};
// TODO clearly something should be done to shut things down cleanly and restart (how?)
send_log_message(response, sizeof(response));
DBG("BDB panic event\n");
break;
}
case DB_EVENT_WRITE_FAILED:
{
const char *msg = "write failed";
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_event_notify"),
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
ERL_DRV_TUPLE, 2};
send_log_message(response, sizeof(response));
DBG("BDB write failed event\n");
break;
}
}
} }
static void send_log_message(ErlDrvTermData* msg, int elements) static void send_log_message(ErlDrvTermData* msg, int elements)
@ -2413,7 +2501,7 @@ static void send_log_message(ErlDrvTermData* msg, int elements)
} }
#ifdef DEBUG #ifdef DEBUG
static void bdberl_dbgcmd(PortData *d, const char *fmt, ...) void bdberl_dbg(const char *fmt, ...)
{ {
char buf[1024]; char buf[1024];
@ -2422,12 +2510,27 @@ static void bdberl_dbgcmd(PortData *d, const char *fmt, ...)
vsnprintf(buf, sizeof(buf), fmt, ap); vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap); va_end(ap);
(void)fprintf(stderr, "threadid %p port %p: %s\r\n", erl_drv_thread_self(), d->port, buf); (void)fprintf(stderr, "%s", buf);
(void)fflush(stderr);
} }
static void bdberl_dbgcmdrc(PortData *d, int rc) void bdberl_dbgcmd(PortData *d, const char *fmt, ...)
{ {
(void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\r\n", char buf[1024];
va_list ap;
va_start(ap, fmt);
vsnprintf(buf, sizeof(buf), fmt, ap);
va_end(ap);
(void)fprintf(stderr, "threadid %p port %p: %s", erl_drv_thread_self(), d->port, buf);
(void)fflush(stderr);
}
void bdberl_dbgcmdrc(PortData *d, int rc)
{
(void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\n",
erl_drv_thread_self(), d->port, rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc); erl_drv_thread_self(), d->port, rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
(void)fflush(stderr);
} }
#endif // DEBUG #endif // DEBUG

View file

@ -53,9 +53,6 @@
#define CMD_CURSOR_NEXT 12 #define CMD_CURSOR_NEXT 12
#define CMD_CURSOR_PREV 13 #define CMD_CURSOR_PREV 13
#define CMD_CURSOR_CLOSE 14 #define CMD_CURSOR_CLOSE 14
#define CMD_CURSOR_GET 35 /* TODO: renumber these next 3 and match them with bdberl.hrl */
#define CMD_CURSOR_PUT 36
#define CMD_CURSOR_DEL 37
#define CMD_PUT_COMMIT 15 #define CMD_PUT_COMMIT 15
#define CMD_REMOVE_DB 16 #define CMD_REMOVE_DB 16
#define CMD_TRUNCATE 17 #define CMD_TRUNCATE 17
@ -76,6 +73,10 @@
#define CMD_DATA_DIRS_INFO 32 #define CMD_DATA_DIRS_INFO 32
#define CMD_LOG_DIR_INFO 33 #define CMD_LOG_DIR_INFO 33
#define CMD_DRIVER_INFO 34 #define CMD_DRIVER_INFO 34
#define CMD_CURSOR_GET 35
#define CMD_CURSOR_PUT 36
#define CMD_CURSOR_DEL 37
#define CMD_CURSOR_COUNT 38
/** /**
* Command status values * Command status values
@ -249,4 +250,21 @@ void bdberl_txn_tpool_run(TPoolJobFunc main_fn, PortData* d, TPoolJobFunc cance
RETURN_INT(0, outbuf); \ RETURN_INT(0, outbuf); \
}} }}
/**
* Helpful macros
*/
#ifdef DEBUG
# define DBG(...) bdberl_dbg(__VA_ARGS__)
# define DBGCMD(P, ...) bdberl_dbgcmd(P, __VA_ARGS__)
# define DBGCMDRC(P, ...) bdberl_dbgcmdrc(P, __VA_ARGS__)
extern void bdberl_dbg(const char * fmt, ...);
extern void bdberl_dbgcmd(PortData *d, const char *fmt, ...);
extern void bdberl_dbgcmdrc(PortData *d, int rc);
#else
# define DBG(arg1,...)
# define DBGCMD(d, fmt, ...)
# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error
#endif
#endif //_BDBERL_DRV #endif //_BDBERL_DRV

View file

@ -64,31 +64,31 @@ static void async_cleanup_and_send_btree_stats(PortData* d, char *type, DB_BTREE
ERL_DRV_ATOM, driver_mk_atom("type"), ERL_DRV_ATOM, driver_mk_atom("type"),
ERL_DRV_ATOM, driver_mk_atom(type), ERL_DRV_ATOM, driver_mk_atom(type),
ERL_DRV_TUPLE, 2, ERL_DRV_TUPLE, 2,
BT_STATS_TUPLE(bsp, magic), /* Magic number. */ BT_STATS_TUPLE(bsp, magic), /* Magic number. */
BT_STATS_TUPLE(bsp, version), /* Version number. */ BT_STATS_TUPLE(bsp, version), /* Version number. */
BT_STATS_TUPLE(bsp, metaflags), /* Metadata flags. */ BT_STATS_TUPLE(bsp, metaflags), /* Metadata flags. */
BT_STATS_TUPLE(bsp, nkeys), /* Number of unique keys. */ BT_STATS_TUPLE(bsp, nkeys), /* Number of unique keys. */
BT_STATS_TUPLE(bsp, ndata), /* Number of data items. */ BT_STATS_TUPLE(bsp, ndata), /* Number of data items. */
BT_STATS_TUPLE(bsp, pagecnt), /* Page count. */ BT_STATS_TUPLE(bsp, pagecnt), /* Page count. */
BT_STATS_TUPLE(bsp, pagesize), /* Page size. */ BT_STATS_TUPLE(bsp, pagesize), /* Page size. */
BT_STATS_TUPLE(bsp, minkey), /* Minkey value. */ BT_STATS_TUPLE(bsp, minkey), /* Minkey value. */
BT_STATS_TUPLE(bsp, re_len), /* Fixed-length record length. */ BT_STATS_TUPLE(bsp, re_len), /* Fixed-length record length. */
BT_STATS_TUPLE(bsp, re_pad), /* Fixed-length record pad. */ BT_STATS_TUPLE(bsp, re_pad), /* Fixed-length record pad. */
BT_STATS_TUPLE(bsp, levels), /* Tree levels. */ BT_STATS_TUPLE(bsp, levels), /* Tree levels. */
BT_STATS_TUPLE(bsp, int_pg), /* Internal pages. */ BT_STATS_TUPLE(bsp, int_pg), /* Internal pages. */
BT_STATS_TUPLE(bsp, leaf_pg), /* Leaf pages. */ BT_STATS_TUPLE(bsp, leaf_pg), /* Leaf pages. */
BT_STATS_TUPLE(bsp, dup_pg), /* Duplicate pages. */ BT_STATS_TUPLE(bsp, dup_pg), /* Duplicate pages. */
BT_STATS_TUPLE(bsp, over_pg), /* Overflow pages. */ BT_STATS_TUPLE(bsp, over_pg), /* Overflow pages. */
BT_STATS_TUPLE(bsp, empty_pg), /* Empty pages. */ BT_STATS_TUPLE(bsp, empty_pg), /* Empty pages. */
BT_STATS_TUPLE(bsp, free), /* Pages on the free list. */ BT_STATS_TUPLE(bsp, free), /* Pages on the free list. */
BT_STATS_TUPLE(bsp, int_pgfree), /* Bytes free in internal pages. */ BT_STATS_TUPLE(bsp, int_pgfree), /* Bytes free in internal pages. */
BT_STATS_TUPLE(bsp, leaf_pgfree), /* Bytes free in leaf pages. */ BT_STATS_TUPLE(bsp, leaf_pgfree), /* Bytes free in leaf pages. */
BT_STATS_TUPLE(bsp, dup_pgfree), /* Bytes free in duplicate pages. */ BT_STATS_TUPLE(bsp, dup_pgfree), /* Bytes free in duplicate pages. */
BT_STATS_TUPLE(bsp, over_pgfree), /* Bytes free in overflow pages. */ BT_STATS_TUPLE(bsp, over_pgfree), /* Bytes free in overflow pages. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 21+2, ERL_DRV_LIST, 21+2,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -117,33 +117,33 @@ static void async_cleanup_and_send_hash_stats(PortData* d, DB_HASH_STAT *hsp)
ERL_DRV_ATOM, driver_mk_atom("type"), ERL_DRV_ATOM, driver_mk_atom("type"),
ERL_DRV_ATOM, driver_mk_atom("hash"), ERL_DRV_ATOM, driver_mk_atom("hash"),
ERL_DRV_TUPLE, 2, ERL_DRV_TUPLE, 2,
HASH_STATS_TUPLE(hsp, magic), /* Magic number. */ HASH_STATS_TUPLE(hsp, magic), /* Magic number. */
HASH_STATS_TUPLE(hsp, version), /* Version number. */ HASH_STATS_TUPLE(hsp, version), /* Version number. */
HASH_STATS_TUPLE(hsp, metaflags), /* Metadata flags. */ HASH_STATS_TUPLE(hsp, metaflags), /* Metadata flags. */
HASH_STATS_TUPLE(hsp, nkeys), /* Number of unique keys. */ HASH_STATS_TUPLE(hsp, nkeys), /* Number of unique keys. */
HASH_STATS_TUPLE(hsp, ndata), /* Number of data items. */ HASH_STATS_TUPLE(hsp, ndata), /* Number of data items. */
HASH_STATS_TUPLE(hsp, pagecnt), /* Page count. */ HASH_STATS_TUPLE(hsp, pagecnt), /* Page count. */
HASH_STATS_TUPLE(hsp, pagesize), /* Page size. */ HASH_STATS_TUPLE(hsp, pagesize), /* Page size. */
HASH_STATS_TUPLE(hsp, ffactor), /* Fill factor specified at create. */ HASH_STATS_TUPLE(hsp, ffactor), /* Fill factor specified at create. */
HASH_STATS_TUPLE(hsp, buckets), /* Number of hash buckets. */ HASH_STATS_TUPLE(hsp, buckets), /* Number of hash buckets. */
HASH_STATS_TUPLE(hsp, free), /* Pages on the free list. */ HASH_STATS_TUPLE(hsp, free), /* Pages on the free list. */
HASH_STATS_TUPLE(hsp, bfree), /* Bytes free on bucket pages. */ HASH_STATS_TUPLE(hsp, bfree), /* Bytes free on bucket pages. */
HASH_STATS_TUPLE(hsp, bigpages), /* Number of big key/data pages. */ HASH_STATS_TUPLE(hsp, bigpages), /* Number of big key/data pages. */
HASH_STATS_TUPLE(hsp, big_bfree), /* Bytes free on big item pages. */ HASH_STATS_TUPLE(hsp, big_bfree), /* Bytes free on big item pages. */
HASH_STATS_TUPLE(hsp, overflows), /* Number of overflow pages. */ HASH_STATS_TUPLE(hsp, overflows), /* Number of overflow pages. */
HASH_STATS_TUPLE(hsp, ovfl_free), /* Bytes free on ovfl pages. */ HASH_STATS_TUPLE(hsp, ovfl_free), /* Bytes free on ovfl pages. */
HASH_STATS_TUPLE(hsp, dup), /* Number of dup pages. */ HASH_STATS_TUPLE(hsp, dup), /* Number of dup pages. */
HASH_STATS_TUPLE(hsp, dup_free), /* Bytes free on duplicate pages. */ HASH_STATS_TUPLE(hsp, dup_free), /* Bytes free on duplicate pages. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 17+2, ERL_DRV_LIST, 17+2,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
#undef HASH_STATS_TUPLE #undef HASH_STATS_TUPLE
#ifdef ENABLE_QUEUE // If we ever decide to support Queues #ifdef ENABLE_QUEUE // If we ever decide to support Queues
#define QS_STATS_TUPLE(base, member) \ #define QS_STATS_TUPLE(base, member) \
ERL_DRV_ATOM, driver_mk_atom(#member), \ ERL_DRV_ATOM, driver_mk_atom(#member), \
@ -166,23 +166,23 @@ static void async_cleanup_and_send_queue_stats(PortData* d, DB_QUEUE_STAT *qsp)
ERL_DRV_ATOM, driver_mk_atom("type"), ERL_DRV_ATOM, driver_mk_atom("type"),
ERL_DRV_ATOM, driver_mk_atom("queue"), ERL_DRV_ATOM, driver_mk_atom("queue"),
ERL_DRV_TUPLE, 2, ERL_DRV_TUPLE, 2,
QS_STAT_TUPLE(qsp, qs_magic), /* Magic number. */ QS_STAT_TUPLE(qsp, qs_magic), /* Magic number. */
QS_STAT_TUPLE(qsp, version), /* Version number. */ QS_STAT_TUPLE(qsp, version), /* Version number. */
QS_STAT_TUPLE(qsp, metaflags), /* Metadata flags. */ QS_STAT_TUPLE(qsp, metaflags), /* Metadata flags. */
QS_STAT_TUPLE(qsp, nkeys), /* Number of unique keys. */ QS_STAT_TUPLE(qsp, nkeys), /* Number of unique keys. */
QS_STAT_TUPLE(qsp, ndata), /* Number of data items. */ QS_STAT_TUPLE(qsp, ndata), /* Number of data items. */
QS_STAT_TUPLE(qsp, pagesize), /* Page size. */ QS_STAT_TUPLE(qsp, pagesize), /* Page size. */
QS_STAT_TUPLE(qsp, extentsize), /* Pages per extent. */ QS_STAT_TUPLE(qsp, extentsize), /* Pages per extent. */
QS_STAT_TUPLE(qsp, pages), /* Data pages. */ QS_STAT_TUPLE(qsp, pages), /* Data pages. */
QS_STAT_TUPLE(qsp, re_len), /* Fixed-length record length. */ QS_STAT_TUPLE(qsp, re_len), /* Fixed-length record length. */
QS_STAT_TUPLE(qsp, re_pad), /* Fixed-length record pad. */ QS_STAT_TUPLE(qsp, re_pad), /* Fixed-length record pad. */
QS_STAT_TUPLE(qsp, pgfree), /* Bytes free in data pages. */ QS_STAT_TUPLE(qsp, pgfree), /* Bytes free in data pages. */
QS_STAT_TUPLE(qsp, first_recno), /* First not deleted record. */ QS_STAT_TUPLE(qsp, first_recno), /* First not deleted record. */
QS_STAT_TUPLE(qsp, cur_recno), /* Next available record number. */ QS_STAT_TUPLE(qsp, cur_recno), /* Next available record number. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 13+2, ERL_DRV_LIST, 13+2,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -213,53 +213,53 @@ static void async_cleanup_and_send_lock_stats(PortData* d, DB_LOCK_STAT *lsp)
ErlDrvTermData response[] = { ErlDrvTermData response[] = {
ERL_DRV_ATOM, driver_mk_atom("ok"), ERL_DRV_ATOM, driver_mk_atom("ok"),
// Start of list // Start of list
ST_STATS_TUPLE(lsp, id), /* Last allocated locker ID. */ ST_STATS_TUPLE(lsp, id), /* Last allocated locker ID. */
ST_STATS_TUPLE(lsp, cur_maxid), /* Current maximum unused ID. */ ST_STATS_TUPLE(lsp, cur_maxid), /* Current maximum unused ID. */
ST_STATS_TUPLE(lsp, maxlocks), /* Maximum number of locks in table. */ ST_STATS_TUPLE(lsp, maxlocks), /* Maximum number of locks in table. */
ST_STATS_TUPLE(lsp, maxlockers), /* Maximum num of lockers in table. */ ST_STATS_TUPLE(lsp, maxlockers), /* Maximum num of lockers in table. */
ST_STATS_TUPLE(lsp, maxobjects), /* Maximum num of objects in table. */ ST_STATS_TUPLE(lsp, maxobjects), /* Maximum num of objects in table. */
ST_STATS_TUPLE(lsp, partitions), /* number of partitions. */ ST_STATS_TUPLE(lsp, partitions), /* number of partitions. */
ST_STATS_INT_TUPLE(lsp, nmodes), /* Number of lock modes. */ ST_STATS_INT_TUPLE(lsp, nmodes), /* Number of lock modes. */
ST_STATS_TUPLE(lsp, nlockers), /* Current number of lockers. */ ST_STATS_TUPLE(lsp, nlockers), /* Current number of lockers. */
ST_STATS_TUPLE(lsp, nlocks), /* Current number of locks. */ ST_STATS_TUPLE(lsp, nlocks), /* Current number of locks. */
ST_STATS_TUPLE(lsp, maxnlocks), /* Maximum number of locks so far. */ ST_STATS_TUPLE(lsp, maxnlocks), /* Maximum number of locks so far. */
ST_STATS_TUPLE(lsp, maxhlocks), /* Maximum number of locks in any bucket. */ ST_STATS_TUPLE(lsp, maxhlocks), /* Maximum number of locks in any bucket. */
ST_STATS_TUPLE(lsp, locksteals), /* Number of lock steals so far. */ ST_STATS_TUPLE(lsp, locksteals), /* Number of lock steals so far. */
ST_STATS_TUPLE(lsp, maxlsteals), /* Maximum number steals in any partition. */ ST_STATS_TUPLE(lsp, maxlsteals), /* Maximum number steals in any partition. */
ST_STATS_TUPLE(lsp, maxnlockers), /* Maximum number of lockers so far. */ ST_STATS_TUPLE(lsp, maxnlockers), /* Maximum number of lockers so far. */
ST_STATS_TUPLE(lsp, nobjects), /* Current number of objects. */ ST_STATS_TUPLE(lsp, nobjects), /* Current number of objects. */
ST_STATS_TUPLE(lsp, maxnobjects), /* Maximum number of objects so far. */ ST_STATS_TUPLE(lsp, maxnobjects), /* Maximum number of objects so far. */
ST_STATS_TUPLE(lsp, maxhobjects), /* Maximum number of objectsin any bucket. */ ST_STATS_TUPLE(lsp, maxhobjects), /* Maximum number of objectsin any bucket. */
ST_STATS_TUPLE(lsp, objectsteals), /* Number of objects steals so far. */ ST_STATS_TUPLE(lsp, objectsteals), /* Number of objects steals so far. */
ST_STATS_TUPLE(lsp, maxosteals), /* Maximum number of steals in any partition. */ ST_STATS_TUPLE(lsp, maxosteals), /* Maximum number of steals in any partition. */
ST_STATS_TUPLE(lsp, nrequests), /* Number of lock gets. */ ST_STATS_TUPLE(lsp, nrequests), /* Number of lock gets. */
ST_STATS_TUPLE(lsp, nreleases), /* Number of lock puts. */ ST_STATS_TUPLE(lsp, nreleases), /* Number of lock puts. */
ST_STATS_TUPLE(lsp, nupgrade), /* Number of lock upgrades. */ ST_STATS_TUPLE(lsp, nupgrade), /* Number of lock upgrades. */
ST_STATS_TUPLE(lsp, ndowngrade), /* Number of lock downgrades. */ ST_STATS_TUPLE(lsp, ndowngrade), /* Number of lock downgrades. */
ST_STATS_TUPLE(lsp, lock_wait), /* Lock conflicts w/ subsequent wait */ ST_STATS_TUPLE(lsp, lock_wait), /* Lock conflicts w/ subsequent wait */
ST_STATS_TUPLE(lsp, lock_nowait), /* Lock conflicts w/o subsequent wait */ ST_STATS_TUPLE(lsp, lock_nowait), /* Lock conflicts w/o subsequent wait */
ST_STATS_TUPLE(lsp, ndeadlocks), /* Number of lock deadlocks. */ ST_STATS_TUPLE(lsp, ndeadlocks), /* Number of lock deadlocks. */
ST_STATS_TUPLE(lsp, locktimeout), /* Lock timeout. */ ST_STATS_TUPLE(lsp, locktimeout), /* Lock timeout. */
ST_STATS_TUPLE(lsp, nlocktimeouts), /* Number of lock timeouts. */ ST_STATS_TUPLE(lsp, nlocktimeouts), /* Number of lock timeouts. */
ST_STATS_TUPLE(lsp, txntimeout), /* Transaction timeout. */ ST_STATS_TUPLE(lsp, txntimeout), /* Transaction timeout. */
ST_STATS_TUPLE(lsp, ntxntimeouts), /* Number of transaction timeouts. */ ST_STATS_TUPLE(lsp, ntxntimeouts), /* Number of transaction timeouts. */
ST_STATS_TUPLE(lsp, part_wait), /* Partition lock granted after wait. */ ST_STATS_TUPLE(lsp, part_wait), /* Partition lock granted after wait. */
ST_STATS_TUPLE(lsp, part_nowait), /* Partition lock granted without wait. */ ST_STATS_TUPLE(lsp, part_nowait), /* Partition lock granted without wait. */
ST_STATS_TUPLE(lsp, part_max_wait), /* Max partition lock granted after wait. */ ST_STATS_TUPLE(lsp, part_max_wait), /* Max partition lock granted after wait. */
ST_STATS_TUPLE(lsp, part_max_nowait), /* Max partition lock granted without wait. */ ST_STATS_TUPLE(lsp, part_max_nowait), /* Max partition lock granted without wait. */
ST_STATS_TUPLE(lsp, objs_wait), /* Object lock granted after wait. */ ST_STATS_TUPLE(lsp, objs_wait), /* Object lock granted after wait. */
ST_STATS_TUPLE(lsp, objs_nowait), /* Object lock granted without wait. */ ST_STATS_TUPLE(lsp, objs_nowait), /* Object lock granted without wait. */
ST_STATS_TUPLE(lsp, lockers_wait), /* Locker lock granted after wait. */ ST_STATS_TUPLE(lsp, lockers_wait), /* Locker lock granted after wait. */
ST_STATS_TUPLE(lsp, lockers_nowait),/* Locker lock granted without wait. */ ST_STATS_TUPLE(lsp, lockers_nowait),/* Locker lock granted without wait. */
ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */ ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */
ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */ ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */
ST_STATS_TUPLE(lsp, hash_len), /* Max length of bucket. */ ST_STATS_TUPLE(lsp, hash_len), /* Max length of bucket. */
ST_STATS_TUPLE(lsp, regsize), /* Region size. - will have to cast to uint */ ST_STATS_TUPLE(lsp, regsize), /* Region size. - will have to cast to uint */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 42+1, ERL_DRV_LIST, 42+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -278,34 +278,34 @@ static void async_cleanup_and_send_log_stats(PortData* d, DB_LOG_STAT *lsp)
ErlDrvTermData response[] = { ErlDrvTermData response[] = {
ERL_DRV_ATOM, driver_mk_atom("ok"), ERL_DRV_ATOM, driver_mk_atom("ok"),
// Start of list // Start of list
ST_STATS_TUPLE(lsp, magic), /* Log file magic number. */ ST_STATS_TUPLE(lsp, magic), /* Log file magic number. */
ST_STATS_TUPLE(lsp, version), /* Log file version number. */ ST_STATS_TUPLE(lsp, version), /* Log file version number. */
ST_STATS_INT_TUPLE(lsp, mode), /* Log file permissions mode. */ ST_STATS_INT_TUPLE(lsp, mode), /* Log file permissions mode. */
ST_STATS_TUPLE(lsp, lg_bsize), /* Log buffer size. */ ST_STATS_TUPLE(lsp, lg_bsize), /* Log buffer size. */
ST_STATS_TUPLE(lsp, lg_size), /* Log file size. */ ST_STATS_TUPLE(lsp, lg_size), /* Log file size. */
ST_STATS_TUPLE(lsp, wc_bytes), /* Bytes to log since checkpoint. */ ST_STATS_TUPLE(lsp, wc_bytes), /* Bytes to log since checkpoint. */
ST_STATS_TUPLE(lsp, wc_mbytes), /* Megabytes to log since checkpoint. */ ST_STATS_TUPLE(lsp, wc_mbytes), /* Megabytes to log since checkpoint. */
ST_STATS_TUPLE(lsp, record), /* Records entered into the log. */ ST_STATS_TUPLE(lsp, record), /* Records entered into the log. */
ST_STATS_TUPLE(lsp, w_bytes), /* Bytes to log. */ ST_STATS_TUPLE(lsp, w_bytes), /* Bytes to log. */
ST_STATS_TUPLE(lsp, w_mbytes), /* Megabytes to log. */ ST_STATS_TUPLE(lsp, w_mbytes), /* Megabytes to log. */
ST_STATS_TUPLE(lsp, wcount), /* Total I/O writes to the log. */ ST_STATS_TUPLE(lsp, wcount), /* Total I/O writes to the log. */
ST_STATS_TUPLE(lsp, wcount_fill),/* Overflow writes to the log. */ ST_STATS_TUPLE(lsp, wcount_fill),/* Overflow writes to the log. */
ST_STATS_TUPLE(lsp, rcount), /* Total I/O reads from the log. */ ST_STATS_TUPLE(lsp, rcount), /* Total I/O reads from the log. */
ST_STATS_TUPLE(lsp, scount), /* Total syncs to the log. */ ST_STATS_TUPLE(lsp, scount), /* Total syncs to the log. */
ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */ ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */
ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */ ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */
ST_STATS_TUPLE(lsp, cur_file), /* Current log file number. */ ST_STATS_TUPLE(lsp, cur_file), /* Current log file number. */
ST_STATS_TUPLE(lsp, cur_offset),/* Current log file offset. */ ST_STATS_TUPLE(lsp, cur_offset),/* Current log file offset. */
ST_STATS_TUPLE(lsp, disk_file), /* Known on disk log file number. */ ST_STATS_TUPLE(lsp, disk_file), /* Known on disk log file number. */
ST_STATS_TUPLE(lsp, disk_offset), /* Known on disk log file offset. */ ST_STATS_TUPLE(lsp, disk_offset), /* Known on disk log file offset. */
ST_STATS_TUPLE(lsp, maxcommitperflush), /* Max number of commits in a flush. */ ST_STATS_TUPLE(lsp, maxcommitperflush), /* Max number of commits in a flush. */
ST_STATS_TUPLE(lsp, mincommitperflush), /* Min number of commits in a flush. */ ST_STATS_TUPLE(lsp, mincommitperflush), /* Min number of commits in a flush. */
ST_STATS_TUPLE(lsp, regsize), /* Region size. */ ST_STATS_TUPLE(lsp, regsize), /* Region size. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 23+1, ERL_DRV_LIST, 23+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -320,18 +320,18 @@ static void send_mpool_fstat(ErlDrvPort port, ErlDrvTermData pid, DB_MPOOL_FSTAT
ERL_DRV_ATOM, driver_mk_atom("name"), ERL_DRV_ATOM, driver_mk_atom("name"),
ERL_DRV_STRING, (ErlDrvTermData) name, name_len, ERL_DRV_STRING, (ErlDrvTermData) name, name_len,
ERL_DRV_TUPLE, 2, ERL_DRV_TUPLE, 2,
ST_STATS_TUPLE(fsp, map), /* Pages from mapped files. */ ST_STATS_TUPLE(fsp, map), /* Pages from mapped files. */
ST_STATS_TUPLE(fsp, cache_hit), /* Pages found in the cache. */ ST_STATS_TUPLE(fsp, cache_hit), /* Pages found in the cache. */
ST_STATS_TUPLE(fsp, cache_miss), /* Pages not found in the cache. */ ST_STATS_TUPLE(fsp, cache_miss), /* Pages not found in the cache. */
ST_STATS_TUPLE(fsp, page_create), /* Pages created in the cache. */ ST_STATS_TUPLE(fsp, page_create), /* Pages created in the cache. */
ST_STATS_TUPLE(fsp, page_in), /* Pages read in. */ ST_STATS_TUPLE(fsp, page_in), /* Pages read in. */
ST_STATS_TUPLE(fsp, page_out), /* Pages written out. */ ST_STATS_TUPLE(fsp, page_out), /* Pages written out. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 7+1, ERL_DRV_LIST, 7+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp, static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp,
@ -357,51 +357,51 @@ static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp,
ErlDrvTermData response[] = { ErlDrvTermData response[] = {
ERL_DRV_ATOM, driver_mk_atom("ok"), ERL_DRV_ATOM, driver_mk_atom("ok"),
// Start of list // Start of list
ST_STATS_TUPLE(gsp, gbytes), /* Total cache size: GB. */ ST_STATS_TUPLE(gsp, gbytes), /* Total cache size: GB. */
ST_STATS_TUPLE(gsp, bytes), /* Total cache size: B. */ ST_STATS_TUPLE(gsp, bytes), /* Total cache size: B. */
ST_STATS_TUPLE(gsp, ncache), /* Number of cache regions. */ ST_STATS_TUPLE(gsp, ncache), /* Number of cache regions. */
ST_STATS_TUPLE(gsp, max_ncache), /* Maximum number of regions. */ ST_STATS_TUPLE(gsp, max_ncache), /* Maximum number of regions. */
ST_STATS_INT_TUPLE(gsp, mmapsize), /* Maximum file size for mmap. */ ST_STATS_INT_TUPLE(gsp, mmapsize), /* Maximum file size for mmap. */
ST_STATS_INT_TUPLE(gsp, maxopenfd), /* Maximum number of open fd's. */ ST_STATS_INT_TUPLE(gsp, maxopenfd), /* Maximum number of open fd's. */
ST_STATS_INT_TUPLE(gsp, maxwrite), /* Maximum buffers to write. */ ST_STATS_INT_TUPLE(gsp, maxwrite), /* Maximum buffers to write. */
ST_STATS_TUPLE(gsp, maxwrite_sleep), /* Sleep after writing max buffers. */ ST_STATS_TUPLE(gsp, maxwrite_sleep), /* Sleep after writing max buffers. */
ST_STATS_TUPLE(gsp, pages), /* Total number of pages. */ ST_STATS_TUPLE(gsp, pages), /* Total number of pages. */
ST_STATS_TUPLE(gsp, map), /* Pages from mapped files. */ ST_STATS_TUPLE(gsp, map), /* Pages from mapped files. */
ST_STATS_TUPLE(gsp, cache_hit), /* Pages found in the cache. */ ST_STATS_TUPLE(gsp, cache_hit), /* Pages found in the cache. */
ST_STATS_TUPLE(gsp, cache_miss), /* Pages not found in the cache. */ ST_STATS_TUPLE(gsp, cache_miss), /* Pages not found in the cache. */
ST_STATS_TUPLE(gsp, page_create), /* Pages created in the cache. */ ST_STATS_TUPLE(gsp, page_create), /* Pages created in the cache. */
ST_STATS_TUPLE(gsp, page_in), /* Pages read in. */ ST_STATS_TUPLE(gsp, page_in), /* Pages read in. */
ST_STATS_TUPLE(gsp, page_out), /* Pages written out. */ ST_STATS_TUPLE(gsp, page_out), /* Pages written out. */
ST_STATS_TUPLE(gsp, ro_evict), /* Clean pages forced from the cache. */ ST_STATS_TUPLE(gsp, ro_evict), /* Clean pages forced from the cache. */
ST_STATS_TUPLE(gsp, rw_evict), /* Dirty pages forced from the cache. */ ST_STATS_TUPLE(gsp, rw_evict), /* Dirty pages forced from the cache. */
ST_STATS_TUPLE(gsp, page_trickle), /* Pages written by memp_trickle. */ ST_STATS_TUPLE(gsp, page_trickle), /* Pages written by memp_trickle. */
ST_STATS_TUPLE(gsp, page_clean), /* Clean pages. */ ST_STATS_TUPLE(gsp, page_clean), /* Clean pages. */
ST_STATS_TUPLE(gsp, page_dirty), /* Dirty pages. */ ST_STATS_TUPLE(gsp, page_dirty), /* Dirty pages. */
ST_STATS_TUPLE(gsp, hash_buckets), /* Number of hash buckets. */ ST_STATS_TUPLE(gsp, hash_buckets), /* Number of hash buckets. */
ST_STATS_TUPLE(gsp, hash_searches), /* Total hash chain searches. */ ST_STATS_TUPLE(gsp, hash_searches), /* Total hash chain searches. */
ST_STATS_TUPLE(gsp, hash_longest), /* Longest hash chain searched. */ ST_STATS_TUPLE(gsp, hash_longest), /* Longest hash chain searched. */
ST_STATS_TUPLE(gsp, hash_examined), /* Total hash entries searched. */ ST_STATS_TUPLE(gsp, hash_examined), /* Total hash entries searched. */
ST_STATS_TUPLE(gsp, hash_nowait), /* Hash lock granted with nowait. */ ST_STATS_TUPLE(gsp, hash_nowait), /* Hash lock granted with nowait. */
ST_STATS_TUPLE(gsp, hash_wait), /* Hash lock granted after wait. */ ST_STATS_TUPLE(gsp, hash_wait), /* Hash lock granted after wait. */
ST_STATS_TUPLE(gsp, hash_max_nowait), /* Max hash lock granted with nowait. */ ST_STATS_TUPLE(gsp, hash_max_nowait), /* Max hash lock granted with nowait. */
ST_STATS_TUPLE(gsp, hash_max_wait), /* Max hash lock granted after wait. */ ST_STATS_TUPLE(gsp, hash_max_wait), /* Max hash lock granted after wait. */
ST_STATS_TUPLE(gsp, region_nowait), /* Region lock granted with nowait. */ ST_STATS_TUPLE(gsp, region_nowait), /* Region lock granted with nowait. */
ST_STATS_TUPLE(gsp, region_wait), /* Region lock granted after wait. */ ST_STATS_TUPLE(gsp, region_wait), /* Region lock granted after wait. */
ST_STATS_TUPLE(gsp, mvcc_frozen), /* Buffers frozen. */ ST_STATS_TUPLE(gsp, mvcc_frozen), /* Buffers frozen. */
ST_STATS_TUPLE(gsp, mvcc_thawed), /* Buffers thawed. */ ST_STATS_TUPLE(gsp, mvcc_thawed), /* Buffers thawed. */
ST_STATS_TUPLE(gsp, mvcc_freed), /* Frozen buffers freed. */ ST_STATS_TUPLE(gsp, mvcc_freed), /* Frozen buffers freed. */
ST_STATS_TUPLE(gsp, alloc), /* Number of page allocations. */ ST_STATS_TUPLE(gsp, alloc), /* Number of page allocations. */
ST_STATS_TUPLE(gsp, alloc_buckets), /* Buckets checked during allocation. */ ST_STATS_TUPLE(gsp, alloc_buckets), /* Buckets checked during allocation. */
ST_STATS_TUPLE(gsp, alloc_max_buckets), /* Max checked during allocation. */ ST_STATS_TUPLE(gsp, alloc_max_buckets), /* Max checked during allocation. */
ST_STATS_TUPLE(gsp, alloc_pages), /* Pages checked during allocation. */ ST_STATS_TUPLE(gsp, alloc_pages), /* Pages checked during allocation. */
ST_STATS_TUPLE(gsp, alloc_max_pages), /* Max checked during allocation. */ ST_STATS_TUPLE(gsp, alloc_max_pages), /* Max checked during allocation. */
ST_STATS_TUPLE(gsp, io_wait), /* Thread waited on buffer I/O. */ ST_STATS_TUPLE(gsp, io_wait), /* Thread waited on buffer I/O. */
ST_STATS_TUPLE(gsp, regsize), /* Region size. */ ST_STATS_TUPLE(gsp, regsize), /* Region size. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 40+1, ERL_DRV_LIST, 40+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -421,19 +421,19 @@ static void async_cleanup_and_send_mutex_stats(PortData* d, DB_MUTEX_STAT *msp)
ErlDrvTermData response[] = { ErlDrvTermData response[] = {
ERL_DRV_ATOM, driver_mk_atom("ok"), ERL_DRV_ATOM, driver_mk_atom("ok"),
// Start of list // Start of list
ST_STATS_TUPLE(msp, mutex_align), /* Mutex alignment */ ST_STATS_TUPLE(msp, mutex_align), /* Mutex alignment */
ST_STATS_TUPLE(msp, mutex_tas_spins), /* Mutex test-and-set spins */ ST_STATS_TUPLE(msp, mutex_tas_spins), /* Mutex test-and-set spins */
ST_STATS_TUPLE(msp, mutex_cnt), /* Mutex count */ ST_STATS_TUPLE(msp, mutex_cnt), /* Mutex count */
ST_STATS_TUPLE(msp, mutex_free), /* Available mutexes */ ST_STATS_TUPLE(msp, mutex_free), /* Available mutexes */
ST_STATS_TUPLE(msp, mutex_inuse), /* Mutexes in use */ ST_STATS_TUPLE(msp, mutex_inuse), /* Mutexes in use */
ST_STATS_TUPLE(msp, mutex_inuse_max), /* Maximum mutexes ever in use */ ST_STATS_TUPLE(msp, mutex_inuse_max), /* Maximum mutexes ever in use */
ST_STATS_TUPLE(msp, region_wait), /* Region lock granted after wait. */ ST_STATS_TUPLE(msp, region_wait), /* Region lock granted after wait. */
ST_STATS_TUPLE(msp, region_nowait), /* Region lock granted without wait. */ ST_STATS_TUPLE(msp, region_nowait), /* Region lock granted without wait. */
ST_STATS_TUPLE(msp, regsize), /* Region size. */ ST_STATS_TUPLE(msp, regsize), /* Region size. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 9+1, ERL_DRV_LIST, 9+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -479,15 +479,15 @@ static void send_txn_tstat(ErlDrvPort port, ErlDrvTermData pid, DB_TXN_ACTIVE *t
ErlDrvTermData response[] = { ErlDrvTermData response[] = {
ERL_DRV_ATOM, driver_mk_atom("txn"), ERL_DRV_ATOM, driver_mk_atom("txn"),
STATS_TUPLE(tasp, txnid), /* Transaction ID */ STATS_TUPLE(tasp, txnid), /* Transaction ID */
STATS_TUPLE(tasp, parentid), /* Transaction ID of parent */ STATS_TUPLE(tasp, parentid), /* Transaction ID of parent */
STATS_TUPLE(tasp, pid), /* Process owning txn ID - pid_t */ STATS_TUPLE(tasp, pid), /* Process owning txn ID - pid_t */
ERL_DRV_ATOM, driver_mk_atom("tid"),/* OSX has 32-bit ints in erlang, so return as */ ERL_DRV_ATOM, driver_mk_atom("tid"),/* OSX has 32-bit ints in erlang, so return as */
ERL_DRV_STRING, (ErlDrvTermData) tid_str, tid_str_len, /* a string */ ERL_DRV_STRING, (ErlDrvTermData) tid_str, tid_str_len, /* a string */
ERL_DRV_TUPLE, 2, ERL_DRV_TUPLE, 2,
STATS_LSN_TUPLE(tasp, lsn), /* LSN when transaction began */ STATS_LSN_TUPLE(tasp, lsn), /* LSN when transaction began */
STATS_LSN_TUPLE(tasp, read_lsn), /* Read LSN for MVCC */ STATS_LSN_TUPLE(tasp, read_lsn), /* Read LSN for MVCC */
STATS_TUPLE(tasp, mvcc_ref), /* MVCC reference count */ STATS_TUPLE(tasp, mvcc_ref), /* MVCC reference count */
// Start of list // Start of list
ERL_DRV_ATOM, driver_mk_atom("status"), ERL_DRV_ATOM, driver_mk_atom("status"),
@ -500,11 +500,11 @@ static void send_txn_tstat(ErlDrvPort port, ErlDrvTermData pid, DB_TXN_ACTIVE *t
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 9+1, ERL_DRV_LIST, 9+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
#define ST_STATS_LSN_TUPLE(base, member) \ #define ST_STATS_LSN_TUPLE(base, member) \
@ -536,26 +536,26 @@ static void async_cleanup_and_send_txn_stats(PortData* d, DB_TXN_STAT *tsp)
ErlDrvTermData response[] = { ErlDrvTermData response[] = {
ERL_DRV_ATOM, driver_mk_atom("ok"), ERL_DRV_ATOM, driver_mk_atom("ok"),
// Start of list // Start of list
ST_STATS_TUPLE(tsp, nrestores), /* number of restored transactions ST_STATS_TUPLE(tsp, nrestores), /* number of restored transactions
after recovery. */ after recovery. */
ST_STATS_LSN_TUPLE(tsp, last_ckp), /* lsn of the last checkpoint */ ST_STATS_LSN_TUPLE(tsp, last_ckp), /* lsn of the last checkpoint */
ST_STATS_TUPLE(tsp, time_ckp), /* time of last checkpoint (time_t to uint) */ ST_STATS_TUPLE(tsp, time_ckp), /* time of last checkpoint (time_t to uint) */
ST_STATS_TUPLE(tsp, last_txnid), /* last transaction id given out */ ST_STATS_TUPLE(tsp, last_txnid), /* last transaction id given out */
ST_STATS_TUPLE(tsp, maxtxns), /* maximum txns possible */ ST_STATS_TUPLE(tsp, maxtxns), /* maximum txns possible */
ST_STATS_TUPLE(tsp, naborts), /* number of aborted transactions */ ST_STATS_TUPLE(tsp, naborts), /* number of aborted transactions */
ST_STATS_TUPLE(tsp, nbegins), /* number of begun transactions */ ST_STATS_TUPLE(tsp, nbegins), /* number of begun transactions */
ST_STATS_TUPLE(tsp, ncommits), /* number of committed transactions */ ST_STATS_TUPLE(tsp, ncommits), /* number of committed transactions */
ST_STATS_TUPLE(tsp, nactive), /* number of active transactions */ ST_STATS_TUPLE(tsp, nactive), /* number of active transactions */
ST_STATS_TUPLE(tsp, nsnapshot), /* number of snapshot transactions */ ST_STATS_TUPLE(tsp, nsnapshot), /* number of snapshot transactions */
ST_STATS_TUPLE(tsp, maxnactive), /* maximum active transactions */ ST_STATS_TUPLE(tsp, maxnactive), /* maximum active transactions */
ST_STATS_TUPLE(tsp, maxnsnapshot), /* maximum snapshot transactions */ ST_STATS_TUPLE(tsp, maxnsnapshot), /* maximum snapshot transactions */
ST_STATS_TUPLE(tsp, region_wait), /* Region lock granted after wait. */ ST_STATS_TUPLE(tsp, region_wait), /* Region lock granted after wait. */
ST_STATS_TUPLE(tsp, region_nowait), /* Region lock granted without wait. */ ST_STATS_TUPLE(tsp, region_nowait), /* Region lock granted without wait. */
ST_STATS_TUPLE(tsp, regsize), /* Region size. */ ST_STATS_TUPLE(tsp, regsize), /* Region size. */
// End of list // End of list
ERL_DRV_NIL, ERL_DRV_NIL,
ERL_DRV_LIST, 15+1, ERL_DRV_LIST, 15+1,
ERL_DRV_TUPLE, 2 ERL_DRV_TUPLE, 2
}; };
driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
} }
@ -598,16 +598,16 @@ static void do_async_stat(void* arg)
async_cleanup_and_send_queue_stats(d, sp); async_cleanup_and_send_queue_stats(d, sp);
break; break;
#endif #endif
default: default:
bdberl_async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); bdberl_async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE);
break; break;
} }
} }
// Finally, clean up value buffer (driver_send_term made a copy) // Finally, clean up value buffer (driver_send_term made a copy)
if (NULL != sp) if (NULL != sp)
{ {
free(sp); driver_free(sp);
} }
} }
@ -626,11 +626,11 @@ static void do_async_lock_stat(void* arg)
{ {
async_cleanup_and_send_lock_stats(d, lsp); async_cleanup_and_send_lock_stats(d, lsp);
} }
// Finally, clean up lock stats // Finally, clean up lock stats
if (NULL != lsp) if (NULL != lsp)
{ {
free(lsp); driver_free(lsp);
} }
} }
@ -649,11 +649,11 @@ static void do_async_log_stat(void* arg)
{ {
async_cleanup_and_send_log_stats(d, lsp); async_cleanup_and_send_log_stats(d, lsp);
} }
// Finally, clean up stats // Finally, clean up stats
if (NULL != lsp) if (NULL != lsp)
{ {
free(lsp); driver_free(lsp);
} }
} }
@ -673,15 +673,15 @@ static void do_async_memp_stat(void* arg)
{ {
async_cleanup_and_send_memp_stats(d, gsp, fsp); async_cleanup_and_send_memp_stats(d, gsp, fsp);
} }
// Finally, clean up stats // Finally, clean up stats
if (NULL != gsp) if (NULL != gsp)
{ {
free(gsp); driver_free(gsp);
} }
if (NULL != fsp) if (NULL != fsp)
{ {
free(fsp); driver_free(fsp);
} }
} }
@ -700,11 +700,11 @@ static void do_async_mutex_stat(void* arg)
{ {
async_cleanup_and_send_mutex_stats(d, msp); async_cleanup_and_send_mutex_stats(d, msp);
} }
// Finally, clean up stats // Finally, clean up stats
if (NULL != msp) if (NULL != msp)
{ {
free(msp); driver_free(msp);
} }
} }
@ -724,17 +724,17 @@ static void do_async_txn_stat(void* arg)
{ {
async_cleanup_and_send_txn_stats(d, tsp); async_cleanup_and_send_txn_stats(d, tsp);
} }
// Finally, clean up stats // Finally, clean up stats
if (NULL != tsp) if (NULL != tsp)
{ {
free(tsp); driver_free(tsp);
} }
} }
int bdberl_stats_control(PortData* d, unsigned int cmd, int bdberl_stats_control(PortData* d, unsigned int cmd,
char* inbuf, int inbuf_sz, char* inbuf, int inbuf_sz,
char** outbuf, int outbuf_sz) char** outbuf, int outbuf_sz)
{ {
switch(cmd) switch(cmd)
@ -781,10 +781,10 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
{ {
DB* db = bdberl_lookup_dbref(dbref); DB* db = bdberl_lookup_dbref(dbref);
unsigned int flags = UNPACK_INT(inbuf, 4); unsigned int flags = UNPACK_INT(inbuf, 4);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
// Run the command on the VM thread - this is for debugging only, // Run the command on the VM thread - this is for debugging only,
// any real monitoring // any real monitoring
int rc = db->stat_print(db, flags); int rc = db->stat_print(db, flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }
@ -800,7 +800,7 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
// Inbuf is << Flags:32 >> // Inbuf is << Flags:32 >>
unsigned int flags = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
int rc = bdberl_db_env()->stat_print(bdberl_db_env(), flags); int rc = bdberl_db_env()->stat_print(bdberl_db_env(), flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
@ -813,7 +813,7 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
d->async_op = cmd; d->async_op = cmd;
d->async_flags = UNPACK_INT(inbuf, 0); d->async_flags = UNPACK_INT(inbuf, 0);
bdberl_general_tpool_run(&do_async_lock_stat, d, 0, &d->async_job); bdberl_general_tpool_run(&do_async_lock_stat, d, 0, &d->async_job);
// Let caller know that the operation is in progress // Let caller know that the operation is in progress
// Outbuf is: <<0:32>> // Outbuf is: <<0:32>>
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
@ -824,10 +824,10 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
// Inbuf is << Flags:32 >> // Inbuf is << Flags:32 >>
unsigned int flags = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
// Run the command on the VM thread - this is for debugging only, // Run the command on the VM thread - this is for debugging only,
// any real monitoring will use the async lock_stat // any real monitoring will use the async lock_stat
int rc = bdberl_db_env()->lock_stat_print(bdberl_db_env(), flags); int rc = bdberl_db_env()->lock_stat_print(bdberl_db_env(), flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }
@ -841,7 +841,7 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
d->async_op = cmd; d->async_op = cmd;
d->async_flags = UNPACK_INT(inbuf, 0); d->async_flags = UNPACK_INT(inbuf, 0);
bdberl_general_tpool_run(&do_async_log_stat, d, 0, &d->async_job); bdberl_general_tpool_run(&do_async_log_stat, d, 0, &d->async_job);
// Let caller know that the operation is in progress // Let caller know that the operation is in progress
// Outbuf is: <<0:32>> // Outbuf is: <<0:32>>
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
@ -852,10 +852,10 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
// Inbuf is << Flags:32 >> // Inbuf is << Flags:32 >>
unsigned int flags = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
// Run the command on the VM thread - this is for debugging only, // Run the command on the VM thread - this is for debugging only,
// any real monitoring will use the async lock_stat // any real monitoring will use the async lock_stat
int rc = bdberl_db_env()->log_stat_print(bdberl_db_env(), flags); int rc = bdberl_db_env()->log_stat_print(bdberl_db_env(), flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }
@ -864,12 +864,12 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_ASYNC_PENDING(d, outbuf);
// Inbuf is <<Flags:32 >> // Inbuf is <<Flags:32 >>
// Mark the port as busy and then schedule the appropriate async operation // Mark the port as busy and then schedule the appropriate async operation
d->async_op = cmd; d->async_op = cmd;
d->async_flags = UNPACK_INT(inbuf, 0); d->async_flags = UNPACK_INT(inbuf, 0);
bdberl_general_tpool_run(&do_async_memp_stat, d, 0, &d->async_job); bdberl_general_tpool_run(&do_async_memp_stat, d, 0, &d->async_job);
// Let caller know that the operation is in progress // Let caller know that the operation is in progress
// Outbuf is: <<0:32>> // Outbuf is: <<0:32>>
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
@ -880,10 +880,10 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
// Inbuf is << Flags:32 >> // Inbuf is << Flags:32 >>
unsigned int flags = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
// Run the command on the VM thread - this is for debugging only, // Run the command on the VM thread - this is for debugging only,
// any real monitoring will use the async lock_stat // any real monitoring will use the async lock_stat
int rc = bdberl_db_env()->memp_stat_print(bdberl_db_env(), flags); int rc = bdberl_db_env()->memp_stat_print(bdberl_db_env(), flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }
@ -897,7 +897,7 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
d->async_op = cmd; d->async_op = cmd;
d->async_flags = UNPACK_INT(inbuf, 0); d->async_flags = UNPACK_INT(inbuf, 0);
bdberl_general_tpool_run(&do_async_mutex_stat, d, 0, &d->async_job); bdberl_general_tpool_run(&do_async_mutex_stat, d, 0, &d->async_job);
// Let caller know that the operation is in progress // Let caller know that the operation is in progress
// Outbuf is: <<0:32>> // Outbuf is: <<0:32>>
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
@ -908,10 +908,10 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
// Inbuf is << Flags:32 >> // Inbuf is << Flags:32 >>
unsigned int flags = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
// Run the command on the VM thread - this is for debugging only, // Run the command on the VM thread - this is for debugging only,
// any real monitoring will use the async lock_stat // any real monitoring will use the async lock_stat
int rc = bdberl_db_env()->mutex_stat_print(bdberl_db_env(), flags); int rc = bdberl_db_env()->mutex_stat_print(bdberl_db_env(), flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }
@ -924,7 +924,7 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
d->async_op = cmd; d->async_op = cmd;
d->async_flags = UNPACK_INT(inbuf, 0); d->async_flags = UNPACK_INT(inbuf, 0);
bdberl_general_tpool_run(&do_async_txn_stat, d, 0, &d->async_job); bdberl_general_tpool_run(&do_async_txn_stat, d, 0, &d->async_job);
// Let caller know that the operation is in progress // Let caller know that the operation is in progress
// Outbuf is: <<0:32>> // Outbuf is: <<0:32>>
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
@ -935,10 +935,10 @@ int bdberl_stats_control(PortData* d, unsigned int cmd,
// Inbuf is << Flags:32 >> // Inbuf is << Flags:32 >>
unsigned int flags = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 0);
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
// Run the command on the VM thread - this is for debugging only, // Run the command on the VM thread - this is for debugging only,
// any real monitoring will use the async lock_stat // any real monitoring will use the async lock_stat
int rc = bdberl_db_env()->txn_stat_print(bdberl_db_env(), flags); int rc = bdberl_db_env()->txn_stat_print(bdberl_db_env(), flags);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }

View file

@ -1,6 +1,6 @@
/* ------------------------------------------------------------------- /* -------------------------------------------------------------------
* *
* bdberl: Thread Pool * bdberl: Thread Pool
* Copyright (c) 2008-9 The Hive http://www.thehive.com/ * Copyright (c) 2008-9 The Hive http://www.thehive.com/
* Authors: Dave "dizzyd" Smith <dizzyd@dizzyd.com> * Authors: Dave "dizzyd" Smith <dizzyd@dizzyd.com>
* Phil Toland <phil.toland@gmail.com> * Phil Toland <phil.toland@gmail.com>
@ -25,11 +25,17 @@
* THE SOFTWARE. * THE SOFTWARE.
* *
* ------------------------------------------------------------------- */ * ------------------------------------------------------------------- */
#include <db.h>
#include "bdberl_drv.h"
#include "bdberl_tpool.h" #include "bdberl_tpool.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <signal.h>
#include <errno.h>
#include <pthread.h>
static void* bdberl_tpool_main(void* tpool); static void* bdberl_tpool_main(void* tpool);
static TPoolJob* next_job(TPool* tpool); static TPoolJob* next_job(TPool* tpool);
@ -37,8 +43,9 @@ static int remove_pending_job(TPool* tpool, TPoolJob* job);
static void cleanup_job(TPool* tpool, TPoolJob* job); static void cleanup_job(TPool* tpool, TPoolJob* job);
static int is_active_job(TPool* tpool, TPoolJob* job); static int is_active_job(TPool* tpool, TPoolJob* job);
#define LOCK(t) erl_drv_mutex_lock(tpool->lock) #define LOCK(tpool) erl_drv_mutex_lock(tpool->lock)
#define UNLOCK(t) erl_drv_mutex_unlock(tpool->lock) #define UNLOCK(tpool) erl_drv_mutex_unlock(tpool->lock)
TPool* bdberl_tpool_start(unsigned int thread_count) TPool* bdberl_tpool_start(unsigned int thread_count)
{ {
@ -56,10 +63,13 @@ TPool* bdberl_tpool_start(unsigned int thread_count)
int i; int i;
for (i = 0; i < thread_count; i++) for (i = 0; i < thread_count; i++)
{ {
// TODO: Figure out good way to deal with errors in this situation (should be rare, but still...) int rc = erl_drv_thread_create("bdberl_tpool_thread", &(tpool->threads[i]), &bdberl_tpool_main, (void*)tpool, 0);
erl_drv_thread_create("bdberl_tpool_thread", &(tpool->threads[i]), &bdberl_tpool_main, (void*)tpool, 0); if (0 != rc) {
// TODO: Figure out good way to deal with errors in this situation (should be rare, but still...)
fprintf(stderr, "Failed to spawn an erlang thread for the BDB thread pools! %s\n", erl_errno_id(rc));
fflush(stderr);
}
} }
return tpool; return tpool;
} }
@ -78,7 +88,7 @@ void bdberl_tpool_stop(TPool* tpool)
{ {
erl_drv_cond_wait(tpool->work_cv, tpool->lock); erl_drv_cond_wait(tpool->work_cv, tpool->lock);
} }
// Join up with all the workers // Join up with all the workers
int i = 0; int i = 0;
for (i = 0; i < tpool->thread_count; i++) for (i = 0; i < tpool->thread_count; i++)
@ -86,7 +96,7 @@ void bdberl_tpool_stop(TPool* tpool)
erl_drv_thread_join(tpool->threads[i], 0); erl_drv_thread_join(tpool->threads[i], 0);
} }
// Cleanup // Cleanup
erl_drv_cond_destroy(tpool->work_cv); erl_drv_cond_destroy(tpool->work_cv);
erl_drv_cond_destroy(tpool->cancel_cv); erl_drv_cond_destroy(tpool->cancel_cv);
driver_free(tpool->threads); driver_free(tpool->threads);
@ -95,7 +105,7 @@ void bdberl_tpool_stop(TPool* tpool)
driver_free(tpool); driver_free(tpool);
} }
void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn, void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn,
TPoolJob** job_ptr) TPoolJob** job_ptr)
{ {
// Allocate and fill a new job structure // Allocate and fill a new job structure
@ -107,7 +117,7 @@ void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFun
// Sync up with the tpool and add the job to the pending queue // Sync up with the tpool and add the job to the pending queue
LOCK(tpool); LOCK(tpool);
if (tpool->pending_jobs) if (tpool->pending_jobs)
{ {
// Make sure the current last job points to this one next // Make sure the current last job points to this one next
@ -122,7 +132,7 @@ void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFun
tpool->last_pending_job = job; tpool->last_pending_job = job;
tpool->pending_job_count++; tpool->pending_job_count++;
// Generate a notification that there is work todo. // Generate a notification that there is work todo.
// TODO: I think this may not be necessary, in the case where there are already other // TODO: I think this may not be necessary, in the case where there are already other
// pending jobs. Not sure ATM, however, so will be on safe side // pending jobs. Not sure ATM, however, so will be on safe side
erl_drv_cond_broadcast(tpool->work_cv); erl_drv_cond_broadcast(tpool->work_cv);
@ -133,7 +143,7 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
{ {
LOCK(tpool); LOCK(tpool);
// Remove the job from the pending queue // Remove the job from the pending queue
if (remove_pending_job(tpool, job)) if (remove_pending_job(tpool, job))
{ {
// Job was removed from pending -- unlock and notify the job that it got canceled // Job was removed from pending -- unlock and notify the job that it got canceled
@ -149,7 +159,7 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
return; return;
} }
// Job not in the pending queue -- check the active queue. // Job not in the pending queue -- check the active queue.
if (is_active_job(tpool, job)) if (is_active_job(tpool, job))
{ {
// Job is currently active -- mark it as cancelled (so we get notified) and wait for it // Job is currently active -- mark it as cancelled (so we get notified) and wait for it
@ -159,7 +169,7 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
erl_drv_cond_wait(tpool->cancel_cv, tpool->lock); erl_drv_cond_wait(tpool->cancel_cv, tpool->lock);
} }
// Job is no longer running and should now be considered dead. Cleanup is handled by // Job is no longer running and should now be considered dead. Cleanup is handled by
// the worker. // the worker.
UNLOCK(tpool); UNLOCK(tpool);
return; return;
@ -212,7 +222,7 @@ static void* bdberl_tpool_main(void* arg)
{ {
erl_drv_cond_broadcast(tpool->cancel_cv); erl_drv_cond_broadcast(tpool->cancel_cv);
} }
// Cleanup the job (remove from active list, free, etc.) // Cleanup the job (remove from active list, free, etc.)
cleanup_job(tpool, job); cleanup_job(tpool, job);
} }
@ -318,7 +328,7 @@ static void cleanup_job(TPool* tpool, TPoolJob* job)
{ {
tpool->active_jobs = current->next; tpool->active_jobs = current->next;
} }
break; break;
} }
@ -348,7 +358,7 @@ static int is_active_job(TPool* tpool, TPoolJob* job)
} }
// Return the number of pending and active jobs // Return the number of pending and active jobs
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr, void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
unsigned int *active_count_ptr) unsigned int *active_count_ptr)
{ {
LOCK(tpool); LOCK(tpool);
@ -356,3 +366,38 @@ void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
*active_count_ptr = tpool->active_job_count; *active_count_ptr = tpool->active_job_count;
UNLOCK(tpool); UNLOCK(tpool);
} }
// Returns a unique identifier pair for the current thread of control
void bdberl_tpool_thread_id(DB_ENV *env, pid_t *pid, db_threadid_t *tid)
{
if (pid)
*pid = getpid();
if (tid)
*tid = (db_threadid_t)pthread_self();
}
char *bdberl_tpool_thread_id_string(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, char *buf)
{
snprintf(buf, DB_THREADID_STRLEN, "%d/%p", (unsigned int)pid, (void *)tid);
return buf;
}
// Returns non-zero if the thread of control, identified by the pid and tid arguments,
// is still running.
// If DB_MUTEX_PROCESS_ONLY is set in flags then return only if the process (pid) is
// alive, ignore the thread ID.
int bdberl_tpool_thread_is_alive(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, u_int32_t flags)
{
int alive = 0;
if (kill(pid, 0) != ESRCH)
{
if (flags & DB_MUTEX_PROCESS_ONLY)
alive = 1;
else
if (pthread_kill(tid, 0) != ESRCH)
alive = 1;
}
DBG("bdberl_tpool_thread_is_alive(%08X, %08X, %d) = %d\n", pid, tid, flags, alive);
return alive;
}

View file

@ -1,6 +1,6 @@
/* ------------------------------------------------------------------- /* -------------------------------------------------------------------
* *
* bdberl: Thread Pool * bdberl: Thread Pool
* Copyright (c) 2008-9 The Hive http://www.thehive.com/ * Copyright (c) 2008-9 The Hive http://www.thehive.com/
* Authors: Dave "dizzyd" Smith <dizzyd@dizzyd.com> * Authors: Dave "dizzyd" Smith <dizzyd@dizzyd.com>
* Phil Toland <phil.toland@gmail.com> * Phil Toland <phil.toland@gmail.com>
@ -74,7 +74,7 @@ typedef struct
unsigned int active_threads; unsigned int active_threads;
unsigned int shutdown; unsigned int shutdown;
} TPool; } TPool;
TPool* bdberl_tpool_start(unsigned int thread_count); TPool* bdberl_tpool_start(unsigned int thread_count);
@ -86,7 +86,13 @@ void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFun
void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job); void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job);
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr, void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
unsigned int *active_count_ptr); unsigned int *active_count_ptr);
void bdberl_tpool_thread_id(DB_ENV *env, pid_t *pid, db_threadid_t *tid);
char *bdberl_tpool_thread_id_string(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, char *buf);
int bdberl_tpool_thread_is_alive(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, u_int32_t flags);
#endif #endif

View file

@ -2,7 +2,8 @@
cd $1/build_unix && \ cd $1/build_unix && \
../dist/configure --disable-shared --enable-static --with-pic \ ../dist/configure --disable-shared --enable-static --with-pic \
--disable-heap --disable-queue \ --disable-heap --disable-queue --disable-replication \
--disable-partition --disable-replication \ --enable-o_direct --enable-o_direct \
--enable-o_direct \ --enable-debug --enable-diagnostics \
--enable-dtrace \
--prefix=$2 --prefix=$2

View file

@ -2,6 +2,6 @@
[{description,"This is an Erlang port driver that allows Erlang programs to store data in BerkleyDB."}, [{description,"This is an Erlang port driver that allows Erlang programs to store data in BerkleyDB."},
{vsn,"5.2.28"}, {vsn,"5.2.28"},
{registered,[]}, {registered,[]},
{applications,[kernel,stdlib]}, {applications,[kernel,stdlib,lager]},
{env,[]}, {env,[]},
{modules,[bdberl,bdberl_logger]}]}. {modules,[bdberl,bdberl_logger]}]}.

View file

@ -46,9 +46,6 @@
-define(CMD_CURSOR_NEXT, 12). -define(CMD_CURSOR_NEXT, 12).
-define(CMD_CURSOR_PREV, 13). -define(CMD_CURSOR_PREV, 13).
-define(CMD_CURSOR_CLOSE, 14). -define(CMD_CURSOR_CLOSE, 14).
-define(CMD_CURSOR_GET, 35). %% TODO: renumber these 3 and match them to bdberl_drv.h
-define(CMD_CURSOR_PUT, 36).
-define(CMD_CURSOR_DEL, 37).
-define(CMD_PUT_COMMIT, 15). -define(CMD_PUT_COMMIT, 15).
-define(CMD_REMOVE_DB, 16). -define(CMD_REMOVE_DB, 16).
-define(CMD_TRUNCATE, 17). -define(CMD_TRUNCATE, 17).
@ -69,6 +66,10 @@
-define(CMD_DATA_DIRS_INFO, 32). -define(CMD_DATA_DIRS_INFO, 32).
-define(CMD_LOG_DIR_INFO, 33). -define(CMD_LOG_DIR_INFO, 33).
-define(CMD_DRIVER_INFO, 34). -define(CMD_DRIVER_INFO, 34).
-define(CMD_CURSOR_GET, 35).
-define(CMD_CURSOR_PUT, 36).
-define(CMD_CURSOR_DEL, 37).
-define(CMD_CURSOR_COUNT, 38).
-define(DB_TYPE_BTREE, 1). -define(DB_TYPE_BTREE, 1).
-define(DB_TYPE_HASH, 2). -define(DB_TYPE_HASH, 2).
@ -384,3 +385,28 @@
-define(DB_UPDATE_SECONDARY, 29). -define(DB_UPDATE_SECONDARY, 29).
-define(DB_SET_LTE, 30). -define(DB_SET_LTE, 30).
-define(DB_GET_BOTH_LTE, 31). -define(DB_GET_BOTH_LTE, 31).
%% DB Event notification types.
-define(DB_EVENT_PANIC, 0).
-define(DB_EVENT_REG_ALIVE, 1).
-define(DB_EVENT_REG_PANIC, 2).
-define(DB_EVENT_REP_CLIENT, 3).
-define(DB_EVENT_REP_CONNECT_BROKEN, 4).
-define(DB_EVENT_REP_CONNECT_ESTD, 5).
-define(DB_EVENT_REP_CONNECT_TRY_FAILED, 6).
-define(DB_EVENT_REP_DUPMASTER, 7).
-define(DB_EVENT_REP_ELECTED, 8).
-define(DB_EVENT_REP_ELECTION_FAILED, 9).
-define(DB_EVENT_REP_INIT_DONE, 10).
-define(DB_EVENT_REP_JOIN_FAILURE, 11).
-define(DB_EVENT_REP_LOCAL_SITE_REMOVED, 12).
-define(DB_EVENT_REP_MASTER, 13).
-define(DB_EVENT_REP_MASTER_FAILURE, 14).
-define(DB_EVENT_REP_NEWMASTER, 15).
-define(DB_EVENT_REP_PERM_FAILED, 16).
-define(DB_EVENT_REP_SITE_ADDED, 17).
-define(DB_EVENT_REP_SITE_REMOVED, 18).
-define(DB_EVENT_REP_STARTUPDONE, 19).
-define(DB_EVENT_REP_WOULD_ROLLBACK, 20).
-define(DB_EVENT_WRITE_FAILED, 21).
-define(DB_EVENT_NO_SUCH_EVENT, 0xffffffff).

View file

@ -4,8 +4,14 @@
%% ex: ft=erlang ts=4 sw=4 et %% ex: ft=erlang ts=4 sw=4 et
%% %%
{require_otp_vsn, "R13B04|R14"}.
{cover_enabled, true}. {cover_enabled, true}.
{erl_opts, [warnings_as_errors]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
]}.
{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}.
{port_envs, [ {port_envs, [
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"}, {"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},

View file

@ -5,7 +5,8 @@
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
stdlib stdlib,
lager
]}, ]},
{env, []} {env, []}
]}. ]}.

View file

@ -68,6 +68,7 @@
delete_database/1, delete_database/1,
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0, cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3, cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3,
cursor_count/0,
driver_info/0, driver_info/0,
register_logger/0, register_logger/0,
stop/0]). stop/0]).
@ -104,7 +105,7 @@
%% @spec open(Name, Type) -> {ok, Db} | {error, Error} %% @spec open(Name, Type) -> {ok, Db} | {error, Error}
%% where %% where
%% Name = string() %% Name = string()
%% Type = btree | hash %% Type = btree | hash | queue
%% Db = integer() %% Db = integer()
%% %%
%% @equiv open(Name, Type, [create]) %% @equiv open(Name, Type, [create])
@ -194,8 +195,8 @@ open(Name, Type, Opts) ->
end, end,
Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])), Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])),
Cmd = <<Flags:32/native, TypeCode:8/signed-native, (list_to_binary(Name))/bytes, 0:8/native>>, Cmd = <<Flags:32/native, TypeCode:8/signed-native, (list_to_binary(Name))/bytes, 0:8/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd),
recv_val(Rc). recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -256,8 +257,8 @@ close(Db) ->
close(Db, Opts) -> close(Db, Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Db:32/signed-native, Flags:32/native>>, Cmd = <<Db:32/signed-native, Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -272,7 +273,7 @@ close(Db, Opts) ->
-spec txn_begin() -> ok | db_error(). -spec txn_begin() -> ok | db_error().
txn_begin() -> txn_begin() ->
txn_begin([txn_snapshot]). txn_begin([]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -364,8 +365,8 @@ txn_begin() ->
txn_begin(Opts) -> txn_begin(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/native>>, Cmd = <<Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -427,8 +428,8 @@ txn_commit() ->
txn_commit(Opts) -> txn_commit(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/native>>, Cmd = <<Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TXN_COMMIT, Cmd),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -571,7 +572,7 @@ transaction(Fun, Retries, TimeLeft, Opts) ->
ok -> ok ->
try Fun() of try Fun() of
abort -> abort ->
error_logger:info_msg("function requested abort"), lager:info("function requested abort"),
ok = txn_abort(), ok = txn_abort(),
{error, transaction_aborted}; {error, transaction_aborted};
@ -595,7 +596,7 @@ transaction(Fun, Retries, TimeLeft, Opts) ->
transaction(Fun, R, T, Opts); transaction(Fun, R, T, Opts);
_ : Reason -> _ : Reason ->
error_logger:info_msg("function threw non-lock error - ~p", [Reason]), lager:info("function threw non-lock error - ~p", [{Reason, erlang:get_stacktrace()}]),
ok = txn_abort(), ok = txn_abort(),
{error, {transaction_failed, Reason}} {error, {transaction_failed, Reason}}
end; end;
@ -833,7 +834,7 @@ put_commit_r(Db, Key, Value, Opts) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec get(Db :: db(), Key :: db_key()) -> -spec get(Db :: db(), Key :: db_key()) ->
not_found | {ok, db_ret_value()} | db_error(). not_found | {ok, db_ret_value()} | {error, db_error()}.
get(Db, Key) -> get(Db, Key) ->
get(Db, Key, []). get(Db, Key, []).
@ -884,7 +885,7 @@ get(Db, Key) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec get(Db :: db(), Key :: db_key(), Opts :: db_flags()) -> -spec get(Db :: db(), Key :: db_key(), Opts :: db_flags()) ->
not_found | {ok, db_ret_value()} | db_error(). not_found | {ok, db_ret_value()} | {error, db_error()}.
get(Db, Key, Opts) -> get(Db, Key, Opts) ->
{KeyLen, KeyBin} = to_binary(Key), {KeyLen, KeyBin} = to_binary(Key),
@ -900,7 +901,7 @@ get(Db, Key, Opts) ->
Crc -> Crc ->
{ok, binary_to_term(Payload)}; {ok, binary_to_term(Payload)};
CrcOther -> CrcOther ->
error_logger:warning_msg("Invalid CRC: ~p ~p\n", [Crc, CrcOther]), lager:warning("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
{error, invalid_crc} {error, invalid_crc}
end; end;
not_found -> not_found; not_found -> not_found;
@ -963,8 +964,6 @@ get_r(Db, Key, Opts) ->
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Delete a value based on key. %% Delete a value based on key.
@ -977,7 +976,7 @@ get_r(Db, Key, Opts) ->
%% This function will return `not_found' if the specified key is not in %% This function will return `not_found' if the specified key is not in
%% the database. %% the database.
%% %%
%% @spec del(Db, Key) -> not_found | {ok, Value} | {error, Error} %% @spec del(Db, Key) -> ok | not_found | {error, Reason}
%% where %% where
%% Db = integer() %% Db = integer()
%% Key = term() %% Key = term()
@ -986,7 +985,7 @@ get_r(Db, Key, Opts) ->
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec del(Db :: db(), Key :: db_key()) -> -spec del(Db :: db(), Key :: db_key()) ->
not_found | ok | db_error(). ok | not_found | {error, Reason :: db_error()}.
del(Db, Key) -> del(Db, Key) ->
{KeyLen, KeyBin} = to_binary(Key), {KeyLen, KeyBin} = to_binary(Key),
@ -995,7 +994,7 @@ del(Db, Key) ->
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok ->
receive receive
{ok, _, _} -> ok; ok -> ok;
not_found -> not_found; not_found -> not_found;
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end; end;
@ -1173,8 +1172,8 @@ truncate() ->
truncate(Db) -> truncate(Db) ->
Cmd = <<Db:32/signed-native>>, Cmd = <<Db:32/signed-native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TRUNCATE, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_TRUNCATE, Cmd),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -1194,8 +1193,8 @@ truncate(Db) ->
cursor_open(Db) -> cursor_open(Db) ->
Cmd = <<Db:32/signed-native, 0:32/native>>, Cmd = <<Db:32/signed-native, 0:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -1373,7 +1372,12 @@ cursor_get(Key) ->
not_found | {ok, db_key(), db_value()} | db_error(). not_found | {ok, db_key(), db_value()} | db_error().
cursor_get(Key, Opts) -> cursor_get(Key, Opts) ->
{KeyLen, KeyBin} = to_binary(Key), case Key of
undefined ->
{KeyLen, KeyBin} = {0, <<>>};
_ ->
{KeyLen, KeyBin} = to_binary(Key)
end,
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>, Cmd = <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>,
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_GET, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_GET, Cmd),
@ -1386,7 +1390,7 @@ cursor_get(Key, Opts) ->
Crc -> Crc ->
{ok, binary_to_term(Payload)}; {ok, binary_to_term(Payload)};
CrcOther -> CrcOther ->
error_logger:warning_msg("Invalid CRC: ~p ~p\n", [Crc, CrcOther]), lager:warning("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
{error, invalid_crc} {error, invalid_crc}
end; end;
not_found -> not_found; not_found -> not_found;
@ -1397,6 +1401,25 @@ cursor_get(Key, Opts) ->
end. end.
%%--------------------------------------------------------------------
%% @doc
%% Returns the count of duplicate records for the key to which the
%% cursor currently refers.
%%
%% If this function fails for any reason, the state of the cursor will
%% be unchanged.
%%
%% @spec cursor_count() -> {ok, Count} | {error, Error}
%%
%% @end
%%--------------------------------------------------------------------
-spec cursor_count() -> {ok, Count :: number()} | {error, db_error()}.
cursor_count() ->
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_COUNT, <<>>),
recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Closes the cursor. %% Closes the cursor.
@ -1418,8 +1441,8 @@ cursor_get(Key, Opts) ->
-spec cursor_close() -> ok | db_error(). -spec cursor_close() -> ok | db_error().
cursor_close() -> cursor_close() ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -1445,8 +1468,8 @@ cursor_close() ->
delete_database(Filename) -> delete_database(Filename) ->
Cmd = <<(list_to_binary(Filename))/binary, 0:8>>, Cmd = <<(list_to_binary(Filename))/binary, 0:8>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd),
recv_ok(Rc). recv_ok(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -1634,8 +1657,8 @@ get_txn_timeout() ->
stat(Db, Opts) -> stat(Db, Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Db:32/signed-native, Flags:32/native>>, Cmd = <<Db:32/signed-native, Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd),
recv_val(Rc). recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -1739,8 +1762,8 @@ stat_print(Db) ->
lock_stat(Opts) -> lock_stat(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/native>>, Cmd = <<Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOCK_STAT, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOCK_STAT, Cmd),
recv_val(Rc). recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -1835,8 +1858,8 @@ lock_stat_print() ->
log_stat(Opts) -> log_stat(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/native>>, Cmd = <<Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOG_STAT, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_LOG_STAT, Cmd),
recv_val(Rc). recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -2016,8 +2039,8 @@ memp_stat_print() ->
mutex_stat(Opts) -> mutex_stat(Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Flags:32/native>>, Cmd = <<Flags:32/native>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT, Cmd),
recv_val(Rc). recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -2232,8 +2255,9 @@ env_stat_print() ->
{ok, [{atom(), number()}]} | db_error(). {ok, [{atom(), number()}]} | db_error().
driver_info() -> driver_info() ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DRIVER_INFO, <<>>), <<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DRIVER_INFO, <<>>),
recv_val(Rc). recv_val(Result).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
@ -2251,6 +2275,7 @@ register_logger() ->
[] = erlang:port_control(get_port(), ?CMD_REGISTER_LOGGER, <<>>), [] = erlang:port_control(get_port(), ?CMD_REGISTER_LOGGER, <<>>),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Stop bdberl - stops the bdberl_logger process so that when %% Stop bdberl - stops the bdberl_logger process so that when
@ -2422,15 +2447,15 @@ do_put(Action, Db, Key, Value, Opts) ->
Flags = process_flags(Opts), Flags = process_flags(Opts),
Cmd = <<Db:32/signed-native, Flags:32/native, KeyLen:32/native, KeyBin/bytes, Cmd = <<Db:32/signed-native, Flags:32/native, KeyLen:32/native, KeyBin/bytes,
FinalValBinLen:32/native, FinalValBin/bytes>>, FinalValBinLen:32/native, FinalValBin/bytes>>,
<<Rc:32/signed-native>> = erlang:port_control(get_port(), Action, Cmd), <<Result:32/signed-native>> = erlang:port_control(get_port(), Action, Cmd),
recv_ok(Rc). recv_ok(Result).
%% %%
%% Move the cursor in a given direction. Invoked by cursor_next/prev/current. %% Move the cursor in a given direction. Invoked by cursor_next/prev/current.
%% %%
do_cursor_move(Direction) -> do_cursor_move(Direction) ->
<<Rc:32/signed-native>> = erlang:port_control(get_port(), Direction, <<>>), <<Result:32/signed-native>> = erlang:port_control(get_port(), Direction, <<>>),
case decode_rc(Rc) of case decode_rc(Result) of
ok -> ok ->
receive receive
{ok, KeyBin, ValueBin} -> {ok, KeyBin, ValueBin} ->
@ -2439,7 +2464,7 @@ do_cursor_move(Direction) ->
Crc -> Crc ->
{ok, binary_to_term(KeyBin), binary_to_term(Payload)}; {ok, binary_to_term(KeyBin), binary_to_term(Payload)};
CrcOther -> CrcOther ->
error_logger:warning_msg("Invalid CRC on cursor: ~p ~p\n", [Crc, CrcOther]), lager:warning("Invalid CRC on cursor: ~p ~p\n", [Crc, CrcOther]),
{error, invalid_crc} {error, invalid_crc}
end; end;
not_found -> not_found ->

View file

@ -67,9 +67,9 @@ init([]) ->
true -> true ->
load_mibs(['BDBERL-MIB']); load_mibs(['BDBERL-MIB']);
false -> false ->
error_logger:warning_msg("SNMP is not running; bdberl stats will not be published.\n") lager:warning("SNMP is not running; bdberl stats will not be published.\n")
end, end,
{ok, #state{}}. {ok, #state{}}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
@ -79,11 +79,11 @@ handle_cast(_Msg, State) ->
{stop, unsupportedOperation, State}. {stop, unsupportedOperation, State}.
handle_info({bdb_error_log, Msg}, State) -> handle_info({bdb_error_log, Msg}, State) ->
error_logger:error_msg("BDB Error: ~s\n", [Msg]), lager:error("BDB: ~s\n", [Msg]),
{noreply, State}; {noreply, State};
handle_info({bdb_info_log, Msg}, State) -> handle_info({bdb_info_log, Msg}, State) ->
error_logger:info_msg("BDB Info: ~s\n", [Msg]), lager:info("BDB: ~s\n", [Msg]),
{noreply, State}; {noreply, State};
handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) -> handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) ->
@ -98,7 +98,7 @@ handle_info({bdb_checkpoint_stats, CheckpointSecs, ArchiveSecs, 0, 0}, State) ->
{noreply, State}; {noreply, State};
handle_info({bdb_checkpoint_stats, _CheckpointSecs, _ArchiveSecs, CheckpointRc, ArchiveRc}, State) -> handle_info({bdb_checkpoint_stats, _CheckpointSecs, _ArchiveSecs, CheckpointRc, ArchiveRc}, State) ->
error_logger:error_msg("BDB Checkpoint error: ~w ~w\n", [CheckpointRc, ArchiveRc]), lager:error("BDB Checkpoint: ~w ~w\n", [CheckpointRc, ArchiveRc]),
{noreply, State}; {noreply, State};
handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) -> handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) ->
@ -112,11 +112,11 @@ handle_info({bdb_trickle_stats, ElapsedSecs, Pages, 0}, State) ->
end, end,
{noreply, State}; {noreply, State};
handle_info({bdb_trickle_stats, _ElapsedSecs, _Pages, Rc}, State) -> handle_info({bdb_trickle_stats, _ElapsedSecs, _Pages, Rc}, State) ->
error_logger:error_msg("BDB Trickle Write error: ~w\n", [Rc]), lager:error("BDB Trickle Write: ~w\n", [Rc]),
{noreply, State}; {noreply, State};
handle_info(Msg, State) -> handle_info(Msg, State) ->
io:format("Unexpected message: ~p\n", [Msg]), lager:info("Unexpected message: ~p\n", [Msg]),
{noreply, State}. {noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->

View file

@ -45,7 +45,8 @@ all() ->
get_should_return_a_value_when_getting_a_valid_record, get_should_return_a_value_when_getting_a_valid_record,
put_should_succeed_with_manual_transaction, put_should_succeed_with_manual_transaction,
put_should_rollback_with_failed_manual_transaction, put_should_rollback_with_failed_manual_transaction,
% del_should_remove_a_value, %TODO: why is this disabled del_should_remove_a_value,
aborted_del_should_not_remove_a_value,
transaction_should_commit_on_success, transaction_should_commit_on_success,
transaction_should_abort_on_exception, transaction_should_abort_on_exception,
transaction_should_abort_on_user_abort, transaction_should_abort_on_user_abort,
@ -54,6 +55,7 @@ all() ->
update_should_accept_args_for_fun, update_should_accept_args_for_fun,
port_should_return_transaction_timeouts, port_should_return_transaction_timeouts,
cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open, cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open,
cursor_should_return_count,
put_commit_should_end_txn, put_commit_should_end_txn,
data_dir_should_be_priv_dir, data_dir_should_be_priv_dir,
delete_should_remove_file, delete_should_remove_file,
@ -141,6 +143,16 @@ del_should_remove_a_value(Config) ->
ok = bdberl:del(Db, mykey), ok = bdberl:del(Db, mykey),
not_found = bdberl:get(Db, mykey). not_found = bdberl:get(Db, mykey).
aborted_del_should_not_remove_a_value(Config) ->
Db = ?config(db, Config),
ok = bdberl:put(Db, mykey, avalue),
{ok, avalue} = bdberl:get(Db, mykey),
ok = bdberl:txn_begin(),
ok = bdberl:del(Db, mykey),
not_found = bdberl:get(Db, mykey),
ok = bdberl:txn_abort(),
{ok, avalue} = bdberl:get(Db, mykey).
transaction_should_commit_on_success(Config) -> transaction_should_commit_on_success(Config) ->
Db = ?config(db, Config), Db = ?config(db, Config),
F = fun() -> bdberl:put(Db, mykey, avalue) end, F = fun() -> bdberl:put(Db, mykey, avalue) end,
@ -259,6 +271,20 @@ cursor_get_should_pos(Config) ->
ok = bdberl:cursor_close(). ok = bdberl:cursor_close().
cursor_should_return_count(Config) ->
Db = ?config(db, Config),
%% Store some sample values in the db
ok = bdberl:put(Db, key, value1),
% ok = bdberl:put(Db, key, value2),
% ok = bdberl:put(Db, key, value3),
% ok = bdberl:put(Db, key, value4),
ok = bdberl:cursor_open(Db),
{ok, _} = bdberl:cursor_get(key),
%% Validate the count of duplicate values
{ok, 1} = bdberl:cursor_count(),
ok = bdberl:cursor_close().
cursor_should_fail_if_not_open(_Config) -> cursor_should_fail_if_not_open(_Config) ->
{error, no_cursor} = bdberl:cursor_next(), {error, no_cursor} = bdberl:cursor_next(),
{error, no_cursor} = bdberl:cursor_prev(), {error, no_cursor} = bdberl:cursor_prev(),
@ -344,7 +370,7 @@ hash_stat_should_report_on_success(_Config) ->
1 = proplists:get_value(nkeys, Stat1), 1 = proplists:get_value(nkeys, Stat1),
1 = proplists:get_value(ndata, Stat1), 1 = proplists:get_value(ndata, Stat1),
done. done.
stat_should_fail_on_bad_dbref(_Config) -> stat_should_fail_on_bad_dbref(_Config) ->
{error, invalid_db} = bdberl:stat(10000000, []), {error, invalid_db} = bdberl:stat(10000000, []),
done. done.
@ -384,7 +410,7 @@ data_dirs_info_should_report_on_success(_Config) ->
lg_dir_info_should_report_on_success(_Config) -> lg_dir_info_should_report_on_success(_Config) ->
{ok, _LgDir, _Fsid, _MBytesAvail} = bdberl:get_lg_dir_info(). {ok, _LgDir, _Fsid, _MBytesAvail} = bdberl:get_lg_dir_info().
%% Check the bdberl_logger gets reinstalled after stopping %% Check the bdberl_logger gets reinstalled after stopping
start_after_stop_should_be_safe(_Config) -> start_after_stop_should_be_safe(_Config) ->
@ -413,5 +439,3 @@ start_after_stop_should_be_safe(_Config) ->
end, end,
true = lists:keymember(bdberl_logger, 1, supervisor:which_children(kernel_safe_sup)), true = lists:keymember(bdberl_logger, 1, supervisor:which_children(kernel_safe_sup)),
ok. ok.