/* ------------------------------------------------------------------- * * bdberl: Berkeley DB Driver for Erlang * Copyright (c) 2008 The Hive. All rights reserved. * * ------------------------------------------------------------------- */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "hive_hash.h" #include "bdberl_drv.h" #include "bin_helper.h" /** * Function prototypes */ static int open_database(const char* name, DBTYPE type, unsigned int flags, PortData* data, int* dbref_res); static int close_database(int dbref, unsigned flags, PortData* data); static int delete_database(const char* name); static void get_info(int target, void* values, BinHelper* bh); static void do_async_put(void* arg); static void do_async_get(void* arg); static void do_async_txnop(void* arg); static void do_async_cursor_get(void* arg); static void do_async_truncate(void* arg); static void do_async_stat(void* arg); static void do_async_lock_stat(void* arg); static void do_async_log_stat(void* arg); static void do_async_memp_stat(void* arg); static void do_async_mutex_stat(void* arg); static void do_async_txn_stat(void* arg); static void do_sync_data_dirs_info(PortData *p); static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path); static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); static int has_dbref(PortData* data, int dbref); static int add_portref(int dbref, ErlDrvPort port); static int del_portref(int dbref, ErlDrvPort port); static int alloc_dbref(); static void* zalloc(unsigned int size); static void* deadlock_check(void* arg); static void* checkpointer(void* arg); static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg); static void bdb_msgcall(const DB_ENV* dbenv, const char* msg); static void send_log_message(ErlDrvTermData* msg, int elements); 
/** * Global instance of DB_ENV; only a single one exists per O/S process. */ static DB_ENV* G_DB_ENV = 0; /** * Global variable to track the return code from opening the DB_ENV. We track this * value so as to provide a useful error code when the user attempts to open the * port and it fails due to an error that occurred when opening the environment. */ static int G_DB_ENV_ERROR = 0; /** * G_DATABASES is a global array of Database structs. Used to track currently opened DB* * handles and ensure that they get cleaned up when all ports which were using them exit or * explicitly close them. * * This array is allocated when the driver is first initialized and does not grow/shrink * dynamically. G_DATABASES_SIZE contains the size of the array. G_DATABASES_NAMES is a hash of * filenames to array index for an opened Database. * * All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock * G_DATABASES_RWLOCK. */ static Database* G_DATABASES = 0; static int G_DATABASES_SIZE = 0; static ErlDrvRWLock* G_DATABASES_RWLOCK = 0; static hive_hash* G_DATABASES_NAMES = 0; /** * Deadlock detector thread variables. We run a single thread per VM to detect deadlocks within * our global environment. G_DEADLOCK_CHECK_INTERVAL is the time between runs in milliseconds. */ static ErlDrvTid G_DEADLOCK_THREAD = 0; static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1; static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between checks */ /** * Trickle writer for dirty pages. We run a single thread per VM to perform background * trickling of dirty pages to disk. G_TRICKLE_INTERVAL is the time between runs in seconds. */ static unsigned int G_TRICKLE_ACTIVE = 1; static unsigned int G_TRICKLE_INTERVAL = 60 * 5; /* Seconds between trickle writes */ static unsigned int G_TRICKLE_PERCENTAGE = 50; /* Desired % of clean pages in cache */ /** * Transaction checkpoint monitor. We run a single thread per VM to flush transaction * logs into the backing data store. 
G_CHECKPOINT_INTERVAL is the time between runs in seconds. * TODO The interval should be configurable. */ static ErlDrvTid G_CHECKPOINT_THREAD = 0; static unsigned int G_CHECKPOINT_ACTIVE = 1; static unsigned int G_CHECKPOINT_INTERVAL = 60 * 60; /* Seconds between checkpoints */ /** * Pipe to used to wake up the various monitors. Instead of just sleeping * they wait for an exceptional condition on the read fd of the pipe. When it is time to * shutdown, the driver closes the write fd and waits for the threads to be joined. */ static int G_BDBERL_PIPE[2] = {-1, -1}; /** * Lock, port and pid reference for relaying BDB output into the SASL logger. READ lock * is required to log data. WRITE lock is used when replacing the pid/port reference. If * no pid/port is available, no callback is registered with BDB. */ static ErlDrvRWLock* G_LOG_RWLOCK = 0; static ErlDrvTermData G_LOG_PID; static ErlDrvPort G_LOG_PORT; /** * Default page size to use for newly created databases */ static unsigned int G_PAGE_SIZE = 0; /** * */ static TPool* G_TPOOL_GENERAL = NULL; static TPool* G_TPOOL_TXNS = NULL; /** * Helpful macros */ #define READ_LOCK(L) erl_drv_rwlock_rlock(L) #define READ_UNLOCK(L) erl_drv_rwlock_runlock(L) #define PROMOTE_READ_LOCK(L) erl_drv_rwlock_runlock(L); erl_drv_rwlock_rwlock(L) #define WRITE_LOCK(L) erl_drv_rwlock_rwlock(L) #define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) #define UNPACK_BYTE(_buf, _off) (_buf[_off]) #define UNPACK_INT(_buf, _off) *((int*)(_buf+(_off))) #define UNPACK_STRING(_buf, _off) (char*)(_buf+(_off)) #define UNPACK_BLOB(_buf, _off) (void*)(_buf+(_off)) #define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.offset; #define RETURN_INT(val, outbuf) { \ BinHelper bh; \ bin_helper_init(&bh); \ bin_helper_push_int32(&bh, val); \ RETURN_BH(bh, outbuf); } #define FAIL_IF_ASYNC_PENDING(d, outbuf) { \ erl_drv_mutex_lock(d->port_lock); \ if (d->async_op != CMD_NONE) { \ erl_drv_mutex_unlock(d->port_lock); \ RETURN_INT(ERROR_ASYNC_PENDING, 
outbuf); \ } else { \ erl_drv_mutex_unlock(d->port_lock); \ }} #define FAIL_IF_CURSOR_OPEN(d, outbuf) { \ if (NULL != d->cursor) \ { \ send_rc(d->port, d->port_owner, ERROR_CURSOR_OPEN); \ RETURN_INT(0, outbuf); \ }} #define FAIL_IF_NO_CURSOR(d, outbuf) { \ if (NULL == d->cursor) \ { \ send_rc(d->port, d->port_owner, ERROR_NO_CURSOR); \ RETURN_INT(0, outbuf); \ }} #define FAIL_IF_TXN_OPEN(d, outbuf) { \ if (NULL != d->txn) \ { \ send_rc(d->port, d->port_owner, ERROR_TXN_OPEN); \ RETURN_INT(0, outbuf); \ }} #define FAIL_IF_NO_TXN(d, outbuf) { \ if (NULL == d->txn) \ { \ send_rc(d->port, d->port_owner, ERROR_NO_TXN); \ RETURN_INT(0, outbuf); \ }} #ifdef DEBUG # define DBG printf #else # define DBG(arg1,...) #endif DRIVER_INIT(bdberl_drv) { DBG("DRIVER INIT\r\n"); // Setup flags we'll use to init the environment int flags = DB_INIT_LOCK | /* Enable support for locking */ DB_INIT_TXN | /* Enable support for transactions */ DB_INIT_MPOOL | /* Enable support for memory pools */ DB_RECOVER | /* Enable support for recovering from failures */ DB_CREATE | /* Create files as necessary */ DB_REGISTER | /* Run recovery if needed */ DB_USE_ENVIRON | /* Use DB_HOME environment variable */ DB_THREAD; /* Make the environment free-threaded */ // Check for environment flag which indicates we want to use DB_SYSTEM_MEM char* use_system_mem = getenv("BDBERL_SYSTEM_MEM"); if (use_system_mem != 0) { flags |= DB_SYSTEM_MEM; } // Initialize global environment -- use environment variable DB_HOME to // specify where the working directory is G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0); if (G_DB_ENV_ERROR != 0) { G_DB_ENV = 0; } else { G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0); if (G_DB_ENV_ERROR != 0) { // Something bad happened while initializing BDB; in this situation we // cleanup and set the environment to zero. Attempts to open ports will // fail and the user will have to sort out how to resolve the issue. 
G_DB_ENV->close(G_DB_ENV, 0); G_DB_ENV = 0; } } if (G_DB_ENV_ERROR == 0) { // Pipe for signalling the utility threads all is over. assert(0 == pipe(G_BDBERL_PIPE)); // Use the BDBERL_MAX_DBS environment value to determine the max # of // databases to permit the VM to open at once. Defaults to 1024. G_DATABASES_SIZE = 1024; char* max_dbs_str = getenv("BDBERL_MAX_DBS"); /* TODO: Use erl_drv_getenv */ if (max_dbs_str != 0) { G_DATABASES_SIZE = atoi(max_dbs_str); if (G_DATABASES_SIZE <= 0) { G_DATABASES_SIZE = 1024; } } // Use the BDBERL_TRICKLE_TIME and BDBERL_TRICKLE_PERCENTAGE to control how often // the trickle writer runs and what percentage of pages should be flushed. char* trickle_time_str = getenv("BDBERL_TRICKLE_TIME"); if (trickle_time_str != 0) { G_TRICKLE_INTERVAL = atoi(trickle_time_str); if (G_TRICKLE_INTERVAL <= 0) { G_TRICKLE_INTERVAL = 60 * 5; } } char* trickle_percentage_str = getenv("BDBERL_TRICKLE_PERCENTAGE"); if (trickle_percentage_str != 0) { G_TRICKLE_PERCENTAGE = atoi(trickle_percentage_str); if (G_TRICKLE_PERCENTAGE <= 0) { G_TRICKLE_PERCENTAGE = 50; } } // Initialize default page size char* page_size_str = getenv("BDBERL_PAGE_SIZE"); if (page_size_str != 0) { // Convert to integer and only set it if it is a power of 2. 
unsigned int page_size = atoi(page_size_str); if (page_size != 0 && ((page_size & (~page_size +1)) == page_size)) { G_PAGE_SIZE = page_size; } } // Make sure we can distiguish between lock timeouts and deadlocks G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); // Initialization transaction timeout so that deadlock checking works properly db_timeout_t to = 500 * 1000; // 500 ms G_DB_ENV->set_timeout(G_DB_ENV, to, DB_SET_TXN_TIMEOUT); // BDB is setup -- allocate structures for tracking databases G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE); G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK"); G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); // Startup deadlock check thread erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD, &deadlock_check, 0, 0); // Use the BDBERL_CHECKPOINT_TIME environment value to determine the // interval between transaction checkpoints. Defaults to 1 hour. char* cp_int_str = getenv("BDBERL_CHECKPOINT_TIME"); /* TODO: Use erl_drv_getenv */ if (cp_int_str != 0) { G_CHECKPOINT_INTERVAL = atoi(cp_int_str); if (G_CHECKPOINT_INTERVAL <= 0) { G_CHECKPOINT_INTERVAL = 60 * 60; } } // Startup checkpoint thread erl_drv_thread_create("bdberl_drv_checkpointer", &G_CHECKPOINT_THREAD, &checkpointer, 0, 0); // Startup our thread pools // TODO: Make configurable/adjustable G_TPOOL_GENERAL = bdberl_tpool_start(10); G_TPOOL_TXNS = bdberl_tpool_start(10); // Initialize logging lock and refs G_LOG_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_LOG_RWLOCK"); G_LOG_PORT = 0; G_LOG_PID = 0; } else { DBG("DRIVER INIT FAILED - %s\r\n", db_strerror(G_DB_ENV_ERROR)); } return &bdberl_drv_entry; } static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) { // Make sure we have a functional environment -- if we don't, // bail... 
if (!G_DB_ENV) { return ERL_DRV_ERROR_BADARG; } PortData* d = (PortData*)driver_alloc(sizeof(PortData)); memset(d, '\0', sizeof(PortData)); // Save handle to the port d->port = port; // Allocate a mutex for the port d->port_lock = erl_drv_mutex_create("bdberl_port_lock"); // Save the caller/owner PID d->port_owner = driver_connected(port); // Allocate an initial buffer for work purposes d->work_buffer = driver_alloc(4096); d->work_buffer_sz = 4096; // Make sure port is running in binary mode set_port_control_flags(port, PORT_CONTROL_FLAG_BINARY); return (ErlDrvData)d; } static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; // Grab the port lock, in case we have an async job running erl_drv_mutex_lock(d->port_lock); // If there is an async job pending, we need to cancel it. The cancel operation will // block until the job has either been removed or has run if (d->async_job) { // Drop the lock prior to starting the wait for the async process erl_drv_mutex_unlock(d->port_lock); DBG("Cancelling async job for port: %p\r\n", d->port); bdberl_tpool_cancel(d->async_pool, d->async_job); DBG("Canceled async job for port: %p\r\n", d->port); } else { // If there was no async job, drop the lock -- not needed erl_drv_mutex_unlock(d->port_lock); } // Cleanup the port lock erl_drv_mutex_destroy(d->port_lock); // If a cursor is open, close it if (d->cursor) { d->cursor->close(d->cursor); } // If a txn is currently active, terminate it. if (d->txn) { d->txn->abort(d->txn); } // Close all the databases we previously opened while (d->dbrefs) { close_database(d->dbrefs->dbref, 0, d); } // If this port was registered as the endpoint for logging, go ahead and // remove it. Note that we don't need to lock to check this since we only // unregister if it's already initialized to this port. if (G_LOG_PORT == d->port) { WRITE_LOCK(G_LOG_RWLOCK); // Remove the references G_LOG_PORT = 0; G_LOG_PID = 0; // Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD! 
G_DB_ENV->set_msgcall(G_DB_ENV, 0); G_DB_ENV->set_errcall(G_DB_ENV, 0); WRITE_UNLOCK(G_LOG_RWLOCK); } DBG("Stopped port: %p\r\n", d->port); // Release the port instance data driver_free(d->work_buffer); driver_free(handle); } static void bdberl_drv_finish() { // Stop the thread pools if (NULL != G_TPOOL_GENERAL) { bdberl_tpool_stop(G_TPOOL_GENERAL); G_TPOOL_GENERAL = NULL; } if (NULL != G_TPOOL_TXNS) { bdberl_tpool_stop(G_TPOOL_TXNS); G_TPOOL_TXNS = NULL; } // Signal the utility threads time is up G_TRICKLE_ACTIVE = 0; G_DEADLOCK_CHECK_ACTIVE = 0; G_CHECKPOINT_ACTIVE = 0; // Close the writer fd on the pipe to signal finish to the utility threads if (-1 != G_BDBERL_PIPE[1]) { close(G_BDBERL_PIPE[1]); G_BDBERL_PIPE[1] = -1; } // Wait for the deadlock checker to shutdown -- then wait for it if (0 != G_DEADLOCK_THREAD) { erl_drv_thread_join(G_DEADLOCK_THREAD, 0); G_DEADLOCK_THREAD = 0; } // Wait for the checkpointer to shutdown -- then wait for it if (0 != G_CHECKPOINT_THREAD) { erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); G_CHECKPOINT_THREAD = 0; } // Close the reader fd on the pipe now utility threads are closed if (-1 != G_BDBERL_PIPE[0]) { close(G_BDBERL_PIPE[0]); } G_BDBERL_PIPE[0] = -1; // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. 
if (NULL != G_DB_ENV) { G_DB_ENV->close(G_DB_ENV, 0); G_DB_ENV = NULL; } if (NULL != G_DATABASES) { driver_free(G_DATABASES); G_DATABASES = NULL; } if (NULL != G_DATABASES_RWLOCK) { erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); G_DATABASES_RWLOCK = NULL; } if (NULL != G_DATABASES_NAMES) { hive_hash_destroy(G_DATABASES_NAMES); G_DATABASES_NAMES = NULL; } // Release the logging rwlock if (NULL != G_LOG_RWLOCK) { erl_drv_rwlock_destroy(G_LOG_RWLOCK); G_LOG_RWLOCK = NULL; } DBG("DRIVER_FINISH\n"); } static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz) { PortData* d = (PortData*)handle; switch(cmd) { case CMD_OPEN_DB: { // Extract the type code and filename from the inbuf // Inbuf is: <> unsigned flags = UNPACK_INT(inbuf, 0); DBTYPE type = (DBTYPE) UNPACK_BYTE(inbuf, 4); char* name = UNPACK_STRING(inbuf, 5); int dbref; int rc = open_database(name, type, flags, d, &dbref); // Queue up a message for bdberl:open to process if (rc == 0) // success: send {ok, DbRef} { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), ERL_DRV_INT, dbref, ERL_DRV_TUPLE, 2}; driver_send_term(d->port, d->port_owner, response, sizeof(response) / sizeof(response[0])); } else // failure: send {error, atom() | {error, {unknown, Rc}} { send_rc(d->port, d->port_owner, rc); } // Outbuf is: <> - always send 0 and the driver will receive the real value RETURN_INT(0, outbuf); } case CMD_CLOSE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_CURSOR_OPEN(d, outbuf); FAIL_IF_TXN_OPEN(d, outbuf); // Take the provided dbref and attempt to close it // Inbuf is: <> int dbref = UNPACK_INT(inbuf, 0); unsigned flags = (unsigned) UNPACK_INT(inbuf, 4); int rc = close_database(dbref, flags, d); // Queue up a message for bdberl:close to process send_rc(d->port, d->port_owner, rc); // Outbuf is: <> - always send 0 and the driver will receive the real value RETURN_INT(0, outbuf); } case CMD_TXN_BEGIN: { FAIL_IF_ASYNC_PENDING(d, outbuf); 
FAIL_IF_TXN_OPEN(d, outbuf); // Setup async command and schedule it on the txns threadpool d->async_op = cmd; d->async_flags = UNPACK_INT(inbuf, 0); d->async_pool = G_TPOOL_TXNS; bdberl_tpool_run(d->async_pool, &do_async_txnop, d, 0, &d->async_job); // Outbuf is <> RETURN_INT(0, outbuf); } case CMD_TXN_COMMIT: case CMD_TXN_ABORT: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_NO_TXN(d, outbuf); // Setup async command and schedule it on the txns threadpool d->async_op = cmd; if (cmd == CMD_TXN_COMMIT) { d->async_flags = UNPACK_INT(inbuf, 0); } d->async_pool = G_TPOOL_TXNS; bdberl_tpool_run(d->async_pool, &do_async_txnop, d, 0, &d->async_job); // Outbuf is <> RETURN_INT(0, outbuf); } case CMD_PUT: case CMD_GET: case CMD_PUT_COMMIT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Put/commit requires a transaction to be active if (cmd == CMD_PUT_COMMIT && (!d->txn)) { send_rc(d->port, d->port_owner, ERROR_NO_TXN); RETURN_INT(0, outbuf); } // Inbuf is: << DbRef:32, Rest/binary>> int dbref = UNPACK_INT(inbuf, 0); // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about // the underlying handle disappearing since we have a reference. if (has_dbref(d, dbref)) { // If the working buffer is large enough, copy the data to put/get into it. 
Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; TPoolJobFunc fn; if (cmd == CMD_PUT || cmd == CMD_PUT_COMMIT) { fn = &do_async_put; } else { assert(cmd == CMD_GET); fn = &do_async_get; } d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, fn, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } else { // Invalid dbref send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); RETURN_INT(0, outbuf); } } case CMD_GETINFO: { // Inbuf is: << Target:32, Values/binary >> int target = UNPACK_INT(inbuf, 0); char* values = UNPACK_BLOB(inbuf, 4); // Execute the tuning -- the result to send back to the caller is wrapped // up in the provided binhelper BinHelper bh; get_info(target, values, &bh); RETURN_BH(bh, outbuf); } case CMD_CURSOR_OPEN: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> int dbref = UNPACK_INT(inbuf, 0); unsigned int flags = UNPACK_INT(inbuf, 4); // Make sure we have a reference to the requested database if (has_dbref(d, dbref)) { // Grab the database handle and open the cursor DB* db = G_DATABASES[dbref].db; int rc = db->cursor(db, d->txn, &(d->cursor), flags); send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } else { send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); RETURN_INT(0, outbuf); } } case CMD_CURSOR_CURR: case CMD_CURSOR_NEXT: case CMD_CURSOR_PREV: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_NO_CURSOR(d, outbuf); // Schedule the operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_cursor_get, d, 0, &d->async_job); // Let caller 
know operation is in progress RETURN_INT(0, outbuf); } case CMD_CURSOR_CLOSE: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_NO_CURSOR(d, outbuf); // It's possible to get a deadlock when closing a cursor -- in that situation we also // need to go ahead and abort the txn int rc = d->cursor->close(d->cursor); if (d->txn && (rc == DB_LOCK_NOTGRANTED || rc == DB_LOCK_DEADLOCK)) { d->txn->abort(d->txn); d->txn = 0; } // Regardless of what happens, clear out the cursor pointer d->cursor = 0; // Send result code send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } case CMD_REMOVE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_TXN_OPEN(d, outbuf); // Inbuf is: << dbname/bytes, 0:8 >> const char* dbname = UNPACK_STRING(inbuf, 0); int rc = delete_database(dbname); send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } case CMD_TRUNCATE: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is: <> int dbref = UNPACK_INT(inbuf, 0); // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about // the underlying handle disappearing since we have a reference. if (dbref == -1 || has_dbref(d, dbref)) { memcpy(d->work_buffer, inbuf, inbuf_sz); // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_truncate, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } else { // Invalid dbref RETURN_INT(ERROR_INVALID_DBREF, outbuf); } } case CMD_REGISTER_LOGGER: { // If this port is not the current logger, make it so. Only one logger can be registered // at a time. if (G_LOG_PORT != d->port) { // Grab the write lock and update the global vars; also make sure to update BDB callbacks // within the write lock to avoid race conditions. 
WRITE_LOCK(G_LOG_RWLOCK); G_LOG_PORT = d->port; G_LOG_PID = driver_connected(d->port); G_DB_ENV->set_msgcall(G_DB_ENV, &bdb_msgcall); G_DB_ENV->set_errcall(G_DB_ENV, &bdb_errcall); WRITE_UNLOCK(G_LOG_RWLOCK); } *outbuf = 0; return 0; } case CMD_DB_STAT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> int dbref = UNPACK_INT(inbuf, 0); // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about // the underlying handle disappearing since we have a reference. if (has_dbref(d, dbref)) { // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_stat, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } else { // Invalid dbref RETURN_INT(ERROR_INVALID_DBREF, outbuf); } } case CMD_DB_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> int dbref = UNPACK_INT(inbuf, 0); // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about // the underlying handle disappearing since we have a reference. 
if (has_dbref(d, dbref)) { DB* db = G_DATABASES[dbref].db; unsigned int flags = UNPACK_INT(inbuf, 4); // Outbuf is <> // Run the command on the VM thread - this is for debugging only, // any real monitoring int rc = db->stat_print(db, flags); RETURN_INT(rc, outbuf); } else { // Invalid dbref RETURN_INT(ERROR_INVALID_DBREF, outbuf); } } case CMD_ENV_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << Flags:32 >> unsigned int flags = UNPACK_INT(inbuf, 0); // Outbuf is <> int rc = G_DB_ENV->stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } case CMD_LOCK_STAT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is <> // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_lock_stat, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } case CMD_LOCK_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << Flags:32 >> unsigned int flags = UNPACK_INT(inbuf, 0); // Outbuf is <> // Run the command on the VM thread - this is for debugging only, // any real monitoring will use the async lock_stat int rc = G_DB_ENV->lock_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } case CMD_LOG_STAT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is <> // If the working buffer is large enough, copy the data to put/get into it. 
Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_log_stat, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } case CMD_LOG_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << Flags:32 >> unsigned int flags = UNPACK_INT(inbuf, 0); // Outbuf is <> // Run the command on the VM thread - this is for debugging only, // any real monitoring will use the async lock_stat int rc = G_DB_ENV->log_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } case CMD_MEMP_STAT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is <> // If the working buffer is large enough, copy the data to put/get into it. 
Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_memp_stat, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } case CMD_MEMP_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << Flags:32 >> unsigned int flags = UNPACK_INT(inbuf, 0); // Outbuf is <> // Run the command on the VM thread - this is for debugging only, // any real monitoring will use the async lock_stat int rc = G_DB_ENV->memp_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } case CMD_MUTEX_STAT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is <> // If the working buffer is large enough, copy the data to put/get into it. 
Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_mutex_stat, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } case CMD_MUTEX_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << Flags:32 >> unsigned int flags = UNPACK_INT(inbuf, 0); // Outbuf is <> // Run the command on the VM thread - this is for debugging only, // any real monitoring will use the async lock_stat int rc = G_DB_ENV->mutex_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } case CMD_TXN_STAT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is <> // If the working buffer is large enough, copy the data to put/get into it. 
Otherwise, realloc // until it is large enough if (d->work_buffer_sz < inbuf_sz) { d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); d->work_buffer_sz = inbuf_sz; } // Copy the payload into place memcpy(d->work_buffer, inbuf, inbuf_sz); d->work_buffer_offset = inbuf_sz; // Mark the port as busy and then schedule the appropriate async operation d->async_op = cmd; d->async_pool = G_TPOOL_GENERAL; bdberl_tpool_run(d->async_pool, &do_async_txn_stat, d, 0, &d->async_job); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } case CMD_TXN_STAT_PRINT: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Inbuf is << Flags:32 >> unsigned int flags = UNPACK_INT(inbuf, 0); // Outbuf is <> // Run the command on the VM thread - this is for debugging only, // any real monitoring will use the async lock_stat int rc = G_DB_ENV->txn_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } case CMD_DATA_DIRS_INFO: { FAIL_IF_ASYNC_PENDING(d, outbuf); do_sync_data_dirs_info(d); // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } case CMD_LOG_DIR_INFO: { FAIL_IF_ASYNC_PENDING(d, outbuf); // Find the log dir or use DB_HOME - error if not present const char *lg_dir = NULL; int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &lg_dir); if (0 == rc && NULL == lg_dir) { rc = G_DB_ENV->get_home(G_DB_ENV, &lg_dir); } // Send info if we can get a dir, otherwise return the error if (0 == rc) { // send a dirinfo message - will send an error message on a NULL lg_dir send_dir_info(d->port, d->port_owner, lg_dir); } else { send_rc(d->port, d->port_owner, rc); } // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); } } *outbuf = 0; return 0; } static int open_database(const char* name, DBTYPE type, unsigned int flags, PortData* data, int* dbref_res) { *dbref_res = -1; READ_LOCK(G_DATABASES_RWLOCK); // Look up the database by name in our hash table Database* database = 
(Database*)hive_hash_get(G_DATABASES_NAMES, name);
    if (database)
    {
        // Convert the database pointer into a dbref (index into G_DATABASES)
        int dbref = database - G_DATABASES;

        // Great, the database was previously opened by someone else. Add it to our
        // list of refs, and if it's a new addition also register this port with the
        // Database structure in G_DATABASES
        if (add_dbref(data, dbref))
        {
            // Need to update G_DATABASES -- grab the write lock
            PROMOTE_READ_LOCK(G_DATABASES_RWLOCK);

            // Add a reference to this port
            add_portref(dbref, data->port);

            // Release RW lock and return the ref
            WRITE_UNLOCK(G_DATABASES_RWLOCK);
            *dbref_res = dbref;
            return 0;
        }
        else
        {
            // Already in our list of opened databases -- unlock and return the reference
            READ_UNLOCK(G_DATABASES_RWLOCK);
            *dbref_res = dbref;
            return 0;
        }
    }
    else
    {
        // This database hasn't been opened yet -- grab a write lock
        PROMOTE_READ_LOCK(G_DATABASES_RWLOCK);

        // While waiting on the write lock, another thread could have slipped in and
        // opened the database, so do one more check to see if the database is already
        // open
        database = (Database*)hive_hash_get(G_DATABASES_NAMES, name);
        if (database)
        {
            // Database got created while we were waiting on the write lock, add a reference
            // to our port and drop the lock ASAP
            int dbref = database - G_DATABASES;
            add_portref(dbref, data->port);
            WRITE_UNLOCK(G_DATABASES_RWLOCK);

            add_dbref(data, dbref);
            *dbref_res = dbref;
            return 0;
        }

        // Database hasn't been created while we were waiting on write lock, so
        // create/open it

        // Find the first available slot in G_DATABASES; the index will be our
        // reference for database operations
        int dbref = alloc_dbref();
        if (dbref < 0)
        {
            // No more slots available
            WRITE_UNLOCK(G_DATABASES_RWLOCK);
            return ERROR_MAX_DBS;
        }

        // Create the DB handle
        DB* db;
        int rc = db_create(&db, G_DB_ENV, 0);
        if (rc != 0)
        {
            // Failure while creating the database handle -- drop our lock and return
            // the code
            WRITE_UNLOCK(G_DATABASES_RWLOCK);
            return rc;
        }

        // If a custom page size has been specified, try to use it. A failure here is
        // non-fatal: we log via bdb_errcall and continue with the default page size.
        if (G_PAGE_SIZE > 0)
        {
            if (db->set_pagesize(db, G_PAGE_SIZE) != 0)
            {
                bdb_errcall(G_DB_ENV, "", "Failed to set page size.");
            }
        }

        // Attempt to open our database
        rc = db->open(db, 0, name, 0, type, flags, 0);
        if (rc != 0)
        {
            // Failure while opening the database -- cleanup the handle, drop the lock
            // and return
            db->close(db, 0);
            WRITE_UNLOCK(G_DATABASES_RWLOCK);
            return rc;
        }

        // Database is open. Store all the data into the allocated ref
        G_DATABASES[dbref].db = db;
        G_DATABASES[dbref].name = strdup(name);
        G_DATABASES[dbref].ports = zalloc(sizeof(PortList));
        G_DATABASES[dbref].ports->port = data->port;

        // Make entry in hash table of names
        hive_hash_add(G_DATABASES_NAMES, G_DATABASES[dbref].name, &(G_DATABASES[dbref]));

        // Drop the write lock
        WRITE_UNLOCK(G_DATABASES_RWLOCK);

        // Add the dbref to the port list
        add_dbref(data, dbref);
        *dbref_res = dbref;
        return 0;
    }
}

/**
 * Close a dbref previously opened by this port. When the last port referencing the
 * underlying database closes it, the DB handle is closed and the G_DATABASES slot
 * is zeroed for reuse.
 */
static int close_database(int dbref, unsigned flags, PortData* data)
{
    // Remove this database from our list
    if (del_dbref(data, dbref))
    {
        // Something was actually deleted from our list -- now we need to disassociate the
        // calling port with the global database structure.
        WRITE_LOCK(G_DATABASES_RWLOCK);

        assert(G_DATABASES[dbref].db != 0);
        assert(G_DATABASES[dbref].ports != 0);

        // Now disassociate this port from the database's port list
        del_portref(dbref, data->port);

        // Finally, if there are no other references to the database, close out
        // the database completely
        Database* database = &G_DATABASES[dbref];
        if (database->ports == 0)
        {
            DBG("Closing actual database for dbref %d\r\n", dbref);

            // Close out the BDB handle
            database->db->close(database->db, flags);

            // Remove the entry from the names map
            hive_hash_remove(G_DATABASES_NAMES, database->name);
            free((char*)database->name);

            // Zero out the whole record so the slot can be re-used by alloc_dbref
            memset(database, '\0', sizeof(Database));
        }

        WRITE_UNLOCK(G_DATABASES_RWLOCK);
        return ERROR_NONE;
    }
    else
    {
        return ERROR_INVALID_DBREF;
    }
}

/**
 * Remove a database file via DB_ENV->dbremove. Fails with ERROR_DB_ACTIVE if any
 * port currently has the database open.
 */
static int delete_database(const char* name)
{
    // Go directly to a write lock on the global databases structure
    WRITE_LOCK(G_DATABASES_RWLOCK);

    // Make sure the database is not opened by anyone
    if (hive_hash_get(G_DATABASES_NAMES, name))
    {
        WRITE_UNLOCK(G_DATABASES_RWLOCK);
        return ERROR_DB_ACTIVE;
    }

    // Good, database doesn't seem to be open -- attempt the delete
    DBG("Attempting to delete database: %s\r\n", name);
    int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT);

    WRITE_UNLOCK(G_DATABASES_RWLOCK);
    return rc;
}

/**
 * Given a target system parameter, push the requested value(s) into the
 * provided BinHelper as 32-bit integers / strings.
 */
static void get_info(int target, void* values, BinHelper* bh)
{
    switch(target)
    {
    case SYSP_CACHESIZE_GET:
    {
        unsigned int gbytes = 0;
        unsigned int bytes = 0;
        int caches = 0;
        int rc = G_DB_ENV->get_cachesize(G_DB_ENV, &gbytes, &bytes, &caches);
        bin_helper_init(bh);
        bin_helper_push_int32(bh, rc);
        bin_helper_push_int32(bh, gbytes);
        bin_helper_push_int32(bh, bytes);
        bin_helper_push_int32(bh, caches);
        break;
    }
    case SYSP_TXN_TIMEOUT_GET:
    {
        unsigned int timeout = 0;
        int rc = G_DB_ENV->get_timeout(G_DB_ENV, &timeout, DB_SET_TXN_TIMEOUT);
        bin_helper_init(bh);
        bin_helper_push_int32(bh, rc);
        bin_helper_push_int32(bh, timeout);
        break;
    }
    case SYSP_DATA_DIR_GET:
    {
        const char** dirs = 0;
        int rc = G_DB_ENV->get_data_dirs(G_DB_ENV, &dirs);
        bin_helper_init(bh);
        bin_helper_push_int32(bh, rc);
        // dirs is a NULL-terminated array owned by BDB; push each entry
        while (dirs && *dirs)
        {
            bin_helper_push_string(bh, *dirs);
            dirs++;
        }
        break;
    }
    case SYSP_LOG_DIR_GET:
    {
        const char* dir = 0;
        // Get the log dir - according to BDB docs, if not set
        // the DB_HOME is used.
        int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &dir);
        if (NULL == dir)
        {
            dir = getenv("DB_HOME");
        }
        bin_helper_init(bh);
        bin_helper_push_int32(bh, rc);
        bin_helper_push_string(bh, dir);
        break;
    }
    }
}

/**
 * Clear the async-operation state on a port so another command can be issued.
 */
static void async_cleanup(PortData* d)
{
    // Release the port for another operation
    d->work_buffer_offset = 0;
    erl_drv_mutex_lock(d->port_lock);
    d->async_pool = 0;
    d->async_job = 0;
    d->async_op = CMD_NONE;
    erl_drv_mutex_unlock(d->port_lock);
}

// Convert an rc from BDB into a string suitable for driver_mk_atom
// returns NULL on no match
static char *rc_to_atom_str(int rc)
{
    // erl_errno_id handles the standard errno values; anything it does not know
    // comes back as "unknown" and falls through to the explicit mapping below.
    char *error = erl_errno_id(rc);
    //fprintf(stderr, "erl_errno_id(%d) = %s db_strerror = %s\n", rc, error, db_strerror(rc));
    if (NULL != error && strcmp("unknown", error) != 0)
    {
        return error;
    }
    else
    {
        switch (rc)
        {
        // bdberl driver errors
        case ERROR_ASYNC_PENDING: return "async_pending";
        case ERROR_INVALID_DBREF: return "invalid_db";
        case ERROR_TXN_OPEN: return "transaction_open";
        case ERROR_NO_TXN: return "no_txn";
        case ERROR_CURSOR_OPEN: return "cursor_open";
        case ERROR_NO_CURSOR: return "no_cursor";
        // bonafide BDB errors
        case DB_BUFFER_SMALL: return "buffer_small";
        case DB_DONOTINDEX: return "do_not_index";
        case DB_FOREIGN_CONFLICT: return "foreign_conflict";
        case DB_KEYEMPTY: return "key_empty";
        case DB_KEYEXIST: return "key_exist";
        case DB_LOCK_DEADLOCK: return "deadlock";
        case DB_LOCK_NOTGRANTED: return "lock_not_granted";
        case DB_LOG_BUFFER_FULL: return "log_buffer_full";
        case DB_NOTFOUND: return "not_found";
        case DB_OLD_VERSION: return "old_version";
        case DB_PAGE_NOTFOUND: return "page_not_found";
        case DB_RUNRECOVERY: return
"run_recovery"; case DB_VERIFY_BAD: return "verify_bad"; case DB_VERSION_MISMATCH: return "version_mismatch"; default: return NULL; } } } // Send a {dirinfo, Path, FsId, MbyteAvail} message to pid given. // Send an {errno, Reason} on failure // returns 0 on success, errno on failure static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path) { struct statvfs svfs; int rc; if (NULL == path) { rc = EINVAL; } else if (0 != statvfs(path, &svfs)) { rc = errno; } else { rc = 0; } if (0 != rc) { send_rc(port, pid, rc); } else { fsblkcnt_t blocks_per_mbyte = 1024 * 1024 / svfs.f_frsize; assert(blocks_per_mbyte > 0); unsigned int mbyte_avail = (unsigned int) (svfs.f_bavail / blocks_per_mbyte); int path_len = strlen(path); ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("dirinfo"), ERL_DRV_STRING, (ErlDrvTermData) path, path_len, // send fsid as a binary as will only be used // to compare which physical filesystem is on // and the definintion varies between platforms. ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid, sizeof(svfs.f_fsid), ERL_DRV_UINT, mbyte_avail, ERL_DRV_TUPLE, 4}; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } return rc; } static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) { // TODO: May need to tag the messages a bit more explicitly so that if another async // job runs to completion before the message gets delivered we don't mis-interpret this // response code. 
if (rc == 0) { ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } else { // See if this is a standard errno that we have an erlang code for char *error = rc_to_atom_str(rc); if (NULL != error) { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ERL_DRV_ATOM, driver_mk_atom(error), ERL_DRV_TUPLE, 2}; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } else { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ERL_DRV_ATOM, driver_mk_atom("unknown"), ERL_DRV_INT, rc, ERL_DRV_TUPLE, 2, ERL_DRV_TUPLE, 2}; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } } } static void async_cleanup_and_send_rc(PortData* d, int rc) { // Save the port and pid references -- we need copies independent from the PortData // structure. Once we release the port_lock after clearing the cmd, it's possible that // the port could go away without waiting on us to finish. This is acceptable, but we need // to be certain that there is no overlap of data between the two threads. driver_send_term // is safe to use from a thread, even if the port you're sending from has already expired. ErlDrvPort port = d->port; ErlDrvTermData pid = d->port_owner; async_cleanup(d); send_rc(port, pid, rc); } static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) { // Save the port and pid references -- we need copies independent from the PortData // structure. Once we release the port_lock after clearing the cmd, it's possible that // the port could go away without waiting on us to finish. This is acceptable, but we need // to be certain that there is no overlap of data between the two threads. driver_send_term // is safe to use from a thread, even if the port you're sending from has already expired. 
    ErlDrvPort port = d->port;
    ErlDrvTermData pid = d->port_owner;
    async_cleanup(d);

    // Notify port of result
    if (rc == 0)
    {
        // {ok, Key, Value} with both payloads as binaries
        ErlDrvTermData response[] = {
            ERL_DRV_ATOM, driver_mk_atom("ok"),
            ERL_DRV_BUF2BINARY, (ErlDrvTermData)key->data, (ErlDrvUInt)key->size,
            ERL_DRV_BUF2BINARY, (ErlDrvTermData)value->data, (ErlDrvUInt)value->size,
            ERL_DRV_TUPLE, 3};
        driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
    }
    else if (rc == DB_NOTFOUND)
    {
        ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("not_found") };
        driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
    }
    else
    {
        ErlDrvTermData response[] = {
            ERL_DRV_ATOM, driver_mk_atom("error"),
            ERL_DRV_INT, rc,
            ERL_DRV_TUPLE, 2};
        driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
    }
}

// Expand to one {Member, Value} 2-tuple for a bt_-prefixed DB_BTREE_STAT field
#define BT_STATS_TUPLE(base, member)                \
    ERL_DRV_ATOM, driver_mk_atom(#member),          \
    ERL_DRV_UINT, (base)->bt_##member,              \
    ERL_DRV_TUPLE, 2

static void async_cleanup_and_send_btree_stats(PortData* d, char *type, DB_BTREE_STAT *bsp)
{
    // Save the port and pid references -- we need copies independent from the PortData
    // structure. Once we release the port_lock after clearing the cmd, it's possible that
    // the port could go away without waiting on us to finish. This is acceptable, but we need
    // to be certain that there is no overlap of data between the two threads. driver_send_term
    // is safe to use from a thread, even if the port you're sending from has already expired.
    ErlDrvPort port = d->port;
    ErlDrvTermData pid = d->port_owner;
    async_cleanup(d);

    // {ok, [{type,Type} | 21 stat tuples]}
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("ok"),
        // Start of list
        ERL_DRV_ATOM, driver_mk_atom("type"),
        ERL_DRV_ATOM, driver_mk_atom(type),
        ERL_DRV_TUPLE, 2,
        BT_STATS_TUPLE(bsp, magic),        /* Magic number. */
        BT_STATS_TUPLE(bsp, version),      /* Version number. */
        BT_STATS_TUPLE(bsp, metaflags),    /* Metadata flags. */
        BT_STATS_TUPLE(bsp, nkeys),        /* Number of unique keys. */
        BT_STATS_TUPLE(bsp, ndata),        /* Number of data items. */
        BT_STATS_TUPLE(bsp, pagecnt),      /* Page count. */
        BT_STATS_TUPLE(bsp, pagesize),     /* Page size. */
        BT_STATS_TUPLE(bsp, minkey),       /* Minkey value. */
        BT_STATS_TUPLE(bsp, re_len),       /* Fixed-length record length. */
        BT_STATS_TUPLE(bsp, re_pad),       /* Fixed-length record pad. */
        BT_STATS_TUPLE(bsp, levels),       /* Tree levels. */
        BT_STATS_TUPLE(bsp, int_pg),       /* Internal pages. */
        BT_STATS_TUPLE(bsp, leaf_pg),      /* Leaf pages. */
        BT_STATS_TUPLE(bsp, dup_pg),       /* Duplicate pages. */
        BT_STATS_TUPLE(bsp, over_pg),      /* Overflow pages. */
        BT_STATS_TUPLE(bsp, empty_pg),     /* Empty pages. */
        BT_STATS_TUPLE(bsp, free),         /* Pages on the free list. */
        BT_STATS_TUPLE(bsp, int_pgfree),   /* Bytes free in internal pages. */
        BT_STATS_TUPLE(bsp, leaf_pgfree),  /* Bytes free in leaf pages. */
        BT_STATS_TUPLE(bsp, dup_pgfree),   /* Bytes free in duplicate pages. */
        BT_STATS_TUPLE(bsp, over_pgfree),  /* Bytes free in overflow pages. */
        // End of list
        ERL_DRV_NIL,
        ERL_DRV_LIST, 21+2,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}
#undef BT_STATS_TUPLE

// Expand to one {Member, Value} 2-tuple for a hash_-prefixed DB_HASH_STAT field
#define HASH_STATS_TUPLE(base, member)              \
    ERL_DRV_ATOM, driver_mk_atom(#member),          \
    ERL_DRV_UINT, (base)->hash_##member,            \
    ERL_DRV_TUPLE, 2

static void async_cleanup_and_send_hash_stats(PortData* d, DB_HASH_STAT *hsp)
{
    // Save the port and pid references -- we need copies independent from the PortData
    // structure. Once we release the port_lock after clearing the cmd, it's possible that
    // the port could go away without waiting on us to finish. This is acceptable, but we need
    // to be certain that there is no overlap of data between the two threads. driver_send_term
    // is safe to use from a thread, even if the port you're sending from has already expired.
ErlDrvPort port = d->port; ErlDrvTermData pid = d->port_owner; async_cleanup(d); ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), // Start of list ERL_DRV_ATOM, driver_mk_atom("type"), ERL_DRV_ATOM, driver_mk_atom("hash"), ERL_DRV_TUPLE, 2, HASH_STATS_TUPLE(hsp, magic), /* Magic number. */ HASH_STATS_TUPLE(hsp, version), /* Version number. */ HASH_STATS_TUPLE(hsp, metaflags), /* Metadata flags. */ HASH_STATS_TUPLE(hsp, nkeys), /* Number of unique keys. */ HASH_STATS_TUPLE(hsp, ndata), /* Number of data items. */ HASH_STATS_TUPLE(hsp, pagecnt), /* Page count. */ HASH_STATS_TUPLE(hsp, pagesize), /* Page size. */ HASH_STATS_TUPLE(hsp, ffactor), /* Fill factor specified at create. */ HASH_STATS_TUPLE(hsp, buckets), /* Number of hash buckets. */ HASH_STATS_TUPLE(hsp, free), /* Pages on the free list. */ HASH_STATS_TUPLE(hsp, bfree), /* Bytes free on bucket pages. */ HASH_STATS_TUPLE(hsp, bigpages), /* Number of big key/data pages. */ HASH_STATS_TUPLE(hsp, big_bfree), /* Bytes free on big item pages. */ HASH_STATS_TUPLE(hsp, overflows), /* Number of overflow pages. */ HASH_STATS_TUPLE(hsp, ovfl_free), /* Bytes free on ovfl pages. */ HASH_STATS_TUPLE(hsp, dup), /* Number of dup pages. */ HASH_STATS_TUPLE(hsp, dup_free), /* Bytes free on duplicate pages. */ // End of list ERL_DRV_NIL, ERL_DRV_LIST, 17+2, ERL_DRV_TUPLE, 2 }; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } #undef HASH_STATS_TUPLE #ifdef ENABLE_QUEUE // If we ever decide to support Queues #define QS_STATS_TUPLE(base, member) \ ERL_DRV_ATOM, driver_mk_atom(#member), \ ERL_DRV_UINT, (base)->qs_##member, \ ERL_DRV_TUPLE, 2 static void async_cleanup_and_send_queue_stats(PortData* d, DB_QUEUE_STAT *qsp) { // Save the port and pid references -- we need copies independent from the PortData // structure. Once we release the port_lock after clearing the cmd, it's possible that // the port could go away without waiting on us to finish. 
This is acceptable, but we need // to be certain that there is no overlap of data between the two threads. driver_send_term // is safe to use from a thread, even if the port you're sending from has already expired. ErlDrvPort port = d->port; ErlDrvTermData pid = d->port_owner; async_cleanup(d); ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), // Start of list ERL_DRV_ATOM, driver_mk_atom("type"), ERL_DRV_ATOM, driver_mk_atom("queue"), ERL_DRV_TUPLE, 2, QS_STAT_TUPLE(qsp, qs_magic), /* Magic number. */ QS_STAT_TUPLE(qsp, version), /* Version number. */ QS_STAT_TUPLE(qsp, metaflags), /* Metadata flags. */ QS_STAT_TUPLE(qsp, nkeys), /* Number of unique keys. */ QS_STAT_TUPLE(qsp, ndata), /* Number of data items. */ QS_STAT_TUPLE(qsp, pagesize), /* Page size. */ QS_STAT_TUPLE(qsp, extentsize), /* Pages per extent. */ QS_STAT_TUPLE(qsp, pages), /* Data pages. */ QS_STAT_TUPLE(qsp, re_len), /* Fixed-length record length. */ QS_STAT_TUPLE(qsp, re_pad), /* Fixed-length record pad. */ QS_STAT_TUPLE(qsp, pgfree), /* Bytes free in data pages. */ QS_STAT_TUPLE(qsp, first_recno), /* First not deleted record. */ QS_STAT_TUPLE(qsp, cur_recno), /* Next available record number. */ // End of list ERL_DRV_NIL, ERL_DRV_LIST, 13+2, ERL_DRV_TUPLE, 2 }; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } #undef QUEUE_STATS_TUPLE #endif // ENABLE_QUEUE #define ST_STATS_TUPLE(base, member) \ ERL_DRV_ATOM, driver_mk_atom(#member), \ ERL_DRV_UINT, (base)->st_##member, \ ERL_DRV_TUPLE, 2 #define ST_STATS_INT_TUPLE(base, member) \ ERL_DRV_ATOM, driver_mk_atom(#member), \ ERL_DRV_INT, (base)->st_##member, \ ERL_DRV_TUPLE, 2 static void async_cleanup_and_send_lock_stats(PortData* d, DB_LOCK_STAT *lsp) { // Save the port and pid references -- we need copies independent from the PortData // structure. Once we release the port_lock after clearing the cmd, it's possible that // the port could go away without waiting on us to finish. 
This is acceptable, but we need // to be certain that there is no overlap of data between the two threads. driver_send_term // is safe to use from a thread, even if the port you're sending from has already expired. ErlDrvPort port = d->port; ErlDrvTermData pid = d->port_owner; async_cleanup(d); ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), // Start of list ST_STATS_TUPLE(lsp, id), /* Last allocated locker ID. */ ST_STATS_TUPLE(lsp, cur_maxid), /* Current maximum unused ID. */ ST_STATS_TUPLE(lsp, maxlocks), /* Maximum number of locks in table. */ ST_STATS_TUPLE(lsp, maxlockers), /* Maximum num of lockers in table. */ ST_STATS_TUPLE(lsp, maxobjects), /* Maximum num of objects in table. */ ST_STATS_TUPLE(lsp, partitions), /* number of partitions. */ ST_STATS_INT_TUPLE(lsp, nmodes), /* Number of lock modes. */ ST_STATS_TUPLE(lsp, nlockers), /* Current number of lockers. */ ST_STATS_TUPLE(lsp, nlocks), /* Current number of locks. */ ST_STATS_TUPLE(lsp, maxnlocks), /* Maximum number of locks so far. */ ST_STATS_TUPLE(lsp, maxhlocks), /* Maximum number of locks in any bucket. */ ST_STATS_TUPLE(lsp, locksteals), /* Number of lock steals so far. */ ST_STATS_TUPLE(lsp, maxlsteals), /* Maximum number steals in any partition. */ ST_STATS_TUPLE(lsp, maxnlockers), /* Maximum number of lockers so far. */ ST_STATS_TUPLE(lsp, nobjects), /* Current number of objects. */ ST_STATS_TUPLE(lsp, maxnobjects), /* Maximum number of objects so far. */ ST_STATS_TUPLE(lsp, maxhobjects), /* Maximum number of objectsin any bucket. */ ST_STATS_TUPLE(lsp, objectsteals), /* Number of objects steals so far. */ ST_STATS_TUPLE(lsp, maxosteals), /* Maximum number of steals in any partition. */ ST_STATS_TUPLE(lsp, nrequests), /* Number of lock gets. */ ST_STATS_TUPLE(lsp, nreleases), /* Number of lock puts. */ ST_STATS_TUPLE(lsp, nupgrade), /* Number of lock upgrades. */ ST_STATS_TUPLE(lsp, ndowngrade), /* Number of lock downgrades. 
*/
        ST_STATS_TUPLE(lsp, lock_wait),      /* Lock conflicts w/ subsequent wait */
        ST_STATS_TUPLE(lsp, lock_nowait),    /* Lock conflicts w/o subsequent wait */
        ST_STATS_TUPLE(lsp, ndeadlocks),     /* Number of lock deadlocks. */
        ST_STATS_TUPLE(lsp, locktimeout),    /* Lock timeout. */
        ST_STATS_TUPLE(lsp, nlocktimeouts),  /* Number of lock timeouts. */
        ST_STATS_TUPLE(lsp, txntimeout),     /* Transaction timeout. */
        ST_STATS_TUPLE(lsp, ntxntimeouts),   /* Number of transaction timeouts. */
        ST_STATS_TUPLE(lsp, part_wait),      /* Partition lock granted after wait. */
        ST_STATS_TUPLE(lsp, part_nowait),    /* Partition lock granted without wait. */
        ST_STATS_TUPLE(lsp, part_max_wait),  /* Max partition lock granted after wait. */
        ST_STATS_TUPLE(lsp, part_max_nowait), /* Max partition lock granted without wait. */
        ST_STATS_TUPLE(lsp, objs_wait),      /* Object lock granted after wait. */
        ST_STATS_TUPLE(lsp, objs_nowait),    /* Object lock granted without wait. */
        ST_STATS_TUPLE(lsp, lockers_wait),   /* Locker lock granted after wait. */
        ST_STATS_TUPLE(lsp, lockers_nowait), /* Locker lock granted without wait. */
        ST_STATS_TUPLE(lsp, region_wait),    /* Region lock granted after wait. */
        ST_STATS_TUPLE(lsp, region_nowait),  /* Region lock granted without wait. */
        ST_STATS_TUPLE(lsp, hash_len),       /* Max length of bucket. */
        ST_STATS_TUPLE(lsp, regsize),        /* Region size. - will have to cast to uint */
        // End of list: 42 tuples + NIL
        ERL_DRV_NIL,
        ERL_DRV_LIST, 42+1,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}

static void async_cleanup_and_send_log_stats(PortData* d, DB_LOG_STAT *lsp)
{
    // Save the port and pid references -- we need copies independent from the PortData
    // structure. Once we release the port_lock after clearing the cmd, it's possible that
    // the port could go away without waiting on us to finish. This is acceptable, but we need
    // to be certain that there is no overlap of data between the two threads. driver_send_term
    // is safe to use from a thread, even if the port you're sending from has already expired.
    ErlDrvPort port = d->port;
    ErlDrvTermData pid = d->port_owner;
    async_cleanup(d);

    // {ok, [23 stat tuples]}
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("ok"),
        // Start of list
        ST_STATS_TUPLE(lsp, magic),       /* Log file magic number. */
        ST_STATS_TUPLE(lsp, version),     /* Log file version number. */
        ST_STATS_INT_TUPLE(lsp, mode),    /* Log file permissions mode. */
        ST_STATS_TUPLE(lsp, lg_bsize),    /* Log buffer size. */
        ST_STATS_TUPLE(lsp, lg_size),     /* Log file size. */
        ST_STATS_TUPLE(lsp, wc_bytes),    /* Bytes to log since checkpoint. */
        ST_STATS_TUPLE(lsp, wc_mbytes),   /* Megabytes to log since checkpoint. */
        ST_STATS_TUPLE(lsp, record),      /* Records entered into the log. */
        ST_STATS_TUPLE(lsp, w_bytes),     /* Bytes to log. */
        ST_STATS_TUPLE(lsp, w_mbytes),    /* Megabytes to log. */
        ST_STATS_TUPLE(lsp, wcount),      /* Total I/O writes to the log. */
        ST_STATS_TUPLE(lsp, wcount_fill), /* Overflow writes to the log. */
        ST_STATS_TUPLE(lsp, rcount),      /* Total I/O reads from the log. */
        ST_STATS_TUPLE(lsp, scount),      /* Total syncs to the log. */
        ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */
        ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */
        ST_STATS_TUPLE(lsp, cur_file),    /* Current log file number. */
        ST_STATS_TUPLE(lsp, cur_offset),  /* Current log file offset. */
        ST_STATS_TUPLE(lsp, disk_file),   /* Known on disk log file number. */
        ST_STATS_TUPLE(lsp, disk_offset), /* Known on disk log file offset. */
        ST_STATS_TUPLE(lsp, maxcommitperflush), /* Max number of commits in a flush. */
        ST_STATS_TUPLE(lsp, mincommitperflush), /* Min number of commits in a flush. */
        ST_STATS_TUPLE(lsp, regsize),     /* Region size.
*/
        // End of list: 23 tuples + NIL
        ERL_DRV_NIL,
        ERL_DRV_LIST, 23+1,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}

// Send one {fstat, [...]} message describing a single cached file's statistics.
static void send_mpool_fstat(ErlDrvPort port, ErlDrvTermData pid, DB_MPOOL_FSTAT *fsp)
{
    char *name = fsp->file_name ? fsp->file_name : "";
    int name_len = strlen(name);
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("fstat"),
        // Start of list
        ERL_DRV_ATOM, driver_mk_atom("name"),
        ERL_DRV_STRING, (ErlDrvTermData) name, name_len,
        ERL_DRV_TUPLE, 2,
        ST_STATS_TUPLE(fsp, map),          /* Pages from mapped files. */
        ST_STATS_TUPLE(fsp, cache_hit),    /* Pages found in the cache. */
        ST_STATS_TUPLE(fsp, cache_miss),   /* Pages not found in the cache. */
        ST_STATS_TUPLE(fsp, page_create),  /* Pages created in the cache. */
        ST_STATS_TUPLE(fsp, page_in),      /* Pages read in. */
        ST_STATS_TUPLE(fsp, page_out),     /* Pages written out. */
        // End of list
        ERL_DRV_NIL,
        ERL_DRV_LIST, 7+1,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}

static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp,
                                              DB_MPOOL_FSTAT **fsp)
{
    // Save the port and pid references -- we need copies independent from the PortData
    // structure. Once we release the port_lock after clearing the cmd, it's possible that
    // the port could go away without waiting on us to finish. This is acceptable, but we need
    // to be certain that there is no overlap of data between the two threads. driver_send_term
    // is safe to use from a thread, even if the port you're sending from has already expired.
    ErlDrvPort port = d->port;
    ErlDrvTermData pid = d->port_owner;
    async_cleanup(d);

    // First send the per-file stats (fsp is a NULL-terminated array)
    int i;
    for (i = 0; fsp != NULL && fsp[i] != NULL; i++)
    {
        send_mpool_fstat(port, pid, fsp[i]);
    }

    // Then send the global stats
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("ok"),
        // Start of list
        ST_STATS_TUPLE(gsp, gbytes),       /* Total cache size: GB.
*/
        ST_STATS_TUPLE(gsp, bytes),        /* Total cache size: B. */
        ST_STATS_TUPLE(gsp, ncache),       /* Number of cache regions. */
        ST_STATS_TUPLE(gsp, max_ncache),   /* Maximum number of regions. */
        ST_STATS_INT_TUPLE(gsp, mmapsize), /* Maximum file size for mmap. */
        ST_STATS_INT_TUPLE(gsp, maxopenfd), /* Maximum number of open fd's. */
        ST_STATS_INT_TUPLE(gsp, maxwrite), /* Maximum buffers to write. */
        ST_STATS_TUPLE(gsp, maxwrite_sleep), /* Sleep after writing max buffers. */
        ST_STATS_TUPLE(gsp, pages),        /* Total number of pages. */
        ST_STATS_TUPLE(gsp, map),          /* Pages from mapped files. */
        ST_STATS_TUPLE(gsp, cache_hit),    /* Pages found in the cache. */
        ST_STATS_TUPLE(gsp, cache_miss),   /* Pages not found in the cache. */
        ST_STATS_TUPLE(gsp, page_create),  /* Pages created in the cache. */
        ST_STATS_TUPLE(gsp, page_in),      /* Pages read in. */
        ST_STATS_TUPLE(gsp, page_out),     /* Pages written out. */
        ST_STATS_TUPLE(gsp, ro_evict),     /* Clean pages forced from the cache. */
        ST_STATS_TUPLE(gsp, rw_evict),     /* Dirty pages forced from the cache. */
        ST_STATS_TUPLE(gsp, page_trickle), /* Pages written by memp_trickle. */
        ST_STATS_TUPLE(gsp, page_clean),   /* Clean pages. */
        ST_STATS_TUPLE(gsp, page_dirty),   /* Dirty pages. */
        ST_STATS_TUPLE(gsp, hash_buckets), /* Number of hash buckets. */
        ST_STATS_TUPLE(gsp, hash_searches), /* Total hash chain searches. */
        ST_STATS_TUPLE(gsp, hash_longest), /* Longest hash chain searched. */
        ST_STATS_TUPLE(gsp, hash_examined), /* Total hash entries searched. */
        ST_STATS_TUPLE(gsp, hash_nowait),  /* Hash lock granted with nowait. */
        ST_STATS_TUPLE(gsp, hash_wait),    /* Hash lock granted after wait. */
        ST_STATS_TUPLE(gsp, hash_max_nowait), /* Max hash lock granted with nowait. */
        ST_STATS_TUPLE(gsp, hash_max_wait), /* Max hash lock granted after wait. */
        ST_STATS_TUPLE(gsp, region_nowait), /* Region lock granted with nowait. */
        ST_STATS_TUPLE(gsp, region_wait),  /* Region lock granted after wait. */
        ST_STATS_TUPLE(gsp, mvcc_frozen),  /* Buffers frozen. */
        ST_STATS_TUPLE(gsp, mvcc_thawed),  /* Buffers thawed. */
        ST_STATS_TUPLE(gsp, mvcc_freed),   /* Frozen buffers freed. */
        ST_STATS_TUPLE(gsp, alloc),        /* Number of page allocations. */
        ST_STATS_TUPLE(gsp, alloc_buckets), /* Buckets checked during allocation. */
        ST_STATS_TUPLE(gsp, alloc_max_buckets), /* Max checked during allocation. */
        ST_STATS_TUPLE(gsp, alloc_pages),  /* Pages checked during allocation. */
        ST_STATS_TUPLE(gsp, alloc_max_pages), /* Max checked during allocation. */
        ST_STATS_TUPLE(gsp, io_wait),      /* Thread waited on buffer I/O. */
        ST_STATS_TUPLE(gsp, regsize),      /* Region size. */
        // End of list: 40 tuples + NIL
        ERL_DRV_NIL,
        ERL_DRV_LIST, 40+1,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}

static void async_cleanup_and_send_mutex_stats(PortData* d, DB_MUTEX_STAT *msp)
{
    // Save the port and pid references -- we need copies independent from the PortData
    // structure. Once we release the port_lock after clearing the cmd, it's possible that
    // the port could go away without waiting on us to finish. This is acceptable, but we need
    // to be certain that there is no overlap of data between the two threads. driver_send_term
    // is safe to use from a thread, even if the port you're sending from has already expired.
    ErlDrvPort port = d->port;
    ErlDrvTermData pid = d->port_owner;
    async_cleanup(d);

    // {ok, [9 stat tuples]}
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("ok"),
        // Start of list
        ST_STATS_TUPLE(msp, mutex_align),     /* Mutex alignment */
        ST_STATS_TUPLE(msp, mutex_tas_spins), /* Mutex test-and-set spins */
        ST_STATS_TUPLE(msp, mutex_cnt),       /* Mutex count */
        ST_STATS_TUPLE(msp, mutex_free),      /* Available mutexes */
        ST_STATS_TUPLE(msp, mutex_inuse),     /* Mutexes in use */
        ST_STATS_TUPLE(msp, mutex_inuse_max), /* Maximum mutexes ever in use */
        ST_STATS_TUPLE(msp, region_wait),     /* Region lock granted after wait. */
        ST_STATS_TUPLE(msp, region_nowait),   /* Region lock granted without wait. */
        ST_STATS_TUPLE(msp, regsize),         /* Region size.
*/
        // End of list: 9 tuples + NIL
        ERL_DRV_NIL,
        ERL_DRV_LIST, 9+1,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}

// Expand to one {Member, Value} 2-tuple for an unprefixed stat field
#define STATS_TUPLE(base, member)                   \
    ERL_DRV_ATOM, driver_mk_atom(#member),          \
    ERL_DRV_UINT, (base)->member,                   \
    ERL_DRV_TUPLE, 2

// Expand to one {Member, {File, Offset}} tuple for a DB_LSN-valued field
#define STATS_LSN_TUPLE(base, member)               \
    ERL_DRV_ATOM, driver_mk_atom(#member),          \
    ERL_DRV_UINT, (base)->member.file,              \
    ERL_DRV_UINT, (base)->member.offset,            \
    ERL_DRV_TUPLE, 2,                               \
    ERL_DRV_TUPLE, 2

// Send one {txn, ...} message describing a single active transaction.
static void send_txn_tstat(ErlDrvPort port, ErlDrvTermData pid, DB_TXN_ACTIVE *tasp)
{
    char *name = tasp->name ? tasp->name : "";
    int name_len = strlen(name);
    char tid_str[32];
    char *status_str;
    switch (tasp->status)
    {
    case TXN_ABORTED:
        status_str = "aborted";
        break;
    case TXN_COMMITTED:
        status_str = "committed";
        break;
    case TXN_PREPARED:
        status_str = "prepared";
        break;
    case TXN_RUNNING:
        status_str = "running";
        break;
    default:
        status_str = "undefined";
        break;
    }
    int tid_str_len = snprintf(tid_str, sizeof(tid_str), "%lu", (unsigned long) tasp->tid);
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("txn"),
        STATS_TUPLE(tasp, txnid),     /* Transaction ID */
        STATS_TUPLE(tasp, parentid),  /* Transaction ID of parent */
        STATS_TUPLE(tasp, pid),       /* Process owning txn ID - pid_t */
        ERL_DRV_ATOM, driver_mk_atom("tid"), /* OSX has 32-bit ints in erlang, so return as */
        ERL_DRV_STRING, (ErlDrvTermData) tid_str, tid_str_len, /* a string */
        ERL_DRV_TUPLE, 2,
        STATS_LSN_TUPLE(tasp, lsn),      /* LSN when transaction began */
        STATS_LSN_TUPLE(tasp, read_lsn), /* Read LSN for MVCC */
        STATS_TUPLE(tasp, mvcc_ref),     /* MVCC reference count */
        // Start of list
        ERL_DRV_ATOM, driver_mk_atom("status"),
        ERL_DRV_ATOM, driver_mk_atom(status_str),
        ERL_DRV_TUPLE, 2,
        ERL_DRV_ATOM, driver_mk_atom("name"),
        ERL_DRV_STRING, (ErlDrvTermData) name, name_len,
        ERL_DRV_TUPLE, 2,
        // End of list
        ERL_DRV_NIL,
        ERL_DRV_LIST, 9+1,
        ERL_DRV_TUPLE, 2
    };
    driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0]));
}

// Expand to one {Member, {File, Offset}} tuple for an st_-prefixed DB_LSN field
#define ST_STATS_LSN_TUPLE(base, member)            \
    ERL_DRV_ATOM, driver_mk_atom(#member),          \
    ERL_DRV_UINT, (base)->st_##member.file,         \
    ERL_DRV_UINT, (base)->st_##member.offset,       \
    ERL_DRV_TUPLE, 2,                               \
    ERL_DRV_TUPLE, 2

static void async_cleanup_and_send_txn_stats(PortData* d, DB_TXN_STAT *tsp)
{
    // Save the port and pid references -- we need copies independent from the PortData
    // structure. Once we release the port_lock after clearing the cmd, it's possible that
    // the port could go away without waiting on us to finish. This is acceptable, but we need
    // to be certain that there is no overlap of data between the two threads. driver_send_term
    // is safe to use from a thread, even if the port you're sending from has already expired.
    ErlDrvPort port = d->port;
    ErlDrvTermData pid = d->port_owner;
    async_cleanup(d);

    // First send the array of active transactions
    int i;
    for (i = 0; i < tsp->st_nactive; i++)
    {
        send_txn_tstat(port, pid, tsp->st_txnarray+i);
    }

    // Then send the global stats
    ErlDrvTermData response[] = {
        ERL_DRV_ATOM, driver_mk_atom("ok"),
        // Start of list
        ST_STATS_TUPLE(tsp, nrestores),    /* number of restored transactions after recovery. */
        ST_STATS_LSN_TUPLE(tsp, last_ckp), /* lsn of the last checkpoint */
        ST_STATS_TUPLE(tsp, time_ckp),     /* time of last checkpoint (time_t to uint) */
        ST_STATS_TUPLE(tsp, last_txnid),   /* last transaction id given out */
        ST_STATS_TUPLE(tsp, maxtxns),      /* maximum txns possible */
        ST_STATS_TUPLE(tsp, naborts),      /* number of aborted transactions */
        ST_STATS_TUPLE(tsp, nbegins),      /* number of begun transactions */
        ST_STATS_TUPLE(tsp, ncommits),     /* number of committed transactions */
        ST_STATS_TUPLE(tsp, nactive),      /* number of active transactions */
        ST_STATS_TUPLE(tsp, nsnapshot),    /* number of snapshot transactions */
        ST_STATS_TUPLE(tsp, maxnactive),   /* maximum active transactions */
        ST_STATS_TUPLE(tsp, maxnsnapshot), /* maximum snapshot transactions */
        ST_STATS_TUPLE(tsp, region_wait),  /* Region lock granted after wait.
*/ ST_STATS_TUPLE(tsp, region_nowait), /* Region lock granted without wait. */ ST_STATS_TUPLE(tsp, regsize), /* Region size. */ // End of list ERL_DRV_NIL, ERL_DRV_LIST, 15+1, ERL_DRV_TUPLE, 2 }; driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } static void do_async_put(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Get the database reference and flags from the payload int dbref = UNPACK_INT(d->work_buffer, 0); DB* db = G_DATABASES[dbref].db; unsigned int flags = UNPACK_INT(d->work_buffer, 4); // Setup DBTs DBT key; DBT value; memset(&key, '\0', sizeof(DBT)); memset(&value, '\0', sizeof(DBT)); // Parse payload into DBTs key.size = UNPACK_INT(d->work_buffer, 8); key.data = UNPACK_BLOB(d->work_buffer, 12); value.size = UNPACK_INT(d->work_buffer, 12 + key.size); value.data = UNPACK_BLOB(d->work_buffer, 12 + key.size + 4); // Execute the actual put. All databases are opened with AUTO_COMMIT, so if msg->port->txn // is NULL, the put will still be atomic int rc = db->put(db, d->txn, &key, &value, flags); // If any error occurs while we have a txn action, abort it if (d->txn && rc) { d->txn->abort(d->txn); d->txn = 0; } else if (d->txn && d->async_op == CMD_PUT_COMMIT) { // Put needs to be followed by a commit -- saves us another pass through the driver and // threadpool queues rc = d->txn->commit(d->txn, 0); // Regardless of the txn commit outcome, we still need to invalidate the transaction d->txn = 0; } async_cleanup_and_send_rc(d, rc); } static void do_async_get(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> PortData* d = (PortData*)arg; // Get the database object, using the provided ref int dbref = UNPACK_INT(d->work_buffer, 0); DB* db = G_DATABASES[dbref].db; // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 4); // Setup DBTs DBT key; DBT value; memset(&key, '\0', sizeof(DBT)); memset(&value, '\0', sizeof(DBT)); // Parse payload into DBT key.size = 
UNPACK_INT(d->work_buffer, 8); key.data = UNPACK_BLOB(d->work_buffer, 12); // Allocate a buffer for the output value value.data = driver_alloc(4096); value.ulen = 4096; value.flags = DB_DBT_USERMEM; int rc = db->get(db, d->txn, &key, &value, flags); while (rc == DB_BUFFER_SMALL) { // Grow our value buffer and try again value.data = driver_realloc(value.data, value.size); value.ulen = value.size; rc = db->get(db, d->txn, &key, &value, flags); } // Cleanup transaction as necessary if (rc && rc != DB_NOTFOUND && d->txn) { d->txn->abort(d->txn); d->txn = 0; } async_cleanup_and_send_kv(d, rc, &key, &value); // Finally, clean up value buffer (driver_send_term made a copy) driver_free(value.data); } static void do_async_txnop(void* arg) { PortData* d = (PortData*)arg; // Execute the actual begin/commit/abort int rc = 0; if (d->async_op == CMD_TXN_BEGIN) { rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags); } else if (d->async_op == CMD_TXN_COMMIT) { rc = d->txn->commit(d->txn, d->async_flags); d->txn = 0; } else { rc = d->txn->abort(d->txn); d->txn = 0; } async_cleanup_and_send_rc(d, rc); } static void do_async_cursor_get(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> PortData* d = (PortData*)arg; // Setup DBTs DBT key; DBT value; memset(&key, '\0', sizeof(DBT)); memset(&value, '\0', sizeof(DBT)); // Determine what type of cursor get to perform int flags = 0; switch (d->async_op) { case CMD_CURSOR_NEXT: flags = DB_NEXT; break; case CMD_CURSOR_PREV: flags = DB_PREV; break; default: flags = DB_CURRENT; } // Execute the operation int rc = d->cursor->get(d->cursor, &key, &value, flags); // Cleanup as necessary; any sort of failure means we need to close the cursor and abort // the transaction if (rc && rc != DB_NOTFOUND) { DBG("cursor flags=%d rc=%d\n", flags, rc); d->cursor->close(d->cursor); d->cursor = 0; if (d->txn) { d->txn->abort(d->txn); d->txn = 0; } } async_cleanup_and_send_kv(d, rc, &key, &value); } static void 
do_async_truncate(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Get the database reference and flags from the payload int dbref = UNPACK_INT(d->work_buffer, 0); int rc = 0; if (dbref == -1) { DBG("Truncating all open databases...\r\n"); // Iterate over the whole database list skipping null entries int i = 0; // I hate C for ( ; i < G_DATABASES_SIZE; ++i) { Database* database = &G_DATABASES[i]; if (database != NULL && database->db != 0) { DB* db = database->db; DBG("Truncating dbref %i\r\n", i); u_int32_t count = 0; rc = db->truncate(db, d->txn, &count, 0); if (rc != 0) { break; } } } } else { DB* db = G_DATABASES[dbref].db; DBG("Truncating dbref %i\r\n", dbref); u_int32_t count = 0; rc = db->truncate(db, d->txn, &count, 0); } // If any error occurs while we have a txn action, abort it if (d->txn && rc) { d->txn->abort(d->txn); d->txn = 0; } async_cleanup_and_send_rc(d, rc); } static void do_async_stat(void* arg) { // Payload is: << DbRef:32, Flags:32 >> PortData* d = (PortData*)arg; // Get the database object, using the provided ref int dbref = UNPACK_INT(d->work_buffer, 0); DB* db = G_DATABASES[dbref].db; DBTYPE type = DB_UNKNOWN; int rc = db->get_type(db, &type); if (rc != 0) { async_cleanup_and_send_rc(d, rc); return; } // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 4); void *sp = NULL; rc = db->stat(db, d->txn, &sp, flags); if (rc != 0 || sp == NULL) { async_cleanup_and_send_rc(d, rc); } else { switch(type) { case DB_BTREE: /*FALLTHRU*/ case DB_RECNO: async_cleanup_and_send_btree_stats(d, type == DB_BTREE ? 
"btree" :"recno", sp); break; case DB_HASH: async_cleanup_and_send_hash_stats(d, sp); break; #ifdef ENABLE_QUEUE case DB_QUEUE: async_cleanup_and_send_queue_stats(d, sp); break; #endif default: async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); break; } } // Finally, clean up value buffer (driver_send_term made a copy) if (NULL != sp) { free(sp); } } static void do_async_lock_stat(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 0); DB_LOCK_STAT *lsp = NULL; int rc = G_DB_ENV->lock_stat(G_DB_ENV, &lsp, flags); if (rc != 0 || lsp == NULL) { async_cleanup_and_send_rc(d, rc); } else { async_cleanup_and_send_lock_stats(d, lsp); } // Finally, clean up lock stats if (NULL != lsp) { free(lsp); } } static void do_async_log_stat(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 0); DB_LOG_STAT *lsp = NULL; int rc = G_DB_ENV->log_stat(G_DB_ENV, &lsp, flags); if (rc != 0 || lsp == NULL) { async_cleanup_and_send_rc(d, rc); } else { async_cleanup_and_send_log_stats(d, lsp); } // Finally, clean up stats if (NULL != lsp) { free(lsp); } } static void do_async_memp_stat(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 0); DB_MPOOL_STAT *gsp = NULL; DB_MPOOL_FSTAT **fsp = NULL; int rc = G_DB_ENV->memp_stat(G_DB_ENV, &gsp, &fsp, flags); if (rc != 0 || gsp == NULL) { async_cleanup_and_send_rc(d, rc); } else { async_cleanup_and_send_memp_stats(d, gsp, fsp); } // Finally, clean up stats if (NULL != gsp) { free(gsp); } if (NULL != fsp) { free(fsp); } } static void do_async_mutex_stat(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 0); DB_MUTEX_STAT *msp = NULL; int rc = G_DB_ENV->mutex_stat(G_DB_ENV, &msp, flags); if (rc != 0 || msp == NULL) { 
async_cleanup_and_send_rc(d, rc); } else { async_cleanup_and_send_mutex_stats(d, msp); } // Finally, clean up stats if (NULL != msp) { free(msp); } } static void do_async_txn_stat(void* arg) { // Payload is: <> PortData* d = (PortData*)arg; // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 0); DB_TXN_STAT *tsp = NULL; int rc = G_DB_ENV->txn_stat(G_DB_ENV, &tsp, flags); if (rc != 0 || tsp == NULL) { async_cleanup_and_send_rc(d, rc); } else { async_cleanup_and_send_txn_stats(d, tsp); } // Finally, clean up stats if (NULL != tsp) { free(tsp); } } static void do_sync_data_dirs_info(PortData *d) { // Get DB_HOME and find the real path const char *db_home = NULL; const char *data_dir = NULL; const char **data_dirs = NULL; char db_home_realpath[PATH_MAX+1]; char data_dir_realpath[PATH_MAX+1]; int got_db_home = 0; // Lookup the environment and add it if not explicitly included in the data_dirs int rc = G_DB_ENV->get_home(G_DB_ENV, &db_home); if (rc != 0 || NULL == db_home) { // If no db_home we'll have to rely on whatever the global environment is configured with got_db_home = 1; } else { if (NULL == realpath(db_home, db_home_realpath)) rc = errno; } // Get the data first rc = G_DB_ENV->get_data_dirs(G_DB_ENV, &data_dirs); int i; for (i = 0; 0 == rc && NULL != data_dirs && NULL != data_dirs[i]; i++) { data_dir = data_dirs[i]; if (!got_db_home) { // Get the real path of the data dir if (NULL == realpath(data_dir, data_dir_realpath)) { rc = errno; } else { // Set got_db_home if it matches if (0 == strcmp(data_dir_realpath, db_home_realpath)) { got_db_home = 1; } } } if (0 == rc) { rc = send_dir_info(d->port, d->port_owner, data_dir); } } // BDB always searches the environment home too so add it to the list if (!got_db_home && rc == 0) { rc = send_dir_info(d->port, d->port_owner, db_home); } // Send the return code - will termiante the receive loop in bdberl.erl send_rc(d->port, d->port_owner, rc); } static void* zalloc(unsigned int size) { void* res = 
driver_alloc(size); memset(res, '\0', size); return res; } #define zfree(p) driver_free(p) static int add_portref(int dbref, ErlDrvPort port) { PortList* current = G_DATABASES[dbref].ports; if (current) { PortList* last = 0; do { // If the current item matches our port, bail -- nothing to do here if (current->port == port) { return 0; } last = current; current = current->next; } while (current != 0); // At the end of the list -- allocate a new entry for this por current = (PortList*)zalloc(sizeof(PortList)); current->port = port; last->next = current; return 1; } else { // Current was initially NULL, so alloc the first one and add it. current = zalloc(sizeof(PortList)); current->port = port; G_DATABASES[dbref].ports = current; return 1; } } static int del_portref(int dbref, ErlDrvPort port) { PortList* current = G_DATABASES[dbref].ports; PortList* last = 0; while (current) { if (current->port == port) { // Found our match -- look back and connect the last item to our next if (last) { last->next = current->next; } else { G_DATABASES[dbref].ports = current->next; } // Delete this entry zfree(current); return 1; } last = current; current = current->next; } // Didn't delete anything return 0; } /** * Add a db reference to a port's DbRefList. Returns 1 if added; 0 if already present */ static int add_dbref(PortData* data, int dbref) { DbRefList* current = data->dbrefs; if (current) { DbRefList* last = 0; do { if (current->dbref == dbref) { return 0; } last = current; current = current->next; } while (current != 0); // At the end of the list -- allocate a new entry current = zalloc(sizeof(DbRefList)); current->dbref = dbref; last->next = current; return 1; } else { // Current was initially NULL, so alloc the first one current = zalloc(sizeof(DbRefList)); current->dbref = dbref; data->dbrefs = current; return 1; } } /** * Delete a db reference from a port's DbRefList. 
Returns 1 if deleted; 0 if not */ static int del_dbref(PortData* data, int dbref) { DbRefList* current = data->dbrefs; DbRefList* last = 0; while (current) { if (current->dbref == dbref) { // Found our match -- look back and connect the last item to our next if (last) { last->next = current->next; } else { data->dbrefs = current->next; } // Delete this entry zfree(current); return 1; } last = current; current = current->next; } // Didn't delete anything return 0; } /** * Validate that a provided dbref is currently opened by a port. Return 1 if true; 0 if false. */ static int has_dbref(PortData* data, int dbref) { DbRefList* current = data->dbrefs; while (current) { if (current->dbref == dbref) { return 1; } current = current->next; } return 0; } /** * Allocate a Database structure; find first available slot in G_DATABASES and return the * index of it. If no free slots are available, return -1 */ static int alloc_dbref() { int i; for (i = 0; i < G_DATABASES_SIZE; i++) { if (G_DATABASES[i].db == 0) { return i; } } return -1; } /** * Utility thread sleep - returns true if being signalled to exit * otherwise false if timeout exceeded. */ int util_thread_usleep(unsigned int usecs) { fd_set fds; struct timeval sleep_until; struct timeval sleep_for; struct timeval now; struct timeval tv; int done; int nfds = (G_BDBERL_PIPE[0] > G_BDBERL_PIPE[1] ? G_BDBERL_PIPE[0] : G_BDBERL_PIPE[1]) + 1; memset(&sleep_for, 0, sizeof(sleep_for)); sleep_for.tv_sec = usecs / 1000000; sleep_for.tv_usec = usecs % 1000000; gettimeofday(&now, NULL); timeradd(&now, &sleep_for, &sleep_until); do { FD_ZERO(&fds); FD_SET(G_BDBERL_PIPE[0], &fds); // read fd of pipe // Check if we have slept long enough gettimeofday(&now, NULL); if (timercmp(&now, &sleep_until, >)) { done = 1; } else // take a nap { // work out the remaining time to sleep on the fd for - make sure that this time // is less than or equal to the original sleep time requested, just in // case the system time is being adjusted. 
If the adjustment would result // in a longer wait then cap it at the sleep_for time. timersub(&sleep_until, &now, &tv); if (timercmp(&tv, &sleep_for, >)) { memcpy(&tv, &sleep_for, sizeof(tv)); } done = 1; if (-1 == select(nfds, &fds, NULL, NULL, &tv)) { if (EINTR == errno) // a signal woke up select, back to sleep for us { done = 0; } // any other signals can return to the caller to fail safe as it // doesn't matter if the util threads get woken up more often } else if (FD_ISSET(G_BDBERL_PIPE[0], &fds)) { done = 1; } } } while (!done); return FD_ISSET(G_BDBERL_PIPE[0], &fds); } /** * Thread function that runs the deadlock checker periodically */ static void* deadlock_check(void* arg) { while(G_DEADLOCK_CHECK_ACTIVE) { // Run the lock detection int count = 0; int rc = G_DB_ENV->lock_detect(G_DB_ENV, 0, DB_LOCK_DEFAULT, &count); if (0 != rc) { DBG("lock_detect returned %s(%d)\n", db_strerror(rc), rc); } if (count > 0) { DBG("Rejected deadlocks: %d\r\n", count); } util_thread_usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); } DBG("Deadlock checker exiting.\r\n"); return 0; } /** * Thread function that does trickle writes or checkpointing at fixed intervals. 
*/
static void* checkpointer(void* arg)
{
    time_t last_checkpoint_time = time(0);
    time_t last_trickle_time = time(0);

    while (G_CHECKPOINT_ACTIVE)
    {
        time_t now = time(0);
        if (now - last_checkpoint_time > G_CHECKPOINT_INTERVAL)
        {
            // Time to checkpoint and cleanup log files
            int checkpoint_rc = G_DB_ENV->txn_checkpoint(G_DB_ENV, 0, 0, 0);

            // Mark the time before starting log_archive so we can know how long it took
            time_t log_now = time(0);
            int log_rc = G_DB_ENV->log_archive(G_DB_ENV, NULL, DB_ARCH_REMOVE);
            time_t finish_now = time(0);

            // Bundle up the results and elapsed time into a message for the logger.
            // The tuple arity (5) must match the number of elements above it.
            ErlDrvTermData response[] = {
                ERL_DRV_ATOM, driver_mk_atom("bdb_checkpoint_stats"),
                ERL_DRV_UINT, log_now - now,          /* Elapsed seconds for checkpoint */
                ERL_DRV_UINT, finish_now - log_now,   /* Elapsed seconds for log_archive */
                ERL_DRV_INT, checkpoint_rc,           /* Return code of checkpoint */
                ERL_DRV_INT, log_rc,                  /* Return code of log_archive */
                ERL_DRV_TUPLE, 5};
            // send_log_message expects a byte count here (see its definition)
            send_log_message(response, sizeof(response));

            // Note the time of this checkpoint completion
            last_checkpoint_time = finish_now;
        }
        else if (now - last_trickle_time > G_TRICKLE_INTERVAL)
        {
            // Time to run the trickle operation again
            int pages_wrote = 0;
            int rc = G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
            time_t finish_now = time(0);

            // Bundle up the results and elapsed time into a message for the logger
            ErlDrvTermData response[] = {
                ERL_DRV_ATOM, driver_mk_atom("bdb_trickle_stats"),
                ERL_DRV_UINT, finish_now - now,       /* Elapsed seconds for trickle */
                ERL_DRV_UINT, pages_wrote,            /* Number of pages flushed */
                ERL_DRV_INT, rc,                      /* Return code of memp_trickle */
                ERL_DRV_TUPLE, 4};
            send_log_message(response, sizeof(response));

            // Note the time of this trickle completion
            last_trickle_time = finish_now;
        }

        // Always sleep for one second between interval checks
        util_thread_usleep(1000000);
    }

    return 0;
}

/**
 * BDB error callback -- forwards the message to the registered log port
 * as {bdb_error_log, Msg}. (errpfx is unused; driver_send_term copies msg.)
 */
static void bdb_errcall(const DB_ENV* dbenv, const char* errpfx, const char* msg)
{
    ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_error_log"),
                                  ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
                                  ERL_DRV_TUPLE, 2};
    send_log_message(response, sizeof(response));
}

/**
 * BDB informational message callback -- forwards as {bdb_info_log, Msg}.
 */
static void bdb_msgcall(const DB_ENV* dbenv, const char* msg)
{
    ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("bdb_info_log"),
                                  ERL_DRV_STRING, (ErlDrvTermData)msg, (ErlDrvUInt)strlen(msg),
                                  ERL_DRV_TUPLE, 2};
    send_log_message(response, sizeof(response));
}

/**
 * Deliver a driver term message to the registered log port, if any.
 * Despite the name, `elements` is a BYTE count (callers pass sizeof(array));
 * it is converted to an element count by dividing by sizeof(msg[0]) below.
 * The read lock guards against the log port being swapped out mid-send.
 */
static void send_log_message(ErlDrvTermData* msg, int elements)
{
    if (G_LOG_PORT)
    {
        READ_LOCK(G_LOG_RWLOCK);
        driver_send_term(G_LOG_PORT, G_LOG_PID, msg, elements / sizeof(msg[0]));
        READ_UNLOCK(G_LOG_RWLOCK);
    }
}