From ade3f99f8151108b64495b51e3a2b51b7213338a Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Sun, 7 Dec 2008 23:28:04 -0700 Subject: [PATCH] Open/Close now seems to be functional --- c_src/bdberl_drv.c | 420 +++++++++++++++++++++++++++++++++++++++++++- c_src/bdberl_drv.h | 30 +++- src/bdberl_port.erl | 40 ++++- 3 files changed, 479 insertions(+), 11 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index fb0519b..d7e9bd6 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -13,6 +13,22 @@ #include "hive_hash.h" #include "bdberl_drv.h" +/** + * Function prototypes + */ +static int open_database(const char* name, DBTYPE type, PortData* data, int* errno); +static int close_database(int dbref, PortData* data); + +static int add_dbref(PortData* data, int dbref); +static int del_dbref(PortData* data, int dbref); + +static int add_portref(int dbref, ErlDrvPort port); +static int del_portref(int dbref, ErlDrvPort port); + +static int alloc_dbref(); + +static void* zalloc(unsigned int size); + /** * Global instance of DB_ENV; only a single one exists per O/S process. */ @@ -44,8 +60,20 @@ static ErlDrvRWLock* G_DATABASES_RWLOCK; static hive_hash* G_DATABASES_NAMES; + +/** + * Helpful macros + */ +#define READ_LOCK(L) erl_drv_rwlock_rlock(L) +#define READ_UNLOCK(L) erl_drv_rwlock_runlock(L) +#define PROMOTE_READ_LOCK(L) { erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L); } +#define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) +#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) + + DRIVER_INIT(bdberl_drv) { + printf("DRIVER INIT\n"); // Setup flags we'll use to init the environment int flags = DB_INIT_LOCK | /* Enable support for locking */ @@ -95,6 +123,13 @@ DRIVER_INIT(bdberl_drv) static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) { + // Make sure we have a functional environment -- if we don't, + // bail... + if (!G_DB_ENV) + { + return ERL_DRV_ERROR_BADARG; + } + PortData* d = (PortData*)driver_alloc(sizeof(PortData)); memset(d, '\0', sizeof(PortData)); @@ -109,20 +144,81 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) static void bdberl_drv_stop(ErlDrvData handle) { + PortData* d = (PortData*)handle; + + // TODO: Terminate any txns + + // Close all the databases we previously opened + while (d->dbrefs) + { + close_database(d->dbrefs->dbref, d); + } + // Release the port instance data driver_free(handle); } +static void bdberl_drv_finish() +{ + // Driver is unloading -- cleanup and shut down the BDB environment. Note that we assume + // all ports have been released and thuse all databases/txns/etc are also gone. + G_DB_ENV->close(G_DB_ENV, 0); + + driver_free(G_DATABASES); + erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); + hive_hash_destroy(G_DATABASES_NAMES); +} + static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz) { - //PortData* d = (PortData*)handle; - DB* dbp; - db_create(&dbp, NULL, 0); + PortData* d = (PortData*)handle; - switch(cmd) + switch(cmd) { + case CMD_OPEN_DB: + { + // Extract the type code and filename from the inbuf + // Expect: Type:8, Name/bytes, NULL:8 + DBTYPE type = (DBTYPE)((char)*inbuf); + char* name = (char*)(inbuf+1); + int dbref; + int status; + int rc = open_database(name, type, d, &dbref); + if (rc == 0) + { + status = STATUS_OK; + } + else + { + status = STATUS_ERROR; + dbref = rc; + } + + // Pack the status and dbref (or errno) into a binary and return it + // Byte 0 : Status + // Byte 1..4: dbref/errno + ErlDrvBinary* result = driver_alloc_binary(5); + result->orig_bytes[0] = status; + memcpy(result->orig_bytes+1, (char*)&dbref, sizeof(dbref)); + *outbuf = (char*)result; + return result->orig_size; + } + case CMD_CLOSE_DB: + { + // TODO: If data is inflight, fail. Abort any open txns. + + // Take the provided dbref and attempt to close it + int dbref = *((int*)inbuf); + int rc = close_database(dbref, d); + + // Setup to return the rc + ErlDrvBinary* result = driver_alloc_binary(4); + memcpy(result->orig_bytes, (char*)&rc, sizeof(rc)); + *outbuf = (char*)result; + return result->orig_size; + } } *outbuf = 0; return 0; @@ -135,3 +231,319 @@ static void bdberl_drv_ready_async(ErlDrvData handle, ErlDrvThreadData thread_da static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor) { } + +static int open_database(const char* name, DBTYPE type, PortData* data, int* dbref_res) +{ + *dbref_res = -1; + + READ_LOCK(G_DATABASES_RWLOCK); + + // Look up the database by name in our hash table + Database* database = (Database*)hive_hash_get(G_DATABASES_NAMES, name); + if (database) + { + // Convert the database pointer into a dbref + int dbref = database - G_DATABASES; + + // Great, the database was previously opened by someone else. Add it to our + // list of refs, and if it's a new addition also register this port with the + // Database structure in G_DATABASES + if (add_dbref(data, dbref)) + { + // Need to update G_DATABASES -- grab the write lock + PROMOTE_READ_LOCK(G_DATABASES_RWLOCK); + + // Add a reference to this port + add_portref(dbref, data->port); + + // Release RW lock and return the ref + WRITE_UNLOCK(G_DATABASES_RWLOCK); + *dbref_res = dbref; + return 0; + } + else + { + // Already in our list of opened databases -- unlock and return the reference + READ_UNLOCK(G_DATABASES_RWLOCK); + *dbref_res = dbref; + return 0; + } + } + else + { + // This database hasn't been opened yet -- grab a write lock + PROMOTE_READ_LOCK(G_DATABASES_RWLOCK); + + // While waiting on the write lock, another thread could have slipped in and + // opened the database, so do one more check to see if the database is already + // open + database = (Database*)hive_hash_get(G_DATABASES_NAMES, name); + if (database) + { + // Database got created while we were waiting on the write lock, add a reference + // to our port and drop the lock ASAP + int dbref = database - G_DATABASES; + add_portref(dbref, data->port); + WRITE_UNLOCK(G_DATABASES_RWLOCK); + + add_dbref(data, dbref); + *dbref_res = dbref; + return 0; + } + + // Database hasn't been created while we were waiting on write lock, so + // create/open it + + // Find the first available slot in G_DATABASES; the index will be our + // reference for database operations + int dbref = alloc_dbref(); + if (dbref < 0) + { + // No more slots available + WRITE_UNLOCK(G_DATABASES_RWLOCK); + return ERROR_MAX_DBS; + } + + // Create the DB handle + DB* db; + int rc = db_create(&db, G_DB_ENV, 0); + if (rc != 0) + { + // Failure while creating the database handle -- drop our lock and return + // the code + WRITE_UNLOCK(G_DATABASES_RWLOCK); + return rc; + } + + // Attempt to open our database + rc = db->open(db, 0, name, 0, type, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0); + if (rc != 0) + { + // Failure while opening the database -- cleanup the handle, drop the lock + // and return + db->close(db, 0); + WRITE_UNLOCK(G_DATABASES_RWLOCK); + return rc; + } + + // Database is open. Store all the data into the allocated ref + G_DATABASES[dbref].db = db; + G_DATABASES[dbref].name = strdup(name); + G_DATABASES[dbref].ports = zalloc(sizeof(PortList)); + G_DATABASES[dbref].ports->port = data->port; + + // Make entry in hash table of names + hive_hash_add(G_DATABASES_NAMES, G_DATABASES[dbref].name, &(G_DATABASES[dbref])); + + // Drop the write lock + WRITE_UNLOCK(G_DATABASES_RWLOCK); + + // Add the dbref to the port list + add_dbref(data, dbref); + *dbref_res = dbref; + return 0; + } +} + +static int close_database(int dbref, PortData* data) +{ + printf("Closing %d for port %d\n", dbref, (int)data->port); + + // Remove this database from our list + if (del_dbref(data, dbref)) + { + // Something was actually deleted from our list -- now we need to disassociate the + // calling port with the global database structure. + WRITE_LOCK(G_DATABASES_RWLOCK); + + assert(G_DATABASES[dbref].db != 0); + assert(G_DATABASES[dbref].ports != 0); + + // Now disassociate this port from the database's port list + del_portref(dbref, data->port); + + // Finally, if there are no other references to the database, close out + // the database completely + Database* database = &G_DATABASES[dbref]; + if (database->ports == 0) + { + printf("Closing actual database for dbref %d\n", dbref); + // Close out the BDB handle + database->db->close(database->db, 0); + + // Remove the entry from the names map + hive_hash_remove(G_DATABASES_NAMES, database->name); + free((char*)database->name); + + // Zero out the whole record + memset(database, '\0', sizeof(Database)); + } + + WRITE_UNLOCK(G_DATABASES_RWLOCK); + return 1; + } + + return 0; +} + +static void* zalloc(unsigned int size) +{ + void* res = driver_alloc(size); + memset(res, '\0', size); + return res; +} + +#define zfree(p) driver_free(p) + +static int add_portref(int dbref, ErlDrvPort port) +{ + PortList* current = G_DATABASES[dbref].ports; + if (current) + { + PortList* last = 0; + do + { + // If the current item matches our port, bail -- nothing to do here + if (current->port == port) + { + return 0; + } + + last = current; + current = current->next; + } while (current != 0); + + // At the end of the list -- allocate a new entry for this por + current = (PortList*)zalloc(sizeof(PortList)); + current->port = port; + last->next = current; + return 1; + } + else + { + // Current was initially NULL, so alloc the first one and add it. + current = zalloc(sizeof(PortList)); + current->port = port; + G_DATABASES[dbref].ports = current; + return 1; + } +} + +static int del_portref(int dbref, ErlDrvPort port) +{ + PortList* current = G_DATABASES[dbref].ports; + PortList* last = 0; + while (current) + { + if (current->port == port) + { + // Found our match -- look back and connect the last item to our next + if (last) + { + last->next = current->next; + } + else + { + G_DATABASES[dbref].ports = current->next; + } + + // Delete this entry + zfree(current); + return 1; + } + + last = current; + current = current->next; + } + + // Didn't delete anything + return 0; +} + +/** + * Add a db reference to a port's DbRefList. Returns 1 if added; 0 if already present + */ +static int add_dbref(PortData* data, int dbref) +{ + DbRefList* current = data->dbrefs; + if (current) + { + DbRefList* last = 0; + do + { + if (current->dbref == dbref) + { + return 0; + } + + last = current; + current = current->next; + } while (current != 0); + + // At the end of the list -- allocate a new entry + current = zalloc(sizeof(DbRefList)); + current->dbref = dbref; + last->next = current; + return 1; + } + else + { + // Current was initially NULL, so alloc the first one + current = zalloc(sizeof(DbRefList)); + current->dbref = dbref; + data->dbrefs = current; + return 1; + } +} + +/** + * Delete a db reference from a port's DbRefList. Returns 1 if deleted; 0 if not + */ +static int del_dbref(PortData* data, int dbref) +{ + DbRefList* current = data->dbrefs; + DbRefList* last = 0; + while (current) + { + if (current->dbref == dbref) + { + // Found our match -- look back and connect the last item to our next + if (last) + { + last->next = current->next; + } + else + { + data->dbrefs = current->next; + } + + // Delete this entry + zfree(current); + return 1; + } + + last = current; + current = current->next; + } + + // Didn't delete anything + return 0; +} + +/** + * Allocate a Database structure; find first available slot in G_DATABASES and return the + * index of it. If no free slots are available, return -1 + */ +static int alloc_dbref() +{ + int i; + for (i = 0; i < G_DATABASES_SIZE; i++) + { + if (G_DATABASES[i].db == 0) + { + return i; + } + } + + return -1; +} diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index ee1318d..caec772 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -17,6 +17,8 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer); static void bdberl_drv_stop(ErlDrvData handle); +static void bdberl_drv_finish(); + static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz); @@ -37,6 +39,24 @@ static void bdberl_drv_process_exit(ErlDrvData handle, ErlDrvMonitor *monitor); #define CMD_PUT 6 #define CMD_PUT_ATOMIC 7 +/** + * Command status values + */ +#define STATUS_OK 0 +#define STATUS_ERROR 1 + +/** + * Database Types (see db.h) + */ +#define DB_TYPE_BTREE DB_BTREE /* 1 */ +#define DB_TYPE_HASH DB_HASH /* 2 */ + +/** + * Error codes -- chosen so that we do not conflict with other packages, particularly + * db.h. We use error namespace from -29000 to -29500. + */ +#define ERROR_MAX_DBS (-29000) /* System can not open any further databases */ + /** * Driver Entry */ @@ -49,7 +69,7 @@ ErlDrvEntry bdberl_drv_entry = NULL, /* F_PTR ready_input, called when input descriptor ready */ NULL, /* F_PTR ready_output, called when output descriptor ready */ "bdberl_drv", /* driver_name */ - NULL, /* F_PTR finish, called when unloaded */ + bdberl_drv_finish, /* F_PTR finish, called when unloaded */ NULL, /* handle */ bdberl_drv_control, /* F_PTR control, port_command callback */ NULL, /* F_PTR timeout, reserved */ @@ -66,17 +86,17 @@ ErlDrvEntry bdberl_drv_entry = bdberl_drv_process_exit /* F_PTR process_exit */ }; -typedef struct +typedef struct _DbRefList { unsigned int dbref; - struct DbRefList* next; + struct _DbRefList* next; } DbRefList; -typedef struct +typedef struct _PortList { ErlDrvPort port; - struct PortList* next; + struct _PortList* next; } PortList; diff --git a/src/bdberl_port.erl b/src/bdberl_port.erl index 54e2edc..1626509 100644 --- a/src/bdberl_port.erl +++ b/src/bdberl_port.erl @@ -6,9 +6,45 @@ %% ------------------------------------------------------------------- -module(bdberl_port). --export([open/0]). +-export([new/0, + open_database/3, + close_database/2]). -open() -> +-define(CMD_OPEN_DB, 0). +-define(CMD_CLOSE_DB, 1). + +-define(DB_TYPE_BTREE, 1). +-define(DB_TYPE_HASH, 2). + +-define(STATUS_OK, 0). +-define(STATUS_ERROR, 1). + +new() -> ok = erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv), Port = open_port({spawn, bdberl_drv}, [binary]), {ok, Port}. + + +open_database(Port, Name, Type) -> + %% Map database type into an integer code + case Type of + btree -> TypeCode = ?DB_TYPE_BTREE; + hash -> TypeCode = ?DB_TYPE_HASH + end, + Cmd = <>, + case erlang:port_control(Port, ?CMD_OPEN_DB, Cmd) of + <> -> + {ok, DbRef}; + <> -> + {error, Errno} + end. + +close_database(Port, DbRef) -> + Cmd = <>, + case erlang:port_control(Port, ?CMD_CLOSE_DB, Cmd) of + <<0:32/native-integer>> -> + {error, invalid_dbref}; + <<1:32/native-integer>> -> + ok + end. +