diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 756cab2..8bfbf52 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -77,7 +77,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port static int close_database(int dbref, unsigned flags, PortData* data); static void check_all_databases_closed(); -static int delete_database(const char* name); +static int delete_database(const char* name, PortData *data); static void get_info(int target, void* values, BinHelper* bh); @@ -136,8 +136,11 @@ static int G_DB_ENV_ERROR = 0; * G_DATABASES_RWLOCK. */ static Database* G_DATABASES = 0; +static Database* G_DATABASES_SHADOW = 0; static unsigned int G_DATABASES_SIZE = 0; static ErlDrvRWLock* G_DATABASES_RWLOCK = 0; +static volatile ErlDrvTid G_DATABASES_RWLOCK_TID = 0; +static volatile ErlDrvPort G_DATABASES_RWLOCK_PORT = 0; static hive_hash* G_DATABASES_NAMES = 0; @@ -181,6 +184,8 @@ static int G_BDBERL_PIPE[2] = {-1, -1}; * no pid/port is available, no callback is registered with BDB. */ static ErlDrvRWLock* G_LOG_RWLOCK = 0; +//static ErlDrvTid G_LOG_RWLOCK_TID = 0; +//static ErlDrvPort G_LOG_RWLOCK_PORT = 0; static ErlDrvTermData G_LOG_PID; static ErlDrvPort G_LOG_PORT; @@ -201,12 +206,6 @@ static TPool* G_TPOOL_TXNS = NULL; /** * Helpful macros */ -#define READ_LOCK(L) erl_drv_rwlock_rlock(L) -#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L) -#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L) -#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) -#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) - #ifdef DEBUG # define DBG(...) fprintf(stderr, __VA_ARGS__) static void DBGCMD(PortData *d, const char *fmt, ...); @@ -218,6 +217,70 @@ static void DBGCMDRC(PortData *d, int rc); #endif + +void READ_LOCK_DATABASES(void *x, ErlDrvPort P) +{ + ErlDrvTid self = erl_drv_thread_self(); + DBG("threadid %p port %p: read locking G_DATABASES\r\n", self, P); + erl_drv_rwlock_rlock(G_DATABASES_RWLOCK); + assert(0 == G_DATABASES_RWLOCK_TID); + assert(0 == G_DATABASES_RWLOCK_PORT); + G_DATABASES_RWLOCK_TID = self; + G_DATABASES_RWLOCK_PORT = P; + assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE)); + DBG("threadid %p port %p: read locked G_DATABASES\r\n", self, P); +} + +void READ_UNLOCK_DATABASES(void *x, ErlDrvPort P) +{ + ErlDrvTid self = erl_drv_thread_self(); + DBG("threadid %p port %p: read unlocking G_DATABASES\r\n", self, P); + assert(erl_drv_thread_self() == G_DATABASES_RWLOCK_TID); + assert(P == G_DATABASES_RWLOCK_PORT); + assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE)); + G_DATABASES_RWLOCK_TID = 0; + G_DATABASES_RWLOCK_PORT = 0; + erl_drv_rwlock_runlock(G_DATABASES_RWLOCK); + DBG("threadid %p port %p: read unlocked G_DATABASES\r\n", self, P); +} + +#define PROMOTE_READ_LOCK_DATABASES(L, P) \ + READ_UNLOCK_DATABASES(L, P); \ + WRITE_LOCK_DATABASES(L, P); + +void WRITE_LOCK_DATABASES(void *x, ErlDrvPort P) +{ + ErlDrvTid self = erl_drv_thread_self(); + DBG("threadid %p port %p: write locking G_DATABASES\r\n", self, P); + erl_drv_rwlock_rwlock(G_DATABASES_RWLOCK); + assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE)); + assert(0 == G_DATABASES_RWLOCK_TID); + assert(0 == G_DATABASES_RWLOCK_PORT); + G_DATABASES_RWLOCK_TID = self; + G_DATABASES_RWLOCK_PORT = P; + DBG("threadid %p port %p: write locked G_DATABASES\r\n", self, P); +} + +void WRITE_UNLOCK_DATABASES(void *x, ErlDrvPort P) +{ + ErlDrvTid self = erl_drv_thread_self(); + DBG("threadid %p port %p: write unlocking G_DATABASES\r\n", self, P); + assert(erl_drv_thread_self() == G_DATABASES_RWLOCK_TID); + assert(P == G_DATABASES_RWLOCK_PORT); + memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(G_DATABASES[0])*G_DATABASES_SIZE); + G_DATABASES_RWLOCK_TID = 0; + G_DATABASES_RWLOCK_PORT = 0; + erl_drv_rwlock_rwunlock(G_DATABASES_RWLOCK); + DBG("threadid %p port %p: write unlocked G_DATABASES\r\n", self, P); +} + +#define READ_LOCK(L) erl_drv_rwlock_rlock(L) +#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L) +#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L) +#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) +#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) + + DRIVER_INIT(bdberl_drv) { DBG("DRIVER INIT\r\n"); @@ -307,7 +370,9 @@ DRIVER_INIT(bdberl_drv) // BDB is setup -- allocate structures for tracking databases G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); + G_DATABASES_SHADOW = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE); + memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(Database) * G_DATABASES_SIZE); G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK"); G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); @@ -383,7 +448,7 @@ static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; - DBG("Stopping port %p - cancelling async jobs\r\n", d->port); + DBG("Stopping port %p\r\n", d->port); // Grab the port lock, in case we have an async job running erl_drv_mutex_lock(d->port_lock); @@ -392,12 +457,13 @@ static void bdberl_drv_stop(ErlDrvData handle) // block until the job has either been removed or has run if (d->async_job) { + DBGCMD(d, "Stopping port %p - cancelling async job %p\r\n", d->port, d->async_job); + // Drop the lock prior to starting the wait for the async process erl_drv_mutex_unlock(d->port_lock); - DBG("Cancelling async job for port: %p\r\n", d->port); bdberl_tpool_cancel(d->async_pool, d->async_job); - DBG("Canceled async job for port: %p\r\n", d->port); + DBGCMD(d, "Canceled async job for port: %p\r\n", d->port); } else { @@ -524,6 +590,11 @@ static void bdberl_drv_finish() driver_free(G_DATABASES); G_DATABASES = NULL; } + if (NULL != G_DATABASES_SHADOW) + { + driver_free(G_DATABASES_SHADOW); + G_DATABASES = NULL; + } if (NULL != G_DATABASES_RWLOCK) { erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); @@ -662,6 +733,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; + d->async_dbref = dbref; TPoolJobFunc fn; if (cmd == CMD_PUT || cmd == CMD_PUT_COMMIT) { @@ -762,7 +834,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Inbuf is: << dbname/bytes, 0:8 >> const char* dbname = UNPACK_STRING(inbuf, 0); - int rc = delete_database(dbname); + int rc = delete_database(dbname, d); bdberl_send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } @@ -955,7 +1027,7 @@ DB* bdberl_lookup_dbref(int dbref) assert(NULL != G_DATABASES); assert(0 <= dbref); assert(G_DATABASES_SIZE > dbref); - + assert(NULL != G_DATABASES[dbref].db); return G_DATABASES[dbref].db; } @@ -977,7 +1049,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port { *dbref_res = -1; - READ_LOCK(G_DATABASES_RWLOCK); + READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port); // Look up the database by name in our hash table Database* database = (Database*)hive_hash_get(G_DATABASES_NAMES, name); @@ -992,20 +1064,20 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port if (add_dbref(data, dbref)) { // Need to update G_DATABASES -- grab the write lock - PROMOTE_READ_LOCK(G_DATABASES_RWLOCK); + PROMOTE_READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port); // Add a reference to this port add_portref(dbref, data->port); // Release RW lock and return the ref - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); *dbref_res = dbref; return 0; } else { // Already in our list of opened databases -- unlock and return the reference - READ_UNLOCK(G_DATABASES_RWLOCK); + READ_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); *dbref_res = dbref; return 0; } @@ -1013,7 +1085,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port else { // This database hasn't been opened yet -- grab a write lock - PROMOTE_READ_LOCK(G_DATABASES_RWLOCK); + PROMOTE_READ_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port); // While waiting on the write lock, another thread could have slipped in and // opened the database, so do one more check to see if the database is already @@ -1025,7 +1097,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port // to our port and drop the lock ASAP int dbref = database - G_DATABASES; add_portref(dbref, data->port); - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); add_dbref(data, dbref); *dbref_res = dbref; @@ -1041,7 +1113,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port if (dbref < 0) { // No more slots available - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); return ERROR_MAX_DBS; } @@ -1054,7 +1126,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port { // Failure while creating the database handle -- drop our lock and return // the code - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); return rc; } @@ -1076,21 +1148,23 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port // Failure while opening the database -- cleanup the handle, drop the lock // and return db->close(db, 0); - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); return rc; } // Database is open. Store all the data into the allocated ref + assert(NULL != db); G_DATABASES[dbref].db = db; G_DATABASES[dbref].name = strdup(name); G_DATABASES[dbref].ports = zalloc(sizeof(PortList)); G_DATABASES[dbref].ports->port = data->port; + G_DATABASES[dbref].active_ports = 1; // Make entry in hash table of names hive_hash_add(G_DATABASES_NAMES, G_DATABASES[dbref].name, &(G_DATABASES[dbref])); // Drop the write lock - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); // Add the dbref to the port list add_dbref(data, dbref); @@ -1106,7 +1180,7 @@ static int close_database(int dbref, unsigned flags, PortData* data) { // Something was actually deleted from our list -- now we need to disassociate the // calling port with the global database structure. - WRITE_LOCK(G_DATABASES_RWLOCK); + WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port); assert(G_DATABASES[dbref].db != 0); assert(G_DATABASES[dbref].ports != 0); @@ -1133,7 +1207,7 @@ static int close_database(int dbref, unsigned flags, PortData* data) memset(database, '\0', sizeof(Database)); } - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); return rc; } else @@ -1148,7 +1222,7 @@ static int close_database(int dbref, unsigned flags, PortData* data) // but it is very important for BDB to shutdown cleanly so a final check can't hurt. static void check_all_databases_closed() { - WRITE_LOCK(G_DATABASES_RWLOCK); + WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, (ErlDrvPort)-1); // use unlikely port number - have no port here. int dbref; int rc; @@ -1170,7 +1244,7 @@ static void check_all_databases_closed() } } - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, (ErlDrvPort)-1); } @@ -1186,22 +1260,22 @@ static void abort_txn(PortData* d) } } -static int delete_database(const char* name) +static int delete_database(const char* name, PortData *data) { // Go directly to a write lock on the global databases structure - WRITE_LOCK(G_DATABASES_RWLOCK); + WRITE_LOCK_DATABASES(G_DATABASES_RWLOCK, data->port); // Make sure the database is not opened by anyone if (hive_hash_get(G_DATABASES_NAMES, name)) { - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); return ERROR_DB_ACTIVE; } // Good, database doesn't seem to be open -- attempt the delete DBG("Attempting to delete database: %s\r\n", name); int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT); - WRITE_UNLOCK(G_DATABASES_RWLOCK); + WRITE_UNLOCK_DATABASES(G_DATABASES_RWLOCK, data->port); return rc; } @@ -1509,8 +1583,8 @@ static void do_async_put(void* arg) { // Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn // is NULL, the put will still be atomic - DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) key=%p(%d) value=%p(%d)", - db, d->txn, &key, &value, flags, key.data, key.size, value.data, value.size); + DBGCMD(d, "db->put(%p, %p, %p, %p, %08X) dbref %d key=%p(%d) value=%p(%d)", + db, d->txn, &key, &value, flags, dbref, key.data, key.size, value.data, value.size); rc = db->put(db, d->txn, &key, &value, flags); DBGCMDRC(d, rc); } @@ -2056,6 +2130,8 @@ static int alloc_dbref() { if (G_DATABASES[i].db == 0) { + assert(NULL == G_DATABASES[i].db); + assert(NULL == G_DATABASES[i].ports); return i; } }