From 6fe6651757d643a1b0c3d92b3521384860003f65 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Tue, 9 Jun 2009 14:00:43 -0600 Subject: [PATCH] Added more comprehensive debug output if built with debug=1. Made deadlock checker interval configurable. --- c_src/bdberl_drv.c | 157 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 119 insertions(+), 38 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 8f9a79c..e0597f7 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -56,6 +57,8 @@ static int add_portref(int dbref, ErlDrvPort port); static int del_portref(int dbref, ErlDrvPort port); static int alloc_dbref(); +static char *rc_to_atom_str(int rc); +static void abort_txn(PortData* d); static void* zalloc(unsigned int size); @@ -102,9 +105,11 @@ static hive_hash* G_DATABASES_NAMES = 0; * Deadlock detector thread variables. We run a single thread per VM to detect deadlocks within * our global environment. G_DEADLOCK_CHECK_INTERVAL is the time between runs in milliseconds. */ +#define DEFAULT_DEADLOCK_CHECK_INTERVAL 100 /* 100 milliseconds */ + static ErlDrvTid G_DEADLOCK_THREAD = 0; static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1; -static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between checks */ +static unsigned int G_DEADLOCK_CHECK_INTERVAL = DEFAULT_DEADLOCK_CHECK_INTERVAL; /** @@ -216,9 +221,28 @@ static TPool* G_TPOOL_TXNS = NULL; }} #ifdef DEBUG -# define DBG printf +# define DBG(...) fprintf(stderr, __VA_ARGS__) +static void DBGCMD(PortData *d, const char *fmt, ...) +{ + char buf[1024]; + + va_list ap; + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + (void)fprintf(stderr, "threadid %p port %p: %s\r\n", erl_drv_thread_self(), d->port, buf); +} +static void DBGCMDRC(PortData *d, int rc) +{ + (void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\r\n", + erl_drv_thread_self(), d->port, rc == 0 ? "ok" : rc_to_atom_str(rc), rc); +} + #else # define DBG(arg1,...) +# define DBGCMD(d, fmt, ...) +# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error #endif @@ -246,19 +270,24 @@ DRIVER_INIT(bdberl_drv) // Initialize global environment -- use environment variable DB_HOME to // specify where the working directory is + DBG("db_env_create(%p, 0)", &G_DB_ENV); G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0); + DBG(" = %d\r\n", G_DB_ENV_ERROR); if (G_DB_ENV_ERROR != 0) { G_DB_ENV = 0; } else { + DBG("G_DB_ENV->open(%p, 0, %08X, 0);", &G_DB_ENV, flags); G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0); + DBG(" = %d\r\n", G_DB_ENV_ERROR); if (G_DB_ENV_ERROR != 0) { // Something bad happened while initializing BDB; in this situation we // cleanup and set the environment to zero. Attempts to open ports will // fail and the user will have to sort out how to resolve the issue. + DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV); G_DB_ENV->close(G_DB_ENV, 0); G_DB_ENV = 0; } @@ -311,6 +340,24 @@ DRIVER_INIT(bdberl_drv) } } + // Set the deadlock interval + char deadlock_check_interval_str[64]; + value_size = sizeof(deadlock_check_interval_str); + if (erl_drv_getenv("BDBERL_DEADLOCK_CHECK_INTERVAL", + deadlock_check_interval_str, &value_size) >= 0) + { + assert(value_size < sizeof(deadlock_check_interval_str)); + + G_DEADLOCK_CHECK_INTERVAL = atoi(deadlock_check_interval_str); + if (G_DEADLOCK_CHECK_INTERVAL < 0) + { + G_DEADLOCK_CHECK_INTERVAL = DEFAULT_DEADLOCK_CHECK_INTERVAL; + } + + fprintf(stderr, "Deadlock check interval set to %d\r\n", G_DEADLOCK_CHECK_INTERVAL); + } + + // Initialize default page size char page_size_str[64]; value_size = sizeof(page_size_str); @@ -406,6 +453,9 @@ DRIVER_INIT(bdberl_drv) static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) { + DBG("threadid %p port %p: BDB DRIVER STARTING\r\n", + erl_drv_thread_self(), port); + // Make sure we have a functional environment -- if we don't, // bail... if (!G_DB_ENV) @@ -432,6 +482,8 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) // Make sure port is running in binary mode set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY); + DBGCMD(d, "BDB DRIVER STARTED"); + return (ErlDrvData)d; } @@ -439,6 +491,8 @@ static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; + DBG("Stopping port %p\r\n", d->port); + // Grab the port lock, in case we have an async job running erl_drv_mutex_lock(d->port_lock); @@ -469,10 +523,7 @@ static void bdberl_drv_stop(ErlDrvData handle) } // If a txn is currently active, terminate it. - if (d->txn) - { - d->txn->abort(d->txn); - } + abort_txn(d); // Close all the databases we previously opened while (d->dbrefs) @@ -507,6 +558,7 @@ static void bdberl_drv_stop(ErlDrvData handle) static void bdberl_drv_finish() { + DBG("BDB DRIVER FINISHING\r\n"); // Stop the thread pools if (NULL != G_TPOOL_GENERAL) { @@ -584,7 +636,7 @@ static void bdberl_drv_finish() G_LOG_RWLOCK = NULL; } - DBG("DRIVER_FINISH\n"); + DBG("BDB DRIVER FINISHED\r\n"); } static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, @@ -790,8 +842,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = d->cursor->close(d->cursor); if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK)) { - d->txn->abort(d->txn); - d->txn = 0; + abort_txn(d); } // Regardless of what happens, clear out the cursor pointer @@ -1189,8 +1240,10 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port } // Create the DB handle - DB* db; + DB* db = NULL; + DBGCMD(data, "db_create(&db, %p, 0);", G_DB_ENV); int rc = db_create(&db, G_DB_ENV, 0); + DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : rc_to_atom_str(rc), rc, db); if (rc != 0) { // Failure while creating the database handle -- drop our lock and return @@ -1209,7 +1262,9 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port } // Attempt to open our database + DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);", db, name, type, flags); rc = db->open(db, 0, name, 0, type, flags, 0); + DBGCMDRC(data, rc); if (rc != 0) { // Failure while opening the database -- cleanup the handle, drop the lock @@ -1258,10 +1313,11 @@ static int close_database(int dbref, unsigned flags, PortData* data) Database* database = &G_DATABASES[dbref]; if (database->ports == 0) { - DBG("Closing actual database for dbref %d\r\n", dbref); // Close out the BDB handle - database->db->close(database->db, flags); - + DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref); + int rc = database->db->close(database->db, flags); + DBGCMDRC(data, rc); + // Remove the entry from the names map hive_hash_remove(G_DATABASES_NAMES, database->name); free((char*)database->name); @@ -1279,6 +1335,18 @@ static int close_database(int dbref, unsigned flags, PortData* data) } } +// Abort the transaction and clean up +static void abort_txn(PortData* d) +{ + if (d->txn) + { + DBGCMD(d, "d->txn->abort(%p)", d->txn); + int rc = d->txn->abort(d->txn); + DBGCMDRC(d, rc); + d->txn = NULL; + } +} + static int delete_database(const char* name) { // Go directly to a write lock on the global databases structure @@ -2109,25 +2177,28 @@ static void do_async_put(void* arg) // Check CRC in value payload - first 4 bytes are CRC of rest of bytes assert(value.size >= 4); uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); - uint32_t file_crc32 = *(uint32_t*) value.data; + uint32_t buf_crc32 = *(uint32_t*) value.data; int rc; - if (calc_crc32 != file_crc32) + if (calc_crc32 != buf_crc32) { + DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.", buf_crc32, calc_crc32); rc = ERROR_INVALID_VALUE; } else { // Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn // is NULL, the put will still be atomic + DBGCMD(d, "db->put(%p, %p, %p, &p, %08X) key=%p(%d) value=%p(%d)", + db, d->txn, &key, &value, flags, key.data, key.size, value.data, value.size); rc = db->put(db, d->txn, &key, &value, flags); + DBGCMDRC(d, rc); } // If any error occurs while we have a txn action, abort it if (d->txn && rc) { - d->txn->abort(d->txn); - d->txn = 0; + abort_txn(d); } else if (d->txn && d->async_op == CMD_PUT_COMMIT) { @@ -2189,10 +2260,12 @@ static void do_async_get(void* arg) { assert(value.size >= 4); uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); - uint32_t file_crc32 = *(uint32_t*) value.data; + uint32_t buf_crc32 = *(uint32_t*) value.data; - if (calc_crc32 != file_crc32) + if (calc_crc32 != buf_crc32) { + DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.", + buf_crc32, calc_crc32); rc = ERROR_INVALID_VALUE; } } @@ -2223,19 +2296,23 @@ static void do_async_txnop(void* arg) int rc = 0; if (d->async_op == CMD_TXN_BEGIN) { + DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)", G_DB_ENV, d->txn, d->async_flags); rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags); + DBGCMD(d, "rc = %s (%d) d->txn = %p", rc == 0 ? "ok" : rc_to_atom_str(rc), rc, d->txn); + } else if (d->async_op == CMD_TXN_COMMIT) { assert(NULL != d->txn); + DBGCMD(d, "d->txn->txn_commit(%p, %08X)", d->txn, d->async_flags); rc = d->txn->commit(d->txn, d->async_flags); + DBGCMDRC(d, rc); d->txn = 0; } else { - assert(NULL != d->txn); - rc = d->txn->abort(d->txn); - d->txn = 0; + assert(d->async_op == CMD_TXN_ABORT); + abort_txn(d); } async_cleanup_and_send_rc(d, rc); @@ -2246,7 +2323,8 @@ static void do_async_cursor_get(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> PortData* d = (PortData*)arg; - + assert(NULL != d->cursor); + // Setup DBTs DBT key; DBT value; @@ -2266,7 +2344,9 @@ static void do_async_cursor_get(void* arg) } // Execute the operation + DBGCMD(d, "d->cursor->get(%p, &key, &value, flags);"); int rc = d->cursor->get(d->cursor, &key, &value, flags); + DBGCMDRC(d, rc); // Check CRC - first 4 bytes are CRC of rest of bytes if (0 == rc) @@ -2289,11 +2369,7 @@ static void do_async_cursor_get(void* arg) d->cursor->close(d->cursor); d->cursor = 0; - if (d->txn) - { - d->txn->abort(d->txn); - d->txn = 0; - } + abort_txn(d); } async_cleanup_and_send_kv(d, rc, &key, &value); @@ -2319,11 +2395,12 @@ static void do_async_truncate(void* arg) if (database != NULL && database->db != 0) { DB* db = database->db; - - DBG("Truncating dbref %i\r\n", i); - u_int32_t count = 0; + + DBGCMD(d, "db->truncate(%p, %p, %p) dbref=%d", db, d->txn, &count, 0, i); rc = db->truncate(db, d->txn, &count, 0); + DBGCMD(d, "rc = %s (%d) count=%d", + rc == 0 ? "ok" : rc_to_atom_str(rc), rc, count); if (rc != 0) { @@ -2335,18 +2412,16 @@ static void do_async_truncate(void* arg) else { DB* db = G_DATABASES[d->async_dbref].db; - - DBG("Truncating dbref %i\r\n", dbref); - u_int32_t count = 0; + DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref); rc = db->truncate(db, d->txn, &count, 0); + DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : rc_to_atom_str(rc), rc, count); } // If any error occurs while we have a txn action, abort it if (d->txn && rc) { - d->txn->abort(d->txn); - d->txn = 0; + abort_txn(d); } async_cleanup_and_send_rc(d, rc); @@ -2616,7 +2691,7 @@ static int add_portref(int dbref, ErlDrvPort port) current = current->next; } while (current != 0); - // At the end of the list -- allocate a new entry for this por + // At the end of the list -- allocate a new entry for this port current = (PortList*)zalloc(sizeof(PortList)); current->port = port; last->next = current; @@ -2636,6 +2711,7 @@ static int del_portref(int dbref, ErlDrvPort port) { PortList* current = G_DATABASES[dbref].ports; PortList* last = 0; + assert(NULL != current); while (current) { if (current->port == port) @@ -2706,6 +2782,8 @@ static int del_dbref(PortData* data, int dbref) { DbRefList* current = data->dbrefs; DbRefList* last = 0; + assert(NULL != current); + while (current) { if (current->dbref == dbref) @@ -2853,7 +2931,10 @@ static void* deadlock_check(void* arg) DBG("Rejected deadlocks: %d\r\n", count); } - util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); + if (G_DEADLOCK_CHECK_INTERVAL > 0) + { + util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); + } } DBG("Deadlock checker exiting.\r\n");