Added more comprehensive debug output if built with debug=1.
Made deadlock checker interval configurable.
This commit is contained in:
parent
19addac0fe
commit
6fe6651757
1 changed files with 119 additions and 38 deletions
|
@ -7,6 +7,7 @@
|
|||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdarg.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <stdlib.h>
|
||||
|
@ -56,6 +57,8 @@ static int add_portref(int dbref, ErlDrvPort port);
|
|||
static int del_portref(int dbref, ErlDrvPort port);
|
||||
|
||||
static int alloc_dbref();
|
||||
static char *rc_to_atom_str(int rc);
|
||||
static void abort_txn(PortData* d);
|
||||
|
||||
static void* zalloc(unsigned int size);
|
||||
|
||||
|
@ -102,9 +105,11 @@ static hive_hash* G_DATABASES_NAMES = 0;
|
|||
* Deadlock detector thread variables. We run a single thread per VM to detect deadlocks within
|
||||
* our global environment. G_DEADLOCK_CHECK_INTERVAL is the time between runs in milliseconds.
|
||||
*/
|
||||
#define DEFAULT_DEADLOCK_CHECK_INTERVAL 100 /* 100 milliseconds */
|
||||
|
||||
static ErlDrvTid G_DEADLOCK_THREAD = 0;
|
||||
static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1;
|
||||
static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between checks */
|
||||
static unsigned int G_DEADLOCK_CHECK_INTERVAL = DEFAULT_DEADLOCK_CHECK_INTERVAL;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -216,9 +221,28 @@ static TPool* G_TPOOL_TXNS = NULL;
|
|||
}}
|
||||
|
||||
#ifdef DEBUG
|
||||
# define DBG printf
|
||||
# define DBG(...) fprintf(stderr, __VA_ARGS__)
|
||||
// Debug-build trace helper: formats a printf-style message and writes it to
// stderr as a single line prefixed with the calling thread id and the port.
//
//   d   - port data whose `port` handle is printed in the prefix; it is
//         dereferenced unconditionally, so it must not be NULL
//   fmt - printf-style format string, followed by its arguments
//
// The message is formatted into a 1024-byte stack buffer, so vsnprintf
// truncates anything longer than that.
static void DBGCMD(PortData *d, const char *fmt, ...)
{
    char buf[1024];

    va_list ap;
    va_start(ap, fmt);
    vsnprintf(buf, sizeof(buf), fmt, ap);
    va_end(ap);

    // %p requires a void* argument, so convert the thread and port handles explicitly
    (void)fprintf(stderr, "threadid %p port %p: %s\r\n",
                  (void *)erl_drv_thread_self(), (void *)d->port, buf);
}
|
||||
// Debug-build trace helper: writes the result code of the preceding call to
// stderr, prefixed with the calling thread id and the port.
//
//   d  - port data whose `port` handle is printed in the prefix; it is
//        dereferenced unconditionally, so it must not be NULL
//   rc - result code to report; 0 is printed as "ok", any other value is
//        named via rc_to_atom_str() and also printed numerically
static void DBGCMDRC(PortData *d, int rc)
{
    const char *rc_name = (rc == 0) ? "ok" : rc_to_atom_str(rc);

    // %p requires a void* argument, so convert the thread and port handles explicitly
    (void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\r\n",
                  (void *)erl_drv_thread_self(), (void *)d->port, rc_name, rc);
}
|
||||
|
||||
#else
|
||||
# define DBG(arg1,...)
|
||||
# define DBGCMD(d, fmt, ...)
|
||||
# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error
|
||||
#endif
|
||||
|
||||
|
||||
|
@ -246,19 +270,24 @@ DRIVER_INIT(bdberl_drv)
|
|||
|
||||
// Initialize global environment -- use environment variable DB_HOME to
|
||||
// specify where the working directory is
|
||||
DBG("db_env_create(%p, 0)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0);
|
||||
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
||||
if (G_DB_ENV_ERROR != 0)
|
||||
{
|
||||
G_DB_ENV = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
DBG("G_DB_ENV->open(%p, 0, %08X, 0);", &G_DB_ENV, flags);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0);
|
||||
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
||||
if (G_DB_ENV_ERROR != 0)
|
||||
{
|
||||
// Something bad happened while initializing BDB; in this situation we
|
||||
// cleanup and set the environment to zero. Attempts to open ports will
|
||||
// fail and the user will have to sort out how to resolve the issue.
|
||||
DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV);
|
||||
G_DB_ENV->close(G_DB_ENV, 0);
|
||||
G_DB_ENV = 0;
|
||||
}
|
||||
|
@ -311,6 +340,24 @@ DRIVER_INIT(bdberl_drv)
|
|||
}
|
||||
}
|
||||
|
||||
// Set the deadlock interval
|
||||
char deadlock_check_interval_str[64];
|
||||
value_size = sizeof(deadlock_check_interval_str);
|
||||
if (erl_drv_getenv("BDBERL_DEADLOCK_CHECK_INTERVAL",
|
||||
deadlock_check_interval_str, &value_size) >= 0)
|
||||
{
|
||||
assert(value_size < sizeof(deadlock_check_interval_str));
|
||||
|
||||
G_DEADLOCK_CHECK_INTERVAL = atoi(deadlock_check_interval_str);
|
||||
if (G_DEADLOCK_CHECK_INTERVAL < 0)
|
||||
{
|
||||
G_DEADLOCK_CHECK_INTERVAL = DEFAULT_DEADLOCK_CHECK_INTERVAL;
|
||||
}
|
||||
|
||||
fprintf(stderr, "Deadlock check interval set to %d\r\n", G_DEADLOCK_CHECK_INTERVAL);
|
||||
}
|
||||
|
||||
|
||||
// Initialize default page size
|
||||
char page_size_str[64];
|
||||
value_size = sizeof(page_size_str);
|
||||
|
@ -406,6 +453,9 @@ DRIVER_INIT(bdberl_drv)
|
|||
|
||||
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||
{
|
||||
DBG("threadid %p port %p: BDB DRIVER STARTING\r\n",
|
||||
erl_drv_thread_self(), port);
|
||||
|
||||
// Make sure we have a functional environment -- if we don't,
|
||||
// bail...
|
||||
if (!G_DB_ENV)
|
||||
|
@ -432,6 +482,8 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
|||
// Make sure port is running in binary mode
|
||||
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
|
||||
|
||||
DBGCMD(d, "BDB DRIVER STARTED");
|
||||
|
||||
return (ErlDrvData)d;
|
||||
}
|
||||
|
||||
|
@ -439,6 +491,8 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
{
|
||||
PortData* d = (PortData*)handle;
|
||||
|
||||
DBG("Stopping port %p\r\n", d->port);
|
||||
|
||||
// Grab the port lock, in case we have an async job running
|
||||
erl_drv_mutex_lock(d->port_lock);
|
||||
|
||||
|
@ -469,10 +523,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
}
|
||||
|
||||
// If a txn is currently active, terminate it.
|
||||
if (d->txn)
|
||||
{
|
||||
d->txn->abort(d->txn);
|
||||
}
|
||||
abort_txn(d);
|
||||
|
||||
// Close all the databases we previously opened
|
||||
while (d->dbrefs)
|
||||
|
@ -507,6 +558,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
|
||||
static void bdberl_drv_finish()
|
||||
{
|
||||
DBG("BDB DRIVER FINISHING\r\n");
|
||||
// Stop the thread pools
|
||||
if (NULL != G_TPOOL_GENERAL)
|
||||
{
|
||||
|
@ -584,7 +636,7 @@ static void bdberl_drv_finish()
|
|||
G_LOG_RWLOCK = NULL;
|
||||
}
|
||||
|
||||
DBG("DRIVER_FINISH\n");
|
||||
DBG("BDB DRIVER FINISHED\r\n");
|
||||
}
|
||||
|
||||
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||
|
@ -790,8 +842,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
|||
int rc = d->cursor->close(d->cursor);
|
||||
if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK))
|
||||
{
|
||||
d->txn->abort(d->txn);
|
||||
d->txn = 0;
|
||||
abort_txn(d);
|
||||
}
|
||||
|
||||
// Regardless of what happens, clear out the cursor pointer
|
||||
|
@ -1189,8 +1240,10 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
}
|
||||
|
||||
// Create the DB handle
|
||||
DB* db;
|
||||
DB* db = NULL;
|
||||
DBGCMD(data, "db_create(&db, %p, 0);", G_DB_ENV);
|
||||
int rc = db_create(&db, G_DB_ENV, 0);
|
||||
DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : rc_to_atom_str(rc), rc, db);
|
||||
if (rc != 0)
|
||||
{
|
||||
// Failure while creating the database handle -- drop our lock and return
|
||||
|
@ -1209,7 +1262,9 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
}
|
||||
|
||||
// Attempt to open our database
|
||||
DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);", db, name, type, flags);
|
||||
rc = db->open(db, 0, name, 0, type, flags, 0);
|
||||
DBGCMDRC(data, rc);
|
||||
if (rc != 0)
|
||||
{
|
||||
// Failure while opening the database -- cleanup the handle, drop the lock
|
||||
|
@ -1258,10 +1313,11 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
|||
Database* database = &G_DATABASES[dbref];
|
||||
if (database->ports == 0)
|
||||
{
|
||||
DBG("Closing actual database for dbref %d\r\n", dbref);
|
||||
// Close out the BDB handle
|
||||
database->db->close(database->db, flags);
|
||||
|
||||
DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref);
|
||||
int rc = database->db->close(database->db, flags);
|
||||
DBGCMDRC(data, rc);
|
||||
|
||||
// Remove the entry from the names map
|
||||
hive_hash_remove(G_DATABASES_NAMES, database->name);
|
||||
free((char*)database->name);
|
||||
|
@ -1279,6 +1335,18 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
|||
}
|
||||
}
|
||||
|
||||
// Abort the transaction and clean up
|
||||
static void abort_txn(PortData* d)
|
||||
{
|
||||
if (d->txn)
|
||||
{
|
||||
DBGCMD(d, "d->txn->abort(%p)", d->txn);
|
||||
int rc = d->txn->abort(d->txn);
|
||||
DBGCMDRC(d, rc);
|
||||
d->txn = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static int delete_database(const char* name)
|
||||
{
|
||||
// Go directly to a write lock on the global databases structure
|
||||
|
@ -2109,25 +2177,28 @@ static void do_async_put(void* arg)
|
|||
// Check CRC in value payload - first 4 bytes are CRC of rest of bytes
|
||||
assert(value.size >= 4);
|
||||
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
||||
uint32_t file_crc32 = *(uint32_t*) value.data;
|
||||
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
||||
|
||||
int rc;
|
||||
if (calc_crc32 != file_crc32)
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.", buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn
|
||||
// is NULL, the put will still be atomic
|
||||
DBGCMD(d, "db->put(%p, %p, %p, &p, %08X) key=%p(%d) value=%p(%d)",
|
||||
db, d->txn, &key, &value, flags, key.data, key.size, value.data, value.size);
|
||||
rc = db->put(db, d->txn, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
}
|
||||
|
||||
// If any error occurs while we have a txn action, abort it
|
||||
if (d->txn && rc)
|
||||
{
|
||||
d->txn->abort(d->txn);
|
||||
d->txn = 0;
|
||||
abort_txn(d);
|
||||
}
|
||||
else if (d->txn && d->async_op == CMD_PUT_COMMIT)
|
||||
{
|
||||
|
@ -2189,10 +2260,12 @@ static void do_async_get(void* arg)
|
|||
{
|
||||
assert(value.size >= 4);
|
||||
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
||||
uint32_t file_crc32 = *(uint32_t*) value.data;
|
||||
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
||||
|
||||
if (calc_crc32 != file_crc32)
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||
buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
}
|
||||
|
@ -2223,19 +2296,23 @@ static void do_async_txnop(void* arg)
|
|||
int rc = 0;
|
||||
if (d->async_op == CMD_TXN_BEGIN)
|
||||
{
|
||||
DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)", G_DB_ENV, d->txn, d->async_flags);
|
||||
rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags);
|
||||
DBGCMD(d, "rc = %s (%d) d->txn = %p", rc == 0 ? "ok" : rc_to_atom_str(rc), rc, d->txn);
|
||||
|
||||
}
|
||||
else if (d->async_op == CMD_TXN_COMMIT)
|
||||
{
|
||||
assert(NULL != d->txn);
|
||||
DBGCMD(d, "d->txn->txn_commit(%p, %08X)", d->txn, d->async_flags);
|
||||
rc = d->txn->commit(d->txn, d->async_flags);
|
||||
DBGCMDRC(d, rc);
|
||||
d->txn = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
assert(NULL != d->txn);
|
||||
rc = d->txn->abort(d->txn);
|
||||
d->txn = 0;
|
||||
assert(d->async_op == CMD_TXN_ABORT);
|
||||
abort_txn(d);
|
||||
}
|
||||
|
||||
async_cleanup_and_send_rc(d, rc);
|
||||
|
@ -2246,7 +2323,8 @@ static void do_async_cursor_get(void* arg)
|
|||
{
|
||||
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
||||
PortData* d = (PortData*)arg;
|
||||
|
||||
assert(NULL != d->cursor);
|
||||
|
||||
// Setup DBTs
|
||||
DBT key;
|
||||
DBT value;
|
||||
|
@ -2266,7 +2344,9 @@ static void do_async_cursor_get(void* arg)
|
|||
}
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->get(%p, &key, &value, flags);");
|
||||
int rc = d->cursor->get(d->cursor, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
// Check CRC - first 4 bytes are CRC of rest of bytes
|
||||
if (0 == rc)
|
||||
|
@ -2289,11 +2369,7 @@ static void do_async_cursor_get(void* arg)
|
|||
|
||||
d->cursor->close(d->cursor);
|
||||
d->cursor = 0;
|
||||
if (d->txn)
|
||||
{
|
||||
d->txn->abort(d->txn);
|
||||
d->txn = 0;
|
||||
}
|
||||
abort_txn(d);
|
||||
}
|
||||
|
||||
async_cleanup_and_send_kv(d, rc, &key, &value);
|
||||
|
@ -2319,11 +2395,12 @@ static void do_async_truncate(void* arg)
|
|||
if (database != NULL && database->db != 0)
|
||||
{
|
||||
DB* db = database->db;
|
||||
|
||||
DBG("Truncating dbref %i\r\n", i);
|
||||
|
||||
u_int32_t count = 0;
|
||||
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p) dbref=%d", db, d->txn, &count, 0, i);
|
||||
rc = db->truncate(db, d->txn, &count, 0);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d",
|
||||
rc == 0 ? "ok" : rc_to_atom_str(rc), rc, count);
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
|
@ -2335,18 +2412,16 @@ static void do_async_truncate(void* arg)
|
|||
else
|
||||
{
|
||||
DB* db = G_DATABASES[d->async_dbref].db;
|
||||
|
||||
DBG("Truncating dbref %i\r\n", dbref);
|
||||
|
||||
u_int32_t count = 0;
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref);
|
||||
rc = db->truncate(db, d->txn, &count, 0);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : rc_to_atom_str(rc), rc, count);
|
||||
}
|
||||
|
||||
// If any error occurs while we have a txn action, abort it
|
||||
if (d->txn && rc)
|
||||
{
|
||||
d->txn->abort(d->txn);
|
||||
d->txn = 0;
|
||||
abort_txn(d);
|
||||
}
|
||||
|
||||
async_cleanup_and_send_rc(d, rc);
|
||||
|
@ -2616,7 +2691,7 @@ static int add_portref(int dbref, ErlDrvPort port)
|
|||
current = current->next;
|
||||
} while (current != 0);
|
||||
|
||||
// At the end of the list -- allocate a new entry for this por
|
||||
// At the end of the list -- allocate a new entry for this port
|
||||
current = (PortList*)zalloc(sizeof(PortList));
|
||||
current->port = port;
|
||||
last->next = current;
|
||||
|
@ -2636,6 +2711,7 @@ static int del_portref(int dbref, ErlDrvPort port)
|
|||
{
|
||||
PortList* current = G_DATABASES[dbref].ports;
|
||||
PortList* last = 0;
|
||||
assert(NULL != current);
|
||||
while (current)
|
||||
{
|
||||
if (current->port == port)
|
||||
|
@ -2706,6 +2782,8 @@ static int del_dbref(PortData* data, int dbref)
|
|||
{
|
||||
DbRefList* current = data->dbrefs;
|
||||
DbRefList* last = 0;
|
||||
assert(NULL != current);
|
||||
|
||||
while (current)
|
||||
{
|
||||
if (current->dbref == dbref)
|
||||
|
@ -2853,7 +2931,10 @@ static void* deadlock_check(void* arg)
|
|||
DBG("Rejected deadlocks: %d\r\n", count);
|
||||
}
|
||||
|
||||
util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000);
|
||||
if (G_DEADLOCK_CHECK_INTERVAL > 0)
|
||||
{
|
||||
util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
DBG("Deadlock checker exiting.\r\n");
|
||||
|
|
Loading…
Reference in a new issue