Whitespace and formatting only.
This commit is contained in:
parent
12e3012a05
commit
b649dc7d3b
|
@ -200,7 +200,7 @@ static unsigned int G_CHECKPOINT_ACTIVE = 1;
|
|||
static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between checkpoints */
|
||||
|
||||
/**
|
||||
* Pipe to used to wake up the various monitors. Instead of just sleeping
|
||||
* Pipe is used to wake up the various monitors. Instead of just sleeping
|
||||
* they wait for an exceptional condition on the read fd of the pipe. When it is time to
|
||||
* shutdown, the driver closes the write fd and waits for the threads to be joined.
|
||||
*/
|
||||
|
@ -249,17 +249,17 @@ static void bdberl_dbgcmdrc(PortData *d, int rc);
|
|||
#define LOCK_DATABASES(P) \
|
||||
do \
|
||||
{ \
|
||||
DBG("threadid %p port %p: locking G_DATABASES", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: locking G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
erl_drv_mutex_lock(G_DATABASES_MUTEX); \
|
||||
DBG("threadid %p port %p: locked G_DATABASES", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: locked G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
} while(0)
|
||||
|
||||
#define UNLOCK_DATABASES(P) \
|
||||
do \
|
||||
{ \
|
||||
DBG("threadid %p port %p: unlocking G_DATABASES", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: unlocking G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
erl_drv_mutex_unlock(G_DATABASES_MUTEX); \
|
||||
DBG("threadid %p port %p: unlocked G_DATABASES", erl_drv_thread_self(), P); \
|
||||
DBG("threadid %p port %p: unlocked G_DATABASES\n", erl_drv_thread_self(), P); \
|
||||
} while (0)
|
||||
|
||||
|
||||
|
@ -295,7 +295,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
// specify where the working directory is
|
||||
DBG("db_env_create(%p, 0)", &G_DB_ENV);
|
||||
G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0);
|
||||
DBG(" = %d", G_DB_ENV_ERROR);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
if (G_DB_ENV_ERROR != 0)
|
||||
{
|
||||
G_DB_ENV = 0;
|
||||
|
@ -307,7 +307,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
|
||||
DBG("G_DB_ENV->open(%p, 0, %08X, 0)", &G_DB_ENV, flags);
|
||||
G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0);
|
||||
DBG(" = %d", G_DB_ENV_ERROR);
|
||||
DBG(" = %d\n", G_DB_ENV_ERROR);
|
||||
if (G_DB_ENV_ERROR != 0)
|
||||
{
|
||||
// Something bad happened while initializing BDB; in this situation we
|
||||
|
@ -391,7 +391,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
}
|
||||
else
|
||||
{
|
||||
DBG("DRIVER INIT FAILED - %s", db_strerror(G_DB_ENV_ERROR));
|
||||
DBG("DRIVER INIT FAILED - %s\n", db_strerror(G_DB_ENV_ERROR));
|
||||
}
|
||||
|
||||
return &bdberl_drv_entry;
|
||||
|
@ -399,8 +399,7 @@ DRIVER_INIT(bdberl_drv)
|
|||
|
||||
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||
{
|
||||
DBG("threadid %p port %p: BDB DRIVER STARTING",
|
||||
erl_drv_thread_self(), port);
|
||||
DBG("threadid %p port %p: BDB DRIVER STARTING\n", erl_drv_thread_self(), port);
|
||||
|
||||
// Make sure we have a functional environment -- if we don't,
|
||||
// bail...
|
||||
|
@ -428,7 +427,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
|||
// Make sure port is running in binary mode
|
||||
set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY);
|
||||
|
||||
DBGCMD(d, "BDB DRIVER STARTED");
|
||||
DBGCMD(d, "BDB DRIVER STARTED\n");
|
||||
|
||||
return (ErlDrvData)d;
|
||||
}
|
||||
|
@ -437,7 +436,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
{
|
||||
PortData* d = (PortData*)handle;
|
||||
|
||||
DBG("Stopping port %p", d->port);
|
||||
DBG("Stopping port %p\n", d->port);
|
||||
|
||||
// Grab the port lock, in case we have an async job running
|
||||
erl_drv_mutex_lock(d->port_lock);
|
||||
|
@ -446,13 +445,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
// block until the job has either been removed or has run
|
||||
if (d->async_job)
|
||||
{
|
||||
DBGCMD(d, "Stopping port %p - cancelling async job %p", d->port, d->async_job);
|
||||
DBGCMD(d, "Stopping port %p - cancelling async job %p\n", d->port, d->async_job);
|
||||
|
||||
// Drop the lock prior to starting the wait for the async process
|
||||
erl_drv_mutex_unlock(d->port_lock);
|
||||
|
||||
bdberl_tpool_cancel(d->async_pool, d->async_job);
|
||||
DBGCMD(d, "Canceled async job for port: %p", d->port);
|
||||
DBGCMD(d, "Canceled async job for port: %p\n", d->port);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -464,7 +463,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
erl_drv_mutex_destroy(d->port_lock);
|
||||
|
||||
// If a cursor is open, close it
|
||||
DBG("Stopping port %p - cleaning up cursors (%p) and transactions (%p)", d->port,
|
||||
DBG("Stopping port %p - cleaning up cursors (%p) and transactions (%p)\n", d->port,
|
||||
d->cursor, d->txn);
|
||||
|
||||
if (d->cursor)
|
||||
|
@ -476,13 +475,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
abort_txn(d);
|
||||
|
||||
// Close all the databases we previously opened
|
||||
DBG("Stopping port %p - closing all dbrefs", d->port);
|
||||
DBG("Stopping port %p - closing all dbrefs\n", d->port);
|
||||
while (d->dbrefs)
|
||||
{
|
||||
int dbref = d->dbrefs->dbref;
|
||||
if (close_database(dbref, 0, d) != ERROR_NONE)
|
||||
{
|
||||
DBG("Stopping port %p could not close dbref %d", d->port, dbref);
|
||||
DBG("Stopping port %p could not close dbref %d\n", d->port, dbref);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -491,7 +490,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
// unregister if it's already initialized to this port.
|
||||
if (G_LOG_PORT == d->port)
|
||||
{
|
||||
DBG("Stopping port %p - removing logging port", d->port);
|
||||
DBG("Stopping port %p - removing logging port\n", d->port);
|
||||
|
||||
WRITE_LOCK(G_LOG_RWLOCK);
|
||||
|
||||
|
@ -507,7 +506,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
|||
WRITE_UNLOCK(G_LOG_RWLOCK);
|
||||
}
|
||||
|
||||
DBG("Stopped port: %p", d->port);
|
||||
DBG("Stopped port: %p\n", d->port);
|
||||
|
||||
// Release the port instance data
|
||||
driver_free(d->work_buffer);
|
||||
|
@ -1035,7 +1034,7 @@ static int check_non_neg_env(char *env, unsigned int *val_ptr)
|
|||
return 0;
|
||||
}
|
||||
unsigned int uival = (unsigned int) val;
|
||||
DBG("Using \"%s\" value %u", env, uival);
|
||||
DBG("Using \"%s\" value %u\n", env, uival);
|
||||
*val_ptr = uival;
|
||||
return 1;
|
||||
}
|
||||
|
@ -1152,9 +1151,9 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
|
||||
// Create the DB handle
|
||||
DB* db = NULL;
|
||||
DBGCMD(data, "db_create(&db, %p, 0);", G_DB_ENV);
|
||||
DBGCMD(data, "db_create(&db, %p, 0);\n", G_DB_ENV);
|
||||
int rc = db_create(&db, G_DB_ENV, 0);
|
||||
DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
|
||||
DBGCMD(data, " = %s (%d) db = %p\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
|
||||
if (rc != 0)
|
||||
{
|
||||
// Failure while creating the database handle -- drop our lock and return
|
||||
|
@ -1168,14 +1167,14 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
|||
{
|
||||
if (db->set_pagesize(db, G_PAGE_SIZE) != 0)
|
||||
{
|
||||
bdb_errcall(G_DB_ENV, "", "Failed to set page size.");
|
||||
bdb_errcall(G_DB_ENV, "\n", "Failed to set page size.");
|
||||
}
|
||||
}
|
||||
|
||||
flags |= DB_AUTO_COMMIT;
|
||||
|
||||
// Attempt to open our database
|
||||
DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);", db, name, type, flags);
|
||||
DBGCMD(data, "db->open(%p, 0, '%s', 0, %x, %08x, 0);\n", db, name, type, flags);
|
||||
rc = db->open(db, 0, name, 0, type, flags, 0);
|
||||
DBGCMDRC(data, rc);
|
||||
if (rc != 0)
|
||||
|
@ -1229,7 +1228,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
|||
if (database->ports == 0)
|
||||
{
|
||||
// Close out the BDB handle
|
||||
DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref);
|
||||
DBGCMD(data, "database->db->close(%p, %08x) (for dbref %d)\n", database->db, flags, dbref);
|
||||
rc = database->db->close(database->db, flags);
|
||||
DBGCMDRC(data, rc);
|
||||
|
||||
|
@ -1274,7 +1273,7 @@ static void check_all_databases_closed()
|
|||
int flags = 0;
|
||||
DBG("final db->close(%p, %08x) (for dbref %d)", database->db, flags, dbref);
|
||||
rc = database->db->close(database->db, flags);
|
||||
DBG(" = %s (%d)", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
|
||||
DBG(" = %s (%d)\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1287,7 +1286,7 @@ static void abort_txn(PortData* d)
|
|||
{
|
||||
if (d->txn)
|
||||
{
|
||||
DBGCMD(d, "d->txn->abort(%p)", d->txn);
|
||||
DBGCMD(d, "d->txn->abort(%p)\n", d->txn);
|
||||
int rc = d->txn->abort(d->txn);
|
||||
DBGCMDRC(d, rc);
|
||||
d->txn = NULL;
|
||||
|
@ -1307,7 +1306,7 @@ static int delete_database(const char* name, PortData *data)
|
|||
}
|
||||
|
||||
// Good, database doesn't seem to be open -- attempt the delete
|
||||
DBG("Attempting to delete database: %s", name);
|
||||
DBG("Attempting to delete database: %s\n", name);
|
||||
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
|
||||
UNLOCK_DATABASES(data->port);
|
||||
|
||||
|
@ -1625,14 +1624,14 @@ static void do_async_put(void* arg)
|
|||
int rc;
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.", buf_crc32, calc_crc32);
|
||||
DBGCMD(d, "CRC-32 error on put data - buffer %08X calculated %08X.\n", buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn
|
||||
// is NULL, the put will still be atomic
|
||||
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)",
|
||||
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)\n",
|
||||
db, d->txn, &key, &value, flags, dbref, key.data, key.size, value.data, value.size);
|
||||
rc = db->put(db, d->txn, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
@ -1692,7 +1691,7 @@ static void do_async_get(void* arg)
|
|||
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.\n",
|
||||
buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
|
@ -1751,15 +1750,15 @@ static void do_async_txnop(void* arg)
|
|||
int rc = 0;
|
||||
if (d->async_op == CMD_TXN_BEGIN)
|
||||
{
|
||||
DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)", G_DB_ENV, d->txn, d->async_flags);
|
||||
DBGCMD(d, "G_DB_ENV->txn_begin(%p, 0, %p, %08X)\n", G_DB_ENV, d->txn, d->async_flags);
|
||||
rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags);
|
||||
DBGCMD(d, "rc = %s (%d) d->txn = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, d->txn);
|
||||
DBGCMD(d, "rc = %s (%d) d->txn = %p\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, d->txn);
|
||||
|
||||
}
|
||||
else if (d->async_op == CMD_TXN_COMMIT)
|
||||
{
|
||||
assert(d->txn != NULL);
|
||||
DBGCMD(d, "d->txn->txn_commit(%p, %08X)", d->txn, d->async_flags);
|
||||
DBGCMD(d, "d->txn->txn_commit(%p, %08X)\n", d->txn, d->async_flags);
|
||||
rc = d->txn->commit(d->txn, d->async_flags);
|
||||
DBGCMDRC(d, rc);
|
||||
d->txn = 0;
|
||||
|
@ -1776,10 +1775,10 @@ static void do_async_txnop(void* arg)
|
|||
|
||||
static void do_async_cursor_put(void* arg)
|
||||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_put/2 not yet implemented..."); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_put/2 not yet implemented...\n"); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1806,7 +1805,7 @@ static void do_async_cursor_get(void* arg)
|
|||
value.flags = DB_DBT_MALLOC;
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags);
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X\n);", d->cursor, &key, &value, flags);
|
||||
int rc = d->cursor->get(d->cursor, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
|
@ -1819,7 +1818,7 @@ static void do_async_cursor_get(void* arg)
|
|||
|
||||
if (calc_crc32 != buf_crc32)
|
||||
{
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.\n",
|
||||
buf_crc32, calc_crc32);
|
||||
rc = ERROR_INVALID_VALUE;
|
||||
}
|
||||
|
@ -1846,7 +1845,7 @@ static void do_async_cursor_del(void* arg)
|
|||
{
|
||||
PortData* d = (PortData*)arg;
|
||||
assert(d->cursor != NULL);
|
||||
DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */
|
||||
DBGCMD(d, "cursor_del/2 not yet implemented...\n"); /* TODO: implement this. */
|
||||
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||
}
|
||||
|
||||
|
@ -1860,7 +1859,7 @@ static void do_async_cursor_count(void* arg)
|
|||
db_recno_t count = 0;
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->count(%p, %p, %08X);", d->cursor, &count, 0);
|
||||
DBGCMD(d, "d->cursor->count(%p, %p, %08X);\n", d->cursor, &count, 0);
|
||||
int rc = d->cursor->count(d->cursor, &count, 0);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
|
@ -1893,7 +1892,7 @@ static void do_async_cursor_cnp(void* arg)
|
|||
}
|
||||
|
||||
// Execute the operation
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags);
|
||||
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);\n", d->cursor, &key, &value, flags);
|
||||
int rc = d->cursor->get(d->cursor, &key, &value, flags);
|
||||
DBGCMDRC(d, rc);
|
||||
|
||||
|
@ -1946,10 +1945,9 @@ static void do_async_truncate(void* arg)
|
|||
DB* db = database->db;
|
||||
u_int32_t count = 0;
|
||||
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, i);
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d\n", db, d->txn, &count, i);
|
||||
rc = db->truncate(db, d->txn, &count, 0);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d",
|
||||
rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
|
||||
|
||||
if (rc != 0)
|
||||
{
|
||||
|
@ -1962,9 +1960,9 @@ static void do_async_truncate(void* arg)
|
|||
{
|
||||
DB* db = G_DATABASES[d->async_dbref].db;
|
||||
u_int32_t count = 0;
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref);
|
||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d\n", db, d->txn, &count, d->async_dbref);
|
||||
rc = db->truncate(db, d->txn, &count, 0);
|
||||
DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
|
||||
DBGCMD(d, "rc = %s (%d) count=%d\n", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
|
||||
rc, count);
|
||||
}
|
||||
|
||||
|
@ -2107,7 +2105,7 @@ static void* driver_calloc(unsigned int size)
|
|||
|
||||
static int add_portref(int dbref, ErlDrvPort port)
|
||||
{
|
||||
DBG("Adding port %p to dbref %d", port, dbref);
|
||||
DBG("Adding port %p to dbref %d\n", port, dbref);
|
||||
PortList* current = G_DATABASES[dbref].ports;
|
||||
if (current)
|
||||
{
|
||||
|
@ -2142,7 +2140,7 @@ static int add_portref(int dbref, ErlDrvPort port)
|
|||
|
||||
static int del_portref(int dbref, ErlDrvPort port)
|
||||
{
|
||||
DBG("Deleting port %p from dbref %d", port, dbref);
|
||||
DBG("Deleting port %p from dbref %d\n", port, dbref);
|
||||
PortList* current = G_DATABASES[dbref].ports;
|
||||
PortList* last = 0;
|
||||
assert(current != NULL);
|
||||
|
@ -2178,7 +2176,7 @@ static int del_portref(int dbref, ErlDrvPort port)
|
|||
*/
|
||||
static int add_dbref(PortData* data, int dbref)
|
||||
{
|
||||
DBG("Adding dbref %d to port %p", dbref, data->port);
|
||||
DBG("Adding dbref %d to port %p\n", dbref, data->port);
|
||||
DbRefList* current = data->dbrefs;
|
||||
if (current)
|
||||
{
|
||||
|
@ -2215,7 +2213,7 @@ static int add_dbref(PortData* data, int dbref)
|
|||
*/
|
||||
static int del_dbref(PortData* data, int dbref)
|
||||
{
|
||||
DBG("Deleting dbref %d from port %p", dbref, data->port);
|
||||
DBG("Deleting dbref %d from port %p\n", dbref, data->port);
|
||||
|
||||
DbRefList* current = data->dbrefs;
|
||||
DbRefList* last = 0;
|
||||
|
@ -2367,7 +2365,7 @@ static void* deadlock_check(void* arg)
|
|||
}
|
||||
if (count > 0)
|
||||
{
|
||||
DBG("Rejected deadlocks: %d", count);
|
||||
DBG("Rejected deadlocks: %d\n", count);
|
||||
}
|
||||
|
||||
if (G_DEADLOCK_CHECK_INTERVAL > 0)
|
||||
|
@ -2445,6 +2443,7 @@ static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg
|
|||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB error call: %s\n", msg);
|
||||
}
|
||||
|
||||
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
|
||||
|
@ -2453,6 +2452,7 @@ static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
|
|||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB message call: %s\n", msg);
|
||||
}
|
||||
|
||||
static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* info)
|
||||
|
@ -2467,6 +2467,7 @@ static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* info)
|
|||
ERL_DRV_TUPLE, 2};
|
||||
// TODO clearly something should be done to shut things down cleanly and restart (how?)
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB panic event\n");
|
||||
break;
|
||||
}
|
||||
case DB_EVENT_WRITE_FAILED:
|
||||
|
@ -2476,6 +2477,7 @@ static void bdb_eventcall(DB_ENV* dbenv, u_int32_t type, void* info)
|
|||
ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
|
||||
ERL_DRV_TUPLE, 2};
|
||||
send_log_message(response, sizeof(response));
|
||||
DBG("BDB write failed event\n");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/* -------------------------------------------------------------------
|
||||
*
|
||||
* bdberl: Thread Pool
|
||||
* bdberl: Thread Pool
|
||||
* Copyright (c) 2008-9 The Hive http://www.thehive.com/
|
||||
* Authors: Dave "dizzyd" Smith <dizzyd@dizzyd.com>
|
||||
* Phil Toland <phil.toland@gmail.com>
|
||||
|
@ -78,7 +78,7 @@ void bdberl_tpool_stop(TPool* tpool)
|
|||
{
|
||||
erl_drv_cond_wait(tpool->work_cv, tpool->lock);
|
||||
}
|
||||
|
||||
|
||||
// Join up with all the workers
|
||||
int i = 0;
|
||||
for (i = 0; i < tpool->thread_count; i++)
|
||||
|
@ -86,7 +86,7 @@ void bdberl_tpool_stop(TPool* tpool)
|
|||
erl_drv_thread_join(tpool->threads[i], 0);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
// Cleanup
|
||||
erl_drv_cond_destroy(tpool->work_cv);
|
||||
erl_drv_cond_destroy(tpool->cancel_cv);
|
||||
driver_free(tpool->threads);
|
||||
|
@ -95,7 +95,7 @@ void bdberl_tpool_stop(TPool* tpool)
|
|||
driver_free(tpool);
|
||||
}
|
||||
|
||||
void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn,
|
||||
void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFunc cancel_fn,
|
||||
TPoolJob** job_ptr)
|
||||
{
|
||||
// Allocate and fill a new job structure
|
||||
|
@ -107,7 +107,7 @@ void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFun
|
|||
|
||||
// Sync up with the tpool and add the job to the pending queue
|
||||
LOCK(tpool);
|
||||
|
||||
|
||||
if (tpool->pending_jobs)
|
||||
{
|
||||
// Make sure the current last job points to this one next
|
||||
|
@ -122,7 +122,7 @@ void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFun
|
|||
tpool->last_pending_job = job;
|
||||
tpool->pending_job_count++;
|
||||
|
||||
// Generate a notification that there is work todo.
|
||||
// Generate a notification that there is work todo.
|
||||
// TODO: I think this may not be necessary, in the case where there are already other
|
||||
// pending jobs. Not sure ATM, however, so will be on safe side
|
||||
erl_drv_cond_broadcast(tpool->work_cv);
|
||||
|
@ -133,7 +133,7 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
|
|||
{
|
||||
LOCK(tpool);
|
||||
|
||||
// Remove the job from the pending queue
|
||||
// Remove the job from the pending queue
|
||||
if (remove_pending_job(tpool, job))
|
||||
{
|
||||
// Job was removed from pending -- unlock and notify the job that it got canceled
|
||||
|
@ -149,7 +149,7 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
|
|||
return;
|
||||
}
|
||||
|
||||
// Job not in the pending queue -- check the active queue.
|
||||
// Job not in the pending queue -- check the active queue.
|
||||
if (is_active_job(tpool, job))
|
||||
{
|
||||
// Job is currently active -- mark it as cancelled (so we get notified) and wait for it
|
||||
|
@ -159,7 +159,7 @@ void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job)
|
|||
erl_drv_cond_wait(tpool->cancel_cv, tpool->lock);
|
||||
}
|
||||
|
||||
// Job is no longer running and should now be considered dead. Cleanup is handled by
|
||||
// Job is no longer running and should now be considered dead. Cleanup is handled by
|
||||
// the worker.
|
||||
UNLOCK(tpool);
|
||||
return;
|
||||
|
@ -212,7 +212,7 @@ static void* bdberl_tpool_main(void* arg)
|
|||
{
|
||||
erl_drv_cond_broadcast(tpool->cancel_cv);
|
||||
}
|
||||
|
||||
|
||||
// Cleanup the job (remove from active list, free, etc.)
|
||||
cleanup_job(tpool, job);
|
||||
}
|
||||
|
@ -318,7 +318,7 @@ static void cleanup_job(TPool* tpool, TPoolJob* job)
|
|||
{
|
||||
tpool->active_jobs = current->next;
|
||||
}
|
||||
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -348,7 +348,7 @@ static int is_active_job(TPool* tpool, TPoolJob* job)
|
|||
}
|
||||
|
||||
// Return the number of pending and active jobs
|
||||
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
||||
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
||||
unsigned int *active_count_ptr)
|
||||
{
|
||||
LOCK(tpool);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/* -------------------------------------------------------------------
|
||||
*
|
||||
* bdberl: Thread Pool
|
||||
* bdberl: Thread Pool
|
||||
* Copyright (c) 2008-9 The Hive http://www.thehive.com/
|
||||
* Authors: Dave "dizzyd" Smith <dizzyd@dizzyd.com>
|
||||
* Phil Toland <phil.toland@gmail.com>
|
||||
|
@ -74,7 +74,7 @@ typedef struct
|
|||
unsigned int active_threads;
|
||||
|
||||
unsigned int shutdown;
|
||||
|
||||
|
||||
} TPool;
|
||||
|
||||
TPool* bdberl_tpool_start(unsigned int thread_count);
|
||||
|
@ -86,7 +86,13 @@ void bdberl_tpool_run(TPool* tpool, TPoolJobFunc main_fn, void* arg, TPoolJobFun
|
|||
|
||||
void bdberl_tpool_cancel(TPool* tpool, TPoolJob* job);
|
||||
|
||||
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
||||
void bdberl_tpool_job_count(TPool* tpool, unsigned int *pending_count_ptr,
|
||||
unsigned int *active_count_ptr);
|
||||
|
||||
void bdberl_tpool_thread_id(DB_ENV *env, pid_t *pid, db_threadid_t *tid);
|
||||
|
||||
char *bdberl_tpool_thread_id_string(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, char *buf);
|
||||
|
||||
int bdberl_tpool_thread_is_alive(DB_ENV *dbenv, pid_t pid, db_threadid_t tid, u_int32_t flags);
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue