Relaced G_DATABASES rwlock with a mutex.
This commit is contained in:
parent
425d8ddf00
commit
783acf09ba
3
Rakefile
3
Rakefile
|
@ -9,8 +9,9 @@ INT_TEST_FLAGS << '+A10'
|
||||||
|
|
||||||
DB_LIB = "c_src/system/lib/libdb.a"
|
DB_LIB = "c_src/system/lib/libdb.a"
|
||||||
|
|
||||||
CC_FLAGS << "-Ic_src/system/include"
|
CC_FLAGS << "-Ic_src/system/include -pthread"
|
||||||
LD_LIBS << DB_LIB
|
LD_LIBS << DB_LIB
|
||||||
|
LD_FLAGS << "-pthread"
|
||||||
|
|
||||||
file DB_LIB do
|
file DB_LIB do
|
||||||
sh "cd c_src && ./buildlib.sh 2>&1"
|
sh "cd c_src && ./buildlib.sh 2>&1"
|
||||||
|
|
|
@ -133,14 +133,14 @@ static int G_DB_ENV_ERROR = 0;
|
||||||
* filenames to array index for an opened Database.
|
* filenames to array index for an opened Database.
|
||||||
*
|
*
|
||||||
* All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock
|
* All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock
|
||||||
* G_DATABASES_RWLOCK.
|
* G_DATABASES_MUTEX.
|
||||||
*/
|
*/
|
||||||
static Database* G_DATABASES = 0;
|
static Database* G_DATABASES = 0;
|
||||||
static Database* G_DATABASES_SHADOW = 0;
|
static Database* G_DATABASES_SHADOW = 0;
|
||||||
static unsigned int G_DATABASES_SIZE = 0;
|
static unsigned int G_DATABASES_SIZE = 0;
|
||||||
static ErlDrvRWLock* G_DATABASES_RWLOCK = 0;
|
static ErlDrvMutex* G_DATABASES_MUTEX = 0;
|
||||||
static volatile ErlDrvTid G_DATABASES_RWLOCK_TID = 0; /* TID for writers */
|
static volatile ErlDrvTid G_DATABASES_MUTEX_TID = 0;
|
||||||
static volatile ErlDrvPort G_DATABASES_RWLOCK_PORT = 0; /* Port for writers */
|
static volatile ErlDrvPort G_DATABASES_MUTEX_PORT = 0;
|
||||||
static hive_hash* G_DATABASES_NAMES = 0;
|
static hive_hash* G_DATABASES_NAMES = 0;
|
||||||
|
|
||||||
|
|
||||||
|
@ -237,68 +237,35 @@ void CHECK_DATABASES(void)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void READ_LOCK_DATABASES(void *x, ErlDrvPort P)
|
void LOCK_DATABASES(ErlDrvPort P)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG
|
|
||||||
ErlDrvTid self = erl_drv_thread_self();
|
ErlDrvTid self = erl_drv_thread_self();
|
||||||
#endif
|
DBG("threadid %p port %p: locking G_DATABASES\r\n", self, P);
|
||||||
DBG("threadid %p port %p: read locking G_DATABASES\r\n", self, P);
|
erl_drv_mutex_lock(G_DATABASES_MUTEX);
|
||||||
CHECK_DATABASES();
|
CHECK_DATABASES();
|
||||||
erl_drv_rwlock_rlock(G_DATABASES_RWLOCK);
|
assert(0 == G_DATABASES_MUTEX_TID);
|
||||||
assert(0 == G_DATABASES_RWLOCK_TID);
|
assert(0 == G_DATABASES_MUTEX_PORT);
|
||||||
assert(0 == G_DATABASES_RWLOCK_PORT);
|
G_DATABASES_MUTEX_TID = self;
|
||||||
|
G_DATABASES_MUTEX_PORT = P;
|
||||||
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
|
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
|
||||||
DBG("threadid %p port %p: read locked G_DATABASES\r\n", self, P);
|
DBG("threadid %p port %p: locked G_DATABASES\r\n", self, P);
|
||||||
}
|
}
|
||||||
|
|
||||||
void READ_UNLOCK_DATABASES(void *x, ErlDrvPort P)
|
void UNLOCK_DATABASES(ErlDrvPort P)
|
||||||
{
|
|
||||||
#ifdef DEBUG
|
|
||||||
ErlDrvTid self = erl_drv_thread_self();
|
|
||||||
#endif
|
|
||||||
DBG("threadid %p port %p: read unlocking G_DATABASES\r\n", self, P);
|
|
||||||
assert(0 == G_DATABASES_RWLOCK_TID);
|
|
||||||
assert(0 == G_DATABASES_RWLOCK_PORT);
|
|
||||||
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
|
|
||||||
CHECK_DATABASES();
|
|
||||||
erl_drv_rwlock_runlock(G_DATABASES_RWLOCK);
|
|
||||||
DBG("threadid %p port %p: read unlocked G_DATABASES\r\n", self, P);
|
|
||||||
}
|
|
||||||
|
|
||||||
#define PROMOTE_READ_LOCK_DATABASES(L, P) \
|
|
||||||
READ_UNLOCK_DATABASES(L, P); \
|
|
||||||
WRITE_LOCK_DATABASES(L, P);
|
|
||||||
|
|
||||||
void WRITE_LOCK_DATABASES(void *x, ErlDrvPort P)
|
|
||||||
{
|
{
|
||||||
ErlDrvTid self = erl_drv_thread_self();
|
ErlDrvTid self = erl_drv_thread_self();
|
||||||
DBG("threadid %p port %p: write locking G_DATABASES\r\n", self, P);
|
DBG("threadid %p port %p: unlocking G_DATABASES\r\n", self, P);
|
||||||
erl_drv_rwlock_rwlock(G_DATABASES_RWLOCK);
|
assert(self == G_DATABASES_MUTEX_TID);
|
||||||
CHECK_DATABASES();
|
assert(P == G_DATABASES_MUTEX_PORT);
|
||||||
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
|
|
||||||
assert(0 == G_DATABASES_RWLOCK_TID);
|
|
||||||
assert(0 == G_DATABASES_RWLOCK_PORT);
|
|
||||||
G_DATABASES_RWLOCK_TID = self;
|
|
||||||
G_DATABASES_RWLOCK_PORT = P;
|
|
||||||
DBG("threadid %p port %p: write locked G_DATABASES\r\n", self, P);
|
|
||||||
}
|
|
||||||
|
|
||||||
void WRITE_UNLOCK_DATABASES(void *x, ErlDrvPort P)
|
|
||||||
{
|
|
||||||
#ifdef DEBUG
|
|
||||||
ErlDrvTid self = erl_drv_thread_self();
|
|
||||||
#endif
|
|
||||||
DBG("threadid %p port %p: write unlocking G_DATABASES\r\n", self, P);
|
|
||||||
assert(erl_drv_thread_self() == G_DATABASES_RWLOCK_TID);
|
|
||||||
assert(P == G_DATABASES_RWLOCK_PORT);
|
|
||||||
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(G_DATABASES[0])*G_DATABASES_SIZE);
|
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(G_DATABASES[0])*G_DATABASES_SIZE);
|
||||||
G_DATABASES_RWLOCK_TID = 0;
|
G_DATABASES_MUTEX_TID = 0;
|
||||||
G_DATABASES_RWLOCK_PORT = 0;
|
G_DATABASES_MUTEX_PORT = 0;
|
||||||
CHECK_DATABASES();
|
CHECK_DATABASES();
|
||||||
erl_drv_rwlock_rwunlock(G_DATABASES_RWLOCK);
|
erl_drv_mutex_unlock(G_DATABASES_MUTEX);
|
||||||
DBG("threadid %p port %p: write unlocked G_DATABASES\r\n", self, P);
|
DBG("threadid %p port %p: unlocked G_DATABASES\r\n", self, P);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
|
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
|
||||||
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
|
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
|
||||||
#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L)
|
#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L)
|
||||||
|
@ -398,7 +365,7 @@ DRIVER_INIT(bdberl_drv)
|
||||||
G_DATABASES_SHADOW = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
G_DATABASES_SHADOW = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
||||||
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
||||||
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(Database) * G_DATABASES_SIZE);
|
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(Database) * G_DATABASES_SIZE);
|
||||||
G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK");
|
G_DATABASES_MUTEX = erl_drv_mutex_create("bdberl_drv: G_DATABASES_MUTEX");
|
||||||
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
|
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
|
||||||
|
|
||||||
// Startup deadlock check thread
|
// Startup deadlock check thread
|
||||||
|
@ -620,10 +587,10 @@ static void bdberl_drv_finish()
|
||||||
driver_free(G_DATABASES_SHADOW);
|
driver_free(G_DATABASES_SHADOW);
|
||||||
G_DATABASES = NULL;
|
G_DATABASES = NULL;
|
||||||
}
|
}
|
||||||
if (NULL != G_DATABASES_RWLOCK)
|
if (NULL != G_DATABASES_MUTEX)
|
||||||
{
|
{
|
||||||
erl_drv_rwlock_destroy(G_DATABASES_RWLOCK);
|
erl_drv_mutex_destroy(G_DATABASES_MUTEX);
|
||||||
G_DATABASES_RWLOCK = NULL;
|
G_DATABASES_MUTEX = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL != G_DATABASES_NAMES)
|
if (NULL != G_DATABASES_NAMES)
|
||||||
|
@ -1074,7 +1041,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
{
|
{
|
||||||
*dbref_res = -1;
|
*dbref_res = -1;
|
||||||
|
|
||||||
READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
LOCK_DATABASES(data->port);
|
||||||
|
|
||||||
// Look up the database by name in our hash table
|
// Look up the database by name in our hash table
|
||||||
Database* database = (Database*)hive_hash_get(G_DATABASES_NAMES, name);
|
Database* database = (Database*)hive_hash_get(G_DATABASES_NAMES, name);
|
||||||
|
@ -1088,47 +1055,24 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
// Database structure in G_DATABASES
|
// Database structure in G_DATABASES
|
||||||
if (add_dbref(data, dbref))
|
if (add_dbref(data, dbref))
|
||||||
{
|
{
|
||||||
// Need to update G_DATABASES -- grab the write lock
|
|
||||||
PROMOTE_READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
|
||||||
|
|
||||||
// Add a reference to this port
|
// Add a reference to this port
|
||||||
add_portref(dbref, data->port);
|
add_portref(dbref, data->port);
|
||||||
|
|
||||||
// Release RW lock and return the ref
|
// Release RW lock and return the ref
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
*dbref_res = dbref;
|
*dbref_res = dbref;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Already in our list of opened databases -- unlock and return the reference
|
// Already in our list of opened databases -- unlock and return the reference
|
||||||
READ_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
*dbref_res = dbref;
|
*dbref_res = dbref;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// This database hasn't been opened yet -- grab a write lock
|
|
||||||
PROMOTE_READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
|
||||||
|
|
||||||
// While waiting on the write lock, another thread could have slipped in and
|
|
||||||
// opened the database, so do one more check to see if the database is already
|
|
||||||
// open
|
|
||||||
database = (Database*)hive_hash_get(G_DATABASES_NAMES, name);
|
|
||||||
if (database)
|
|
||||||
{
|
|
||||||
// Database got created while we were waiting on the write lock, add a reference
|
|
||||||
// to our port and drop the lock ASAP
|
|
||||||
int dbref = database - G_DATABASES;
|
|
||||||
add_portref(dbref, data->port);
|
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
|
||||||
|
|
||||||
add_dbref(data, dbref);
|
|
||||||
*dbref_res = dbref;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Database hasn't been created while we were waiting on write lock, so
|
// Database hasn't been created while we were waiting on write lock, so
|
||||||
// create/open it
|
// create/open it
|
||||||
|
|
||||||
|
@ -1138,7 +1082,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
if (dbref < 0)
|
if (dbref < 0)
|
||||||
{
|
{
|
||||||
// No more slots available
|
// No more slots available
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return ERROR_MAX_DBS;
|
return ERROR_MAX_DBS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1151,7 +1095,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
{
|
{
|
||||||
// Failure while creating the database handle -- drop our lock and return
|
// Failure while creating the database handle -- drop our lock and return
|
||||||
// the code
|
// the code
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1173,7 +1117,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
// Failure while opening the database -- cleanup the handle, drop the lock
|
// Failure while opening the database -- cleanup the handle, drop the lock
|
||||||
// and return
|
// and return
|
||||||
db->close(db, 0);
|
db->close(db, 0);
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1189,7 +1133,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
hive_hash_add(G_DATABASES_NAMES, G_DATABASES[dbref].name, &(G_DATABASES[dbref]));
|
hive_hash_add(G_DATABASES_NAMES, G_DATABASES[dbref].name, &(G_DATABASES[dbref]));
|
||||||
|
|
||||||
// Drop the write lock
|
// Drop the write lock
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
|
|
||||||
// Add the dbref to the port list
|
// Add the dbref to the port list
|
||||||
add_dbref(data, dbref);
|
add_dbref(data, dbref);
|
||||||
|
@ -1205,7 +1149,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
||||||
{
|
{
|
||||||
// Something was actually deleted from our list -- now we need to disassociate the
|
// Something was actually deleted from our list -- now we need to disassociate the
|
||||||
// calling port with the global database structure.
|
// calling port with the global database structure.
|
||||||
WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
LOCK_DATABASES(data->port);
|
||||||
|
|
||||||
assert(G_DATABASES[dbref].db != 0);
|
assert(G_DATABASES[dbref].db != 0);
|
||||||
assert(G_DATABASES[dbref].ports != 0);
|
assert(G_DATABASES[dbref].ports != 0);
|
||||||
|
@ -1232,7 +1176,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
||||||
memset(database, '\0', sizeof(Database));
|
memset(database, '\0', sizeof(Database));
|
||||||
}
|
}
|
||||||
|
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1247,7 +1191,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
|
||||||
// but it is very important for BDB to shutdown cleanly so a final check can't hurt.
|
// but it is very important for BDB to shutdown cleanly so a final check can't hurt.
|
||||||
static void check_all_databases_closed()
|
static void check_all_databases_closed()
|
||||||
{
|
{
|
||||||
WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, (ErlDrvPort)-1); // use unlikely port number - have no port here.
|
LOCK_DATABASES((ErlDrvPort)-1); // use unlikely port number - have no port here.
|
||||||
|
|
||||||
int dbref;
|
int dbref;
|
||||||
int rc;
|
int rc;
|
||||||
|
@ -1269,7 +1213,7 @@ static void check_all_databases_closed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, (ErlDrvPort)-1);
|
UNLOCK_DATABASES((ErlDrvPort)-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1288,19 +1232,19 @@ static void abort_txn(PortData* d)
|
||||||
static int delete_database(const char* name, PortData *data)
|
static int delete_database(const char* name, PortData *data)
|
||||||
{
|
{
|
||||||
// Go directly to a write lock on the global databases structure
|
// Go directly to a write lock on the global databases structure
|
||||||
WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
LOCK_DATABASES(data->port);
|
||||||
|
|
||||||
// Make sure the database is not opened by anyone
|
// Make sure the database is not opened by anyone
|
||||||
if (hive_hash_get(G_DATABASES_NAMES, name))
|
if (hive_hash_get(G_DATABASES_NAMES, name))
|
||||||
{
|
{
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return ERROR_DB_ACTIVE;
|
return ERROR_DB_ACTIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Good, database doesn't seem to be open -- attempt the delete
|
// Good, database doesn't seem to be open -- attempt the delete
|
||||||
DBG("Attempting to delete database: %s\r\n", name);
|
DBG("Attempting to delete database: %s\r\n", name);
|
||||||
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
|
int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);
|
||||||
WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue