Heavily instrumented rwlock code - rwlock seems to be misbehaving.

About to replace with a mutex.wq
This commit is contained in:
Jon Meredith 2009-06-18 15:14:10 -06:00 committed by Phillip Toland
parent bfb1620fb8
commit 3d422bec1d

View file

@ -77,7 +77,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
static int close_database(int dbref, unsigned flags, PortData* data);
static void check_all_databases_closed();
static int delete_database(const char* name);
static int delete_database(const char* name, PortData *data);
static void get_info(int target, void* values, BinHelper* bh);
@ -136,8 +136,11 @@ static int G_DB_ENV_ERROR = 0;
* G_DATABASES_RWLOCK.
*/
static Database* G_DATABASES = 0;
static Database* G_DATABASES_SHADOW = 0;
static unsigned int G_DATABASES_SIZE = 0;
static ErlDrvRWLock* G_DATABASES_RWLOCK = 0;
static volatile ErlDrvTid G_DATABASES_RWLOCK_TID = 0;
static volatile ErlDrvPort G_DATABASES_RWLOCK_PORT = 0;
static hive_hash* G_DATABASES_NAMES = 0;
@ -181,6 +184,8 @@ static int G_BDBERL_PIPE[2] = {-1, -1};
* no pid/port is available, no callback is registered with BDB.
*/
static ErlDrvRWLock* G_LOG_RWLOCK = 0;
//static ErlDrvTid G_LOG_RWLOCK_TID = 0;
//static ErlDrvPort G_LOG_RWLOCK_PORT = 0;
static ErlDrvTermData G_LOG_PID;
static ErlDrvPort G_LOG_PORT;
@ -201,12 +206,6 @@ static TPool* G_TPOOL_TXNS = NULL;
/**
* Helpful macros
*/
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L)
#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L)
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
#ifdef DEBUG
# define DBG(...) fprintf(stderr, __VA_ARGS__)
static void DBGCMD(PortData *d, const char *fmt, ...);
@ -218,6 +217,70 @@ static void DBGCMDRC(PortData *d, int rc);
#endif
void READ_LOCK_DATABASES(void *x, ErlDrvPort P)
{
ErlDrvTid self = erl_drv_thread_self();
DBG("threadid %p port %p: read locking G_DATABASES\r\n", self, P);
erl_drv_rwlock_rlock(G_DATABASES_RWLOCK);
assert(0 == G_DATABASES_RWLOCK_TID);
assert(0 == G_DATABASES_RWLOCK_PORT);
G_DATABASES_RWLOCK_TID = self;
G_DATABASES_RWLOCK_PORT = P;
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
DBG("threadid %p port %p: read locked G_DATABASES\r\n", self, P);
}
void READ_UNLOCK_DATABASES(void *x, ErlDrvPort P)
{
ErlDrvTid self = erl_drv_thread_self();
DBG("threadid %p port %p: read unlocking G_DATABASES\r\n", self, P);
assert(erl_drv_thread_self() == G_DATABASES_RWLOCK_TID);
assert(P == G_DATABASES_RWLOCK_PORT);
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
G_DATABASES_RWLOCK_TID = 0;
G_DATABASES_RWLOCK_PORT = 0;
erl_drv_rwlock_runlock(G_DATABASES_RWLOCK);
DBG("threadid %p port %p: read unlocked G_DATABASES\r\n", self, P);
}
#define PROMOTE_READ_LOCK_DATABASES(L, P) \
READ_UNLOCK_DATABASES(L, P); \
WRITE_LOCK_DATABASES(L, P);
void WRITE_LOCK_DATABASES(void *x, ErlDrvPort P)
{
ErlDrvTid self = erl_drv_thread_self();
DBG("threadid %p port %p: write locking G_DATABASES\r\n", self, P);
erl_drv_rwlock_rwlock(G_DATABASES_RWLOCK);
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
assert(0 == G_DATABASES_RWLOCK_TID);
assert(0 == G_DATABASES_RWLOCK_PORT);
G_DATABASES_RWLOCK_TID = self;
G_DATABASES_RWLOCK_PORT = P;
DBG("threadid %p port %p: write locked G_DATABASES\r\n", self, P);
}
void WRITE_UNLOCK_DATABASES(void *x, ErlDrvPort P)
{
ErlDrvTid self = erl_drv_thread_self();
DBG("threadid %p port %p: write unlocking G_DATABASES\r\n", self, P);
assert(erl_drv_thread_self() == G_DATABASES_RWLOCK_TID);
assert(P == G_DATABASES_RWLOCK_PORT);
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(G_DATABASES[0])*G_DATABASES_SIZE);
G_DATABASES_RWLOCK_TID = 0;
G_DATABASES_RWLOCK_PORT = 0;
erl_drv_rwlock_rwunlock(G_DATABASES_RWLOCK);
DBG("threadid %p port %p: write unlocked G_DATABASES\r\n", self, P);
}
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L)
#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L)
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
DRIVER_INIT(bdberl_drv)
{
DBG("DRIVER INIT\r\n");
@ -307,7 +370,9 @@ DRIVER_INIT(bdberl_drv)
// BDB is setup -- allocate structures for tracking databases
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
G_DATABASES_SHADOW = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(Database) * G_DATABASES_SIZE);
G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK");
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
@ -383,7 +448,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
{
PortData* d = (PortData*)handle;
DBG("Stopping port %p - cancelling async jobs\r\n", d->port);
DBG("Stopping port %p\r\n", d->port);
// Grab the port lock, in case we have an async job running
erl_drv_mutex_lock(d->port_lock);
@ -392,12 +457,13 @@ static void bdberl_drv_stop(ErlDrvData handle)
// block until the job has either been removed or has run
if (d->async_job)
{
DBGCMD(d, "Stopping port %p - cancelling async job %p\r\n", d->port, d->async_job);
// Drop the lock prior to starting the wait for the async process
erl_drv_mutex_unlock(d->port_lock);
DBG("Cancelling async job for port: %p\r\n", d->port);
bdberl_tpool_cancel(d->async_pool, d->async_job);
DBG("Canceled async job for port: %p\r\n", d->port);
DBGCMD(d, "Canceled async job for port: %p\r\n", d->port);
}
else
{
@ -524,6 +590,11 @@ static void bdberl_drv_finish()
driver_free(G_DATABASES);
G_DATABASES = NULL;
}
if (NULL != G_DATABASES_SHADOW)
{
driver_free(G_DATABASES_SHADOW);
G_DATABASES = NULL;
}
if (NULL != G_DATABASES_RWLOCK)
{
erl_drv_rwlock_destroy(G_DATABASES_RWLOCK);
@ -662,6 +733,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Mark the port as busy and then schedule the appropriate async operation
d->async_op = cmd;
d->async_dbref = dbref;
TPoolJobFunc fn;
if (cmd == CMD_PUT || cmd == CMD_PUT_COMMIT)
{
@ -762,7 +834,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Inbuf is: << dbname/bytes, 0:8 >>
const char* dbname = UNPACK_STRING(inbuf, 0);
int rc = delete_database(dbname);
int rc = delete_database(dbname, d);
bdberl_send_rc(d->port, d->port_owner, rc);
RETURN_INT(0, outbuf);
}
@ -955,7 +1027,7 @@ DB* bdberl_lookup_dbref(int dbref)
assert(NULL != G_DATABASES);
assert(0 <= dbref);
assert(G_DATABASES_SIZE > dbref);
assert(NULL != G_DATABASES[dbref].db);
return G_DATABASES[dbref].db;
}
@ -977,7 +1049,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
{
*dbref_res = -1;
READ_LOCK(G_DATABASES_RWLOCK);
READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
// Look up the database by name in our hash table
Database* database = (Database*)hive_hash_get(G_DATABASES_NAMES, name);
@ -992,20 +1064,20 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
if (add_dbref(data, dbref))
{
// Need to update G_DATABASES -- grab the write lock
PROMOTE_READ_LOCK(G_DATABASES_RWLOCK);
PROMOTE_READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
// Add a reference to this port
add_portref(dbref, data->port);
// Release RW lock and return the ref
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
*dbref_res = dbref;
return 0;
}
else
{
// Already in our list of opened databases -- unlock and return the reference
READ_UNLOCK(G_DATABASES_RWLOCK);
READ_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
*dbref_res = dbref;
return 0;
}
@ -1013,7 +1085,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
else
{
// This database hasn't been opened yet -- grab a write lock
PROMOTE_READ_LOCK(G_DATABASES_RWLOCK);
PROMOTE_READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
// While waiting on the write lock, another thread could have slipped in and
// opened the database, so do one more check to see if the database is already
@ -1025,7 +1097,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
// to our port and drop the lock ASAP
int dbref = database - G_DATABASES;
add_portref(dbref, data->port);
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
add_dbref(data, dbref);
*dbref_res = dbref;
@ -1041,7 +1113,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
if (dbref < 0)
{
// No more slots available
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
return ERROR_MAX_DBS;
}
@ -1054,7 +1126,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
{
// Failure while creating the database handle -- drop our lock and return
// the code
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
return rc;
}
@ -1076,21 +1148,23 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
// Failure while opening the database -- cleanup the handle, drop the lock
// and return
db->close(db, 0);
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
return rc;
}
// Database is open. Store all the data into the allocated ref
assert(NULL != db);
G_DATABASES[dbref].db = db;
G_DATABASES[dbref].name = strdup(name);
G_DATABASES[dbref].ports = zalloc(sizeof(PortList));
G_DATABASES[dbref].ports->port = data->port;
G_DATABASES[dbref].active_ports = 1;
// Make entry in hash table of names
hive_hash_add(G_DATABASES_NAMES, G_DATABASES[dbref].name, &(G_DATABASES[dbref]));
// Drop the write lock
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
// Add the dbref to the port list
add_dbref(data, dbref);
@ -1106,7 +1180,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
{
// Something was actually deleted from our list -- now we need to disassociate the
// calling port with the global database structure.
WRITE_LOCK(G_DATABASES_RWLOCK);
WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
assert(G_DATABASES[dbref].db != 0);
assert(G_DATABASES[dbref].ports != 0);
@ -1133,7 +1207,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
memset(database, '\0', sizeof(Database));
}
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
return rc;
}
else
@ -1148,7 +1222,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
// but it is very important for BDB to shutdown cleanly so a final check can't hurt.
static void check_all_databases_closed()
{
WRITE_LOCK(G_DATABASES_RWLOCK);
WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, (ErlDrvPort)-1); // use unlikely port number - have no port here.
int dbref;
int rc;
@ -1170,7 +1244,7 @@ static void check_all_databases_closed()
}
}
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, (ErlDrvPort)-1);
}
@ -1186,22 +1260,22 @@ static void abort_txn(PortData* d)
}
}
static int delete_database(const char* name)
static int delete_database(const char* name, PortData *data)
{
// Go directly to a write lock on the global databases structure
WRITE_LOCK(G_DATABASES_RWLOCK);
WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
// Make sure the database is not opened by anyone
if (hive_hash_get(G_DATABASES_NAMES, name))
{
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
return ERROR_DB_ACTIVE;
}
// Good, database doesn't seem to be open -- attempt the delete
DBG("Attempting to delete database: %s\r\n", name);
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
WRITE_UNLOCK(G_DATABASES_RWLOCK);
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
return rc;
}
@ -1509,8 +1583,8 @@ static void do_async_put(void* arg)
{
// Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn
// is NULL, the put will still be atomic
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) key=%p(%d) value=%p(%d)",
db, d->txn, &key, &value, flags, key.data, key.size, value.data, value.size);
DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)",
db, d->txn, &key, &value, flags, dbref, key.data, key.size, value.data, value.size);
rc = db->put(db, d->txn, &key, &value, flags);
DBGCMDRC(d, rc);
}
@ -2056,6 +2130,8 @@ static int alloc_dbref()
{
if (G_DATABASES[i].db == 0)
{
assert(NULL == G_DATABASES[i].db);
assert(NULL == G_DATABASES[i].ports);
return i;
}
}