Cleaned up driver

- removed the lock checks now we are happy with the mutex
 - replaced 'const == var' tests with 'var == const' to make code consistent.
This commit is contained in:
Jon Meredith 2009-06-19 13:20:38 -06:00 committed by Phillip Toland
parent 783acf09ba
commit 0affeb825c
2 changed files with 50 additions and 117 deletions

View file

@ -136,11 +136,8 @@ static int G_DB_ENV_ERROR = 0;
* G_DATABASES_MUTEX. * G_DATABASES_MUTEX.
*/ */
static Database* G_DATABASES = 0; static Database* G_DATABASES = 0;
static Database* G_DATABASES_SHADOW = 0;
static unsigned int G_DATABASES_SIZE = 0; static unsigned int G_DATABASES_SIZE = 0;
static ErlDrvMutex* G_DATABASES_MUTEX = 0; static ErlDrvMutex* G_DATABASES_MUTEX = 0;
static volatile ErlDrvTid G_DATABASES_MUTEX_TID = 0;
static volatile ErlDrvPort G_DATABASES_MUTEX_PORT = 0;
static hive_hash* G_DATABASES_NAMES = 0; static hive_hash* G_DATABASES_NAMES = 0;
@ -184,8 +181,6 @@ static int G_BDBERL_PIPE[2] = {-1, -1};
* no pid/port is available, no callback is registered with BDB. * no pid/port is available, no callback is registered with BDB.
*/ */
static ErlDrvRWLock* G_LOG_RWLOCK = 0; static ErlDrvRWLock* G_LOG_RWLOCK = 0;
//static ErlDrvTid G_LOG_RWLOCK_TID = 0;
//static ErlDrvPort G_LOG_RWLOCK_PORT = 0;
static ErlDrvTermData G_LOG_PID; static ErlDrvTermData G_LOG_PID;
static ErlDrvPort G_LOG_PORT; static ErlDrvPort G_LOG_PORT;
@ -208,67 +203,36 @@ static TPool* G_TPOOL_TXNS = NULL;
*/ */
#ifdef DEBUG #ifdef DEBUG
# define DBG(...) fprintf(stderr, __VA_ARGS__) # define DBG(...) fprintf(stderr, __VA_ARGS__)
static void DBGCMD(PortData *d, const char *fmt, ...); # define DBGCMD(P, ...) bdberl_dbgcmd(P, __VA_ARGS__)
static void DBGCMDRC(PortData *d, int rc); # define DBGCMDRC(P, ...) bdberl_dbgcmdrc(P, __VA_ARGS__)
static void bdberl_dbgcmd(PortData *d, const char *fmt, ...);
static void bdberl_dbgcmdrc(PortData *d, int rc);
#else #else
# define DBG(arg1,...) # define DBG(arg1,...)
# define DBGCMD(d, fmt, ...) # define DBGCMD(d, fmt, ...)
# define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error # define DBGCMDRC(d, rc) { while (0) { rc++; } } // otherwise get unused variable error
#endif #endif
void CHECK_DATABASES(void)
{
int dbref;
for (dbref = 0; dbref < G_DATABASES_SIZE; dbref++)
{
// check the pointers are all NULL or all populated.
if (NULL == G_DATABASES[dbref].db)
{
assert(NULL == G_DATABASES[dbref].name);
assert(G_DATABASES[dbref].ports == NULL);
assert(G_DATABASES[dbref].active_ports == 0);
}
else
{
assert(NULL != G_DATABASES[dbref].name);
assert(G_DATABASES[dbref].ports != NULL);
assert(G_DATABASES[dbref].active_ports > 0);
}
}
}
void LOCK_DATABASES(ErlDrvPort P) #define LOCK_DATABASES(P) \
{ do \
ErlDrvTid self = erl_drv_thread_self(); { \
DBG("threadid %p port %p: locking G_DATABASES\r\n", self, P); DBG("threadid %p port %p: locking G_DATABASES\r\n", erl_drv_thread_self(), P); \
erl_drv_mutex_lock(G_DATABASES_MUTEX); erl_drv_mutex_lock(G_DATABASES_MUTEX); \
CHECK_DATABASES(); DBG("threadid %p port %p: locked G_DATABASES\r\n", erl_drv_thread_self(), P); \
assert(0 == G_DATABASES_MUTEX_TID); } while(0)
assert(0 == G_DATABASES_MUTEX_PORT);
G_DATABASES_MUTEX_TID = self;
G_DATABASES_MUTEX_PORT = P;
assert(0 == memcmp(G_DATABASES, G_DATABASES_SHADOW, sizeof(G_DATABASES[0])*G_DATABASES_SIZE));
DBG("threadid %p port %p: locked G_DATABASES\r\n", self, P);
}
void UNLOCK_DATABASES(ErlDrvPort P) #define UNLOCK_DATABASES(P) \
{ do \
ErlDrvTid self = erl_drv_thread_self(); { \
DBG("threadid %p port %p: unlocking G_DATABASES\r\n", self, P); DBG("threadid %p port %p: unlocking G_DATABASES\r\n", erl_drv_thread_self(), P); \
assert(self == G_DATABASES_MUTEX_TID); erl_drv_mutex_unlock(G_DATABASES_MUTEX); \
assert(P == G_DATABASES_MUTEX_PORT); DBG("threadid %p port %p: unlocked G_DATABASES\r\n", erl_drv_thread_self(), P); \
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(G_DATABASES[0])*G_DATABASES_SIZE); } while (0)
G_DATABASES_MUTEX_TID = 0;
G_DATABASES_MUTEX_PORT = 0;
CHECK_DATABASES();
erl_drv_mutex_unlock(G_DATABASES_MUTEX);
DBG("threadid %p port %p: unlocked G_DATABASES\r\n", self, P);
}
#define READ_LOCK(L) erl_drv_rwlock_rlock(L) #define READ_LOCK(L) erl_drv_rwlock_rlock(L)
#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L) #define READ_UNLOCK(L) erl_drv_rwlock_runlock(L)
#define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L)
#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) #define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L)
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) #define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
@ -323,7 +287,7 @@ DRIVER_INIT(bdberl_drv)
if (G_DB_ENV_ERROR == 0) if (G_DB_ENV_ERROR == 0)
{ {
// Pipe for signalling the utility threads all is over. // Pipe for signalling the utility threads all is over.
assert(0 == pipe(G_BDBERL_PIPE)); assert(pipe(G_BDBERL_PIPE) == 0);
// Use the BDBERL_MAX_DBS environment value to determine the max # of // Use the BDBERL_MAX_DBS environment value to determine the max # of
// databases to permit the VM to open at once. Defaults to 1024. // databases to permit the VM to open at once. Defaults to 1024.
@ -362,9 +326,7 @@ DRIVER_INIT(bdberl_drv)
// BDB is setup -- allocate structures for tracking databases // BDB is setup -- allocate structures for tracking databases
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
G_DATABASES_SHADOW = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE); memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
memcpy(G_DATABASES_SHADOW, G_DATABASES, sizeof(Database) * G_DATABASES_SIZE);
G_DATABASES_MUTEX = erl_drv_mutex_create("bdberl_drv: G_DATABASES_MUTEX"); G_DATABASES_MUTEX = erl_drv_mutex_create("bdberl_drv: G_DATABASES_MUTEX");
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
@ -582,11 +544,6 @@ static void bdberl_drv_finish()
driver_free(G_DATABASES); driver_free(G_DATABASES);
G_DATABASES = NULL; G_DATABASES = NULL;
} }
if (NULL != G_DATABASES_SHADOW)
{
driver_free(G_DATABASES_SHADOW);
G_DATABASES = NULL;
}
if (NULL != G_DATABASES_MUTEX) if (NULL != G_DATABASES_MUTEX)
{ {
erl_drv_mutex_destroy(G_DATABASES_MUTEX); erl_drv_mutex_destroy(G_DATABASES_MUTEX);
@ -913,12 +870,12 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// Find the log dir or use DB_HOME - error if not present // Find the log dir or use DB_HOME - error if not present
const char *lg_dir = NULL; const char *lg_dir = NULL;
int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &lg_dir); int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &lg_dir);
if (0 == rc && NULL == lg_dir) if (rc == 0 && NULL == lg_dir)
{ {
rc = G_DB_ENV->get_home(G_DB_ENV, &lg_dir); rc = G_DB_ENV->get_home(G_DB_ENV, &lg_dir);
} }
// Send info if we can get a dir, otherwise return the error // Send info if we can get a dir, otherwise return the error
if (0 == rc) if (rc == 0)
{ {
// send a dirinfo message - will send an error message on a NULL lg_dir // send a dirinfo message - will send an error message on a NULL lg_dir
send_dir_info(d->port, d->port_owner, lg_dir); send_dir_info(d->port, d->port_owner, lg_dir);
@ -960,12 +917,12 @@ static int check_non_neg_env(char *env, unsigned int *val_ptr)
{ {
errno = 0; errno = 0;
long long val = strtoll(val_str, NULL, 0); long long val = strtoll(val_str, NULL, 0);
if (0 == val && EINVAL == errno) if (val == 0 && errno == EINVAL)
{ {
fprintf(stderr, "Ignoring \"%s\" value \"%s\" - invalid value\r\n", env, val_str); fprintf(stderr, "Ignoring \"%s\" value \"%s\" - invalid value\r\n", env, val_str);
return 0; return 0;
} }
if (0 >= val || val > UINT_MAX) if (val <= 0 || val > UINT_MAX)
{ {
fprintf(stderr, "Ignoring \"%s\" value \"%lld\" - out of range\r\n", env, val); fprintf(stderr, "Ignoring \"%s\" value \"%lld\" - out of range\r\n", env, val);
return 0; return 0;
@ -1155,7 +1112,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
assert(G_DATABASES[dbref].ports != 0); assert(G_DATABASES[dbref].ports != 0);
// Now disassociate this port from the database's port list // Now disassociate this port from the database's port list
assert(1 == del_portref(dbref, data->port)); assert(del_portref(dbref, data->port) == 1);
// Finally, if there are no other references to the database, close out // Finally, if there are no other references to the database, close out
// the database completely // the database completely
@ -1297,7 +1254,7 @@ static void get_info(int target, void* values, BinHelper* bh)
// Get the log dir - according to BDB docs, if not set // Get the log dir - according to BDB docs, if not set
// the DB_HOME is used. // the DB_HOME is used.
int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &dir); int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &dir);
if (NULL == dir) if (dir == NULL)
{ {
if (0 != G_DB_ENV->get_home(G_DB_ENV, &dir)) if (0 != G_DB_ENV->get_home(G_DB_ENV, &dir))
{ {
@ -1379,7 +1336,7 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path)
struct statvfs svfs; struct statvfs svfs;
int rc; int rc;
if (NULL == path) if (path == NULL)
{ {
rc = EINVAL; rc = EINVAL;
} }
@ -1580,11 +1537,11 @@ static void do_async_get(void* arg)
{ {
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
PortData* d = (PortData*)arg; PortData* d = (PortData*)arg;
// Get the database object, using the provided ref // Get the database object, using the provided ref
int dbref = UNPACK_INT(d->work_buffer, 0); int dbref = UNPACK_INT(d->work_buffer, 0);
DB* db = bdberl_lookup_dbref(dbref); DB* db = bdberl_lookup_dbref(dbref);
// Extract operation flags // Extract operation flags
unsigned flags = UNPACK_INT(d->work_buffer, 4); unsigned flags = UNPACK_INT(d->work_buffer, 4);
@ -1593,38 +1550,23 @@ static void do_async_get(void* arg)
DBT value; DBT value;
memset(&key, '\0', sizeof(DBT)); memset(&key, '\0', sizeof(DBT));
memset(&value, '\0', sizeof(DBT)); memset(&value, '\0', sizeof(DBT));
// Parse payload into DBT // Parse payload into DBT
key.size = UNPACK_INT(d->work_buffer, 8); key.size = UNPACK_INT(d->work_buffer, 8);
key.data = UNPACK_BLOB(d->work_buffer, 12); key.data = UNPACK_BLOB(d->work_buffer, 12);
// Allocate a buffer for the output value // Allocate a buffer for the output value
#ifdef USE_DRIVER_REALLOC
value.ulen = 4096;
value.data = driver_alloc(value.ulen);
value.flags = DB_DBT_USERMEM;
#else
value.flags = DB_DBT_MALLOC; value.flags = DB_DBT_MALLOC;
#endif
int rc = db->get(db, d->txn, &key, &value, flags); int rc = db->get(db, d->txn, &key, &value, flags);
#ifdef USE_DRIVER_REALLOC
while (rc == DB_BUFFER_SMALL)
{
// Grow our value buffer
value.ulen = value.size;
value.size = 0;
value.data = driver_realloc(value.data, value.ulen);
rc = db->get(db, d->txn, &key, &value, flags);
}
#endif
// Check CRC - first 4 bytes are CRC of rest of bytes // Check CRC - first 4 bytes are CRC of rest of bytes
if (0 == rc) if (rc == 0)
{ {
assert(value.size >= 4); assert(value.size >= 4);
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
uint32_t buf_crc32 = *(uint32_t*) value.data; uint32_t buf_crc32 = *(uint32_t*) value.data;
if (calc_crc32 != buf_crc32) if (calc_crc32 != buf_crc32)
{ {
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.", DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
@ -1632,23 +1574,18 @@ static void do_async_get(void* arg)
rc = ERROR_INVALID_VALUE; rc = ERROR_INVALID_VALUE;
} }
} }
// Cleanup transaction as necessary // Cleanup transaction as necessary
if (rc && rc != DB_NOTFOUND && d->txn) if (rc && rc != DB_NOTFOUND && d->txn)
{ {
d->txn->abort(d->txn); d->txn->abort(d->txn);
d->txn = 0; d->txn = 0;
} }
async_cleanup_and_send_kv(d, rc, &key, &value); async_cleanup_and_send_kv(d, rc, &key, &value);
// Finally, clean up value buffer (driver_send_term made a copy) // Finally, clean up value buffer (driver_send_term made a copy)
#ifdef USE_DRIVER_REALLOC free(value.data);
driver_free(value.data);
#else
if (value.data)
free(value.data);
#endif
} }
static void do_async_txnop(void* arg) static void do_async_txnop(void* arg)
@ -1712,7 +1649,7 @@ static void do_async_cursor_get(void* arg)
DBGCMDRC(d, rc); DBGCMDRC(d, rc);
// Check CRC - first 4 bytes are CRC of rest of bytes // Check CRC - first 4 bytes are CRC of rest of bytes
if (0 == rc) if (rc == 0)
{ {
assert(value.size >= 4); assert(value.size >= 4);
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
@ -1803,42 +1740,42 @@ static void do_sync_data_dirs_info(PortData *d)
// Lookup the environment and add it if not explicitly included in the data_dirs // Lookup the environment and add it if not explicitly included in the data_dirs
int rc = G_DB_ENV->get_home(G_DB_ENV, &db_home); int rc = G_DB_ENV->get_home(G_DB_ENV, &db_home);
if (rc != 0 || NULL == db_home) if (rc != 0 || db_home == NULL)
{ {
// If no db_home we'll have to rely on whatever the global environment is configured with // If no db_home we'll have to rely on whatever the global environment is configured with
got_db_home = 1; got_db_home = 1;
} }
else else
{ {
if (NULL == realpath(db_home, db_home_realpath)) if (realpath(db_home, db_home_realpath) == NULL)
rc = errno; rc = errno;
} }
// Get the data first // Get the data first
rc = G_DB_ENV->get_data_dirs(G_DB_ENV, &data_dirs); rc = G_DB_ENV->get_data_dirs(G_DB_ENV, &data_dirs);
int i; int i;
for (i = 0; 0 == rc && NULL != data_dirs && NULL != data_dirs[i]; i++) for (i = 0; rc == 0 && data_dirs != NULL && data_dirs[i] != NULL; i++)
{ {
data_dir = data_dirs[i]; data_dir = data_dirs[i];
if (!got_db_home) if (!got_db_home)
{ {
// Get the real path of the data dir // Get the real path of the data dir
if (NULL == realpath(data_dir, data_dir_realpath)) if (realpath(data_dir, data_dir_realpath) == NULL)
{ {
rc = errno; rc = errno;
} }
else else
{ {
// Set got_db_home if it matches // Set got_db_home if it matches
if (0 == strcmp(data_dir_realpath, db_home_realpath)) if (strcmp(data_dir_realpath, db_home_realpath) == 0)
{ {
got_db_home = 1; got_db_home = 1;
} }
} }
} }
if (0 == rc) if (rc == 0)
{ {
rc = send_dir_info(d->port, d->port_owner, data_dir); rc = send_dir_info(d->port, d->port_owner, data_dir);
} }
@ -2099,8 +2036,8 @@ static int alloc_dbref()
{ {
if (G_DATABASES[i].db == 0) if (G_DATABASES[i].db == 0)
{ {
assert(NULL == G_DATABASES[i].db); assert(G_DATABASES[i].db == NULL);
assert(NULL == G_DATABASES[i].ports); assert(G_DATABASES[i].ports == NULL);
return i; return i;
} }
} }
@ -2154,9 +2091,9 @@ int util_thread_usleep(unsigned int usecs)
} }
done = 1; done = 1;
if (-1 == select(nfds, &fds, NULL, NULL, &tv)) if (select(nfds, &fds, NULL, NULL, &tv) == -1)
{ {
if (EINTR == errno) // a signal woke up select, back to sleep for us if (errno == EINTR) // a signal woke up select, back to sleep for us
{ {
done = 0; done = 0;
} }
@ -2288,8 +2225,7 @@ static void send_log_message(ErlDrvTermData* msg, int elements)
} }
#ifdef DEBUG #ifdef DEBUG
#if 1 static void bdberl_dbgcmd(PortData *d, const char *fmt, ...)
static void DBGCMD(PortData *d, const char *fmt, ...)
{ {
char buf[1024]; char buf[1024];
@ -2300,8 +2236,8 @@ static void DBGCMD(PortData *d, const char *fmt, ...)
(void)fprintf(stderr, "threadid %p port %p: %s\r\n", erl_drv_thread_self(), d->port, buf); (void)fprintf(stderr, "threadid %p port %p: %s\r\n", erl_drv_thread_self(), d->port, buf);
} }
#endif
static void DBGCMDRC(PortData *d, int rc) static void bdberl_dbgcmdrc(PortData *d, int rc)
{ {
(void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\r\n", (void)fprintf(stderr, "threadid %p port %p: rc = %s (%d)\r\n",
erl_drv_thread_self(), d->port, rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc); erl_drv_thread_self(), d->port, rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc);

View file

@ -112,7 +112,6 @@ typedef struct
DB* db; DB* db;
const char* name; const char* name;
PortList* ports; PortList* ports;
int active_ports;
} Database; } Database;
@ -130,8 +129,6 @@ typedef struct
DbRefList* dbrefs; /* List of databases that this port has opened */ DbRefList* dbrefs; /* List of databases that this port has opened */
int active_dbs; /* Count of entries in dbrefs (for debug) */
DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn DB_TXN* txn; /* Transaction handle for this port; each port may only have 1 txn
* active */ * active */