diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index d9e4acb..645d3b0 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -9,12 +9,15 @@ #include #include #include +#include +#include #include #include #include #include #include #include +#include #include "hive_hash.h" #include "bdberl_drv.h" @@ -34,6 +37,16 @@ static void do_async_get(void* arg); static void do_async_txnop(void* arg); static void do_async_cursor_get(void* arg); static void do_async_truncate(void* arg); +static void do_async_stat(void* arg); +static void do_async_lock_stat(void* arg); +static void do_async_log_stat(void* arg); +static void do_async_memp_stat(void* arg); +static void do_async_mutex_stat(void* arg); +static void do_async_txn_stat(void* arg); +static void do_sync_data_dirs_info(PortData *p); + +static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path); +static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -171,6 +184,32 @@ static TPool* G_TPOOL_TXNS = NULL; erl_drv_mutex_unlock(d->port_lock); \ }} +#define FAIL_IF_CURSOR_OPEN(d, outbuf) { \ + if (NULL != d->cursor) \ + { \ + send_rc(d->port, d->port_owner, ERROR_CURSOR_OPEN); \ + RETURN_INT(0, outbuf); \ + }} +#define FAIL_IF_NO_CURSOR(d, outbuf) { \ + if (NULL == d->cursor) \ + { \ + send_rc(d->port, d->port_owner, ERROR_NO_CURSOR); \ + RETURN_INT(0, outbuf); \ + }} + +#define FAIL_IF_TXN_OPEN(d, outbuf) { \ + if (NULL != d->txn) \ + { \ + send_rc(d->port, d->port_owner, ERROR_TXN_OPEN); \ + RETURN_INT(0, outbuf); \ + }} +#define FAIL_IF_NO_TXN(d, outbuf) { \ + if (NULL == d->txn) \ + { \ + send_rc(d->port, d->port_owner, ERROR_NO_TXN); \ + RETURN_INT(0, outbuf); \ + }} + #ifdef DEBUG # define DBG printf #else @@ -314,6 +353,10 @@ DRIVER_INIT(bdberl_drv) G_LOG_PORT = 0; G_LOG_PID = 0; } + else + { + DBG("DRIVER INIT FAILED - %s\r\n", db_strerror(G_DB_ENV_ERROR)); + } return &bdberl_drv_entry; } @@ -422,8 +465,17 @@ static void bdberl_drv_stop(ErlDrvData handle) static void bdberl_drv_finish() { // Stop the thread pools - bdberl_tpool_stop(G_TPOOL_GENERAL); - bdberl_tpool_stop(G_TPOOL_TXNS); + if (NULL != G_TPOOL_GENERAL) + { + bdberl_tpool_stop(G_TPOOL_GENERAL); + G_TPOOL_GENERAL = NULL; + } + + if (NULL != G_TPOOL_TXNS) + { + bdberl_tpool_stop(G_TPOOL_TXNS); + G_TPOOL_TXNS = NULL; + } // Signal the utility threads time is up G_TRICKLE_ACTIVE = 0; @@ -431,28 +483,63 @@ static void bdberl_drv_finish() G_CHECKPOINT_ACTIVE = 0; // Close the writer fd on the pipe to signal finish to the utility threads - close(G_BDBERL_PIPE[1]); - G_BDBERL_PIPE[1] = -1; + if (-1 != G_BDBERL_PIPE[1]) + { + close(G_BDBERL_PIPE[1]); + G_BDBERL_PIPE[1] = -1; + } // Wait for the deadlock checker to shutdown -- then wait for it - erl_drv_thread_join(G_DEADLOCK_THREAD, 0); + if (0 != G_DEADLOCK_THREAD) + { + erl_drv_thread_join(G_DEADLOCK_THREAD, 0); + G_DEADLOCK_THREAD = 0; + } // Wait for the checkpointer to shutdown -- then wait for it - erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); + if (0 != G_CHECKPOINT_THREAD) + { + erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); + G_CHECKPOINT_THREAD = 0; + } // Close the reader fd on the pipe now utility threads are closed - close(G_BDBERL_PIPE[0]); + if (-1 != G_BDBERL_PIPE[0]) + { + close(G_BDBERL_PIPE[0]); + } G_BDBERL_PIPE[0] = -1; // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. - G_DB_ENV->close(G_DB_ENV, 0); - driver_free(G_DATABASES); - erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); - hive_hash_destroy(G_DATABASES_NAMES); + if (NULL != G_DB_ENV) + { + G_DB_ENV->close(G_DB_ENV, 0); + G_DB_ENV = NULL; + } + if (NULL != G_DATABASES) + { + driver_free(G_DATABASES); + G_DATABASES = NULL; + } + if (NULL != G_DATABASES_RWLOCK) + { + erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); + G_DATABASES_RWLOCK = NULL; + } + if (NULL != G_DATABASES_NAMES) + { + hive_hash_destroy(G_DATABASES_NAMES); + G_DATABASES_NAMES = NULL; + } + // Release the logging rwlock - erl_drv_rwlock_destroy(G_LOG_RWLOCK); + if (NULL != G_LOG_RWLOCK) + { + erl_drv_rwlock_destroy(G_LOG_RWLOCK); + G_LOG_RWLOCK = NULL; + } DBG("DRIVER_FINISH\n"); } @@ -472,41 +559,29 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, DBTYPE type = (DBTYPE) UNPACK_BYTE(inbuf, 4); char* name = UNPACK_STRING(inbuf, 5); int dbref; - int status; int rc = open_database(name, type, flags, d, &dbref); - if (rc == 0) - { - status = STATUS_OK; - } - else - { - status = STATUS_ERROR; - dbref = rc; - } - // Pack the status and dbref (or errno) into a binary and return it - // Outbuf is: <> - BinHelper bh; - bin_helper_init(&bh); - bin_helper_push_byte(&bh, status); - bin_helper_push_int32(&bh, dbref); - RETURN_BH(bh, outbuf); + // Queue up a message for bdberl:open to process + if (rc == 0) // success: send {ok, DbRef} + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_INT, dbref, + ERL_DRV_TUPLE, 2}; + driver_send_term(d->port, d->port_owner, + response, sizeof(response) / sizeof(response[0])); + } + else // failure: send {error, atom() | {error, {unknown, Rc}} + { + send_rc(d->port, d->port_owner, rc); + } + // Outbuf is: <> - always send 0 and the driver will receive the real value + RETURN_INT(0, outbuf); } case CMD_CLOSE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a cursor is open - if (d->cursor != 0) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } - - // Fail if a txn is open - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); + FAIL_IF_TXN_OPEN(d, outbuf); // Take the provided dbref and attempt to close it // Inbuf is: <> @@ -515,18 +590,15 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = close_database(dbref, flags, d); - // Outbuf is: <> - RETURN_INT(rc, outbuf); + // Queue up a message for bdberl:close to process + send_rc(d->port, d->port_owner, rc); + // Outbuf is: <> - always send 0 and the driver will receive the real value + RETURN_INT(0, outbuf); } case CMD_TXN_BEGIN: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // If we already have a txn open, fail - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_TXN_OPEN(d, outbuf); // Setup async command and schedule it on the txns threadpool d->async_op = cmd; @@ -541,12 +613,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_TXN_ABORT: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // If we don't already have a txn open, fail - if (d->txn == 0) - { - RETURN_INT(ERROR_NO_TXN, outbuf); - } + FAIL_IF_NO_TXN(d, outbuf); // Setup async command and schedule it on the txns threadpool d->async_op = cmd; @@ -569,7 +636,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Put/commit requires a transaction to be active if (cmd == CMD_PUT_COMMIT && (!d->txn)) { - RETURN_INT(ERROR_NO_TXN, outbuf); + send_rc(d->port, d->port_owner, ERROR_NO_TXN); + RETURN_INT(0, outbuf); } // Inbuf is: << DbRef:32, Rest/binary>> @@ -614,7 +682,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, else { // Invalid dbref - RETURN_INT(ERROR_INVALID_DBREF, outbuf); + send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); + RETURN_INT(0, outbuf); } } case CMD_GETINFO: @@ -632,11 +701,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_OPEN: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - if (d->cursor) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> int dbref = UNPACK_INT(inbuf, 0); @@ -648,11 +713,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Grab the database handle and open the cursor DB* db = G_DATABASES[dbref].db; int rc = db->cursor(db, d->txn, &(d->cursor), flags); - RETURN_INT(rc, outbuf); + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } else { - RETURN_INT(ERROR_INVALID_DBREF, outbuf); + send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); + RETURN_INT(0, outbuf); } } case CMD_CURSOR_CURR: @@ -660,12 +727,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_PREV: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if no cursor currently open - if (!d->cursor) - { - RETURN_INT(ERROR_NO_CURSOR, outbuf); - } + FAIL_IF_NO_CURSOR(d, outbuf); // Schedule the operation d->async_op = cmd; @@ -678,12 +740,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_CLOSE: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if no cursor open - if (!d->cursor) - { - RETURN_INT(ERROR_NO_CURSOR, outbuf); - } + FAIL_IF_NO_CURSOR(d, outbuf); // It's possible to get a deadlock when closing a cursor -- in that situation we also // need to go ahead and abort the txn @@ -696,33 +753,26 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Regardless of what happens, clear out the cursor pointer d->cursor = 0; - + + // Send result code + send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } case CMD_REMOVE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a txn is open - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_TXN_OPEN(d, outbuf); // Inbuf is: << dbname/bytes, 0:8 >> const char* dbname = UNPACK_STRING(inbuf, 0); int rc = delete_database(dbname); - RETURN_INT(rc, outbuf); + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } case CMD_TRUNCATE: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a cursor is open - if (d->cursor) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is: <> int dbref = UNPACK_INT(inbuf, 0); @@ -767,7 +817,320 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, WRITE_UNLOCK(G_LOG_RWLOCK); } + *outbuf = 0; + return 0; } + case CMD_DB_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_DB_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + DB* db = G_DATABASES[dbref].db; + unsigned int flags = UNPACK_INT(inbuf, 4); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring + int rc = db->stat_print(db, flags); + RETURN_INT(rc, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_ENV_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + int rc = G_DB_ENV->stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + case CMD_LOCK_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_lock_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_LOCK_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->lock_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + case CMD_LOG_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_log_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_LOG_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->log_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + case CMD_MEMP_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_memp_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_MEMP_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->memp_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + case CMD_MUTEX_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_mutex_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_MUTEX_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->mutex_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + case CMD_TXN_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_txn_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_TXN_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->txn_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + case CMD_DATA_DIRS_INFO: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + do_sync_data_dirs_info(d); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + + case CMD_LOG_DIR_INFO: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Find the log dir or use DB_HOME - error if not present + const char *lg_dir = NULL; + int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &lg_dir); + if (0 == rc && NULL == lg_dir) + { + rc = G_DB_ENV->get_home(G_DB_ENV, &lg_dir); + } + // Send info if we can get a dir, otherwise return the error + if (0 == rc) + { + // send a dirinfo message - will send an error message on a NULL lg_dir + send_dir_info(d->port, d->port_owner, lg_dir); + } + else + { + send_rc(d->port, d->port_owner, rc); + } + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + + } *outbuf = 0; return 0; @@ -953,6 +1316,7 @@ static int delete_database(const char* name) DBG("Attempting to delete database: %s\r\n", name); int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT); WRITE_UNLOCK(G_DATABASES_RWLOCK); + return rc; } @@ -998,6 +1362,21 @@ static void get_info(int target, void* values, BinHelper* bh) } break; } + case SYSP_LOG_DIR_GET: + { + const char* dir = 0; + // Get the log dir - according to BDB docs, if not set + // the DB_HOME is used. + int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &dir); + if (NULL == dir) + { + dir = getenv("DB_HOME"); + } + bin_helper_init(bh); + bin_helper_push_int32(bh, rc); + bin_helper_push_string(bh, dir); + break; + } } } @@ -1012,6 +1391,129 @@ static void async_cleanup(PortData* d) erl_drv_mutex_unlock(d->port_lock); } + +// Convert an rc from BDB into a string suitable for driver_mk_atom +// returns NULL on no match +static char *rc_to_atom_str(int rc) +{ + char *error = erl_errno_id(rc); + //fprintf(stderr, "erl_errno_id(%d) = %s db_strerror = %s\n", rc, error, db_strerror(rc)); + if (NULL != error && strcmp("unknown", error) != 0) + { + return error; + } + else + { + switch (rc) + { + // bdberl driver errors + case ERROR_ASYNC_PENDING: return "async_pending"; + case ERROR_INVALID_DBREF: return "invalid_db"; + case ERROR_TXN_OPEN: return "transaction_open"; + case ERROR_NO_TXN: return "no_txn"; + case ERROR_CURSOR_OPEN: return "cursor_open"; + case ERROR_NO_CURSOR: return "no_cursor"; + // bonafide BDB errors + case DB_BUFFER_SMALL: return "buffer_small"; + case DB_DONOTINDEX: return "do_not_index"; + case DB_FOREIGN_CONFLICT: return "foreign_conflict"; + case DB_KEYEMPTY: return "key_empty"; + case DB_KEYEXIST: return "key_exist"; + case DB_LOCK_DEADLOCK: return "deadlock"; + case DB_LOCK_NOTGRANTED: return "lock_not_granted"; + case DB_LOG_BUFFER_FULL: return "log_buffer_full"; + case DB_NOTFOUND: return "not_found"; + case DB_OLD_VERSION: return "old_version"; + case DB_PAGE_NOTFOUND: return "page_not_found"; + case DB_RUNRECOVERY: return "run_recovery"; + case DB_VERIFY_BAD: return "verify_bad"; + case DB_VERSION_MISMATCH: return "version_mismatch"; + default: return NULL; + } + } +} + + +// Send a {dirinfo, Path, FsId, MbyteAvail} message to pid given. +// Send an {errno, Reason} on failure +// returns 0 on success, errno on failure +static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path) +{ + struct statvfs svfs; + int rc; + + if (NULL == path) + { + rc = EINVAL; + } + else if (0 != statvfs(path, &svfs)) + { + rc = errno; + } + else + { + rc = 0; + } + + if (0 != rc) + { + send_rc(port, pid, rc); + } + else + { + fsblkcnt_t blocks_per_mbyte = 1024 * 1024 / svfs.f_frsize; + assert(blocks_per_mbyte > 0); + unsigned int mbyte_avail = (unsigned int) (svfs.f_bavail / blocks_per_mbyte); + int path_len = strlen(path); + + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("dirinfo"), + ERL_DRV_STRING, (ErlDrvTermData) path, path_len, + // send fsid as a binary as will only be used + // to compare which physical filesystem is on + // and the definintion varies between platforms. + ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid, + sizeof(svfs.f_fsid), + ERL_DRV_UINT, mbyte_avail, + ERL_DRV_TUPLE, 4}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + return rc; +} + + +static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) +{ + // TODO: May need to tag the messages a bit more explicitly so that if another async + // job runs to completion before the message gets delivered we don't mis-interpret this + // response code. + if (rc == 0) + { + ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + // See if this is a standard errno that we have an erlang code for + char *error = rc_to_atom_str(rc); + if (NULL != error) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom(error), + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom("unknown"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + } +} + static void async_cleanup_and_send_rc(PortData* d, int rc) { // Save the port and pid references -- we need copies independent from the PortData @@ -1023,22 +1525,7 @@ static void async_cleanup_and_send_rc(PortData* d, int rc) ErlDrvTermData pid = d->port_owner; async_cleanup(d); - - // TODO: May need to tag the messages a bit more explicitly so that if another async - // job runs to completion before the message gets delivered we don't mis-interpret this - // response code. - if (rc == 0) - { - ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } + send_rc(port, pid, rc); } static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) @@ -1076,6 +1563,525 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) } } + + +#define BT_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->bt_##member, \ + ERL_DRV_TUPLE, 2 +static void async_cleanup_and_send_btree_stats(PortData* d, char *type, DB_BTREE_STAT *bsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom(type), + ERL_DRV_TUPLE, 2, + BT_STATS_TUPLE(bsp, magic), /* Magic number. */ + BT_STATS_TUPLE(bsp, version), /* Version number. */ + BT_STATS_TUPLE(bsp, metaflags), /* Metadata flags. */ + BT_STATS_TUPLE(bsp, nkeys), /* Number of unique keys. */ + BT_STATS_TUPLE(bsp, ndata), /* Number of data items. */ + BT_STATS_TUPLE(bsp, pagecnt), /* Page count. */ + BT_STATS_TUPLE(bsp, pagesize), /* Page size. */ + BT_STATS_TUPLE(bsp, minkey), /* Minkey value. */ + BT_STATS_TUPLE(bsp, re_len), /* Fixed-length record length. */ + BT_STATS_TUPLE(bsp, re_pad), /* Fixed-length record pad. */ + BT_STATS_TUPLE(bsp, levels), /* Tree levels. */ + BT_STATS_TUPLE(bsp, int_pg), /* Internal pages. */ + BT_STATS_TUPLE(bsp, leaf_pg), /* Leaf pages. */ + BT_STATS_TUPLE(bsp, dup_pg), /* Duplicate pages. */ + BT_STATS_TUPLE(bsp, over_pg), /* Overflow pages. */ + BT_STATS_TUPLE(bsp, empty_pg), /* Empty pages. */ + BT_STATS_TUPLE(bsp, free), /* Pages on the free list. */ + BT_STATS_TUPLE(bsp, int_pgfree), /* Bytes free in internal pages. */ + BT_STATS_TUPLE(bsp, leaf_pgfree), /* Bytes free in leaf pages. */ + BT_STATS_TUPLE(bsp, dup_pgfree), /* Bytes free in duplicate pages. */ + BT_STATS_TUPLE(bsp, over_pgfree), /* Bytes free in overflow pages. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 21+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef BT_STATS_TUPLE + + +#define HASH_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->hash_##member, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_hash_stats(PortData* d, DB_HASH_STAT *hsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom("hash"), + ERL_DRV_TUPLE, 2, + HASH_STATS_TUPLE(hsp, magic), /* Magic number. */ + HASH_STATS_TUPLE(hsp, version), /* Version number. */ + HASH_STATS_TUPLE(hsp, metaflags), /* Metadata flags. */ + HASH_STATS_TUPLE(hsp, nkeys), /* Number of unique keys. */ + HASH_STATS_TUPLE(hsp, ndata), /* Number of data items. */ + HASH_STATS_TUPLE(hsp, pagecnt), /* Page count. */ + HASH_STATS_TUPLE(hsp, pagesize), /* Page size. */ + HASH_STATS_TUPLE(hsp, ffactor), /* Fill factor specified at create. */ + HASH_STATS_TUPLE(hsp, buckets), /* Number of hash buckets. */ + HASH_STATS_TUPLE(hsp, free), /* Pages on the free list. */ + HASH_STATS_TUPLE(hsp, bfree), /* Bytes free on bucket pages. */ + HASH_STATS_TUPLE(hsp, bigpages), /* Number of big key/data pages. */ + HASH_STATS_TUPLE(hsp, big_bfree), /* Bytes free on big item pages. */ + HASH_STATS_TUPLE(hsp, overflows), /* Number of overflow pages. */ + HASH_STATS_TUPLE(hsp, ovfl_free), /* Bytes free on ovfl pages. */ + HASH_STATS_TUPLE(hsp, dup), /* Number of dup pages. */ + HASH_STATS_TUPLE(hsp, dup_free), /* Bytes free on duplicate pages. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 17+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef HASH_STATS_TUPLE + +#ifdef ENABLE_QUEUE // If we ever decide to support Queues + +#define QS_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->qs_##member, \ + ERL_DRV_TUPLE, 2 +static void async_cleanup_and_send_queue_stats(PortData* d, DB_QUEUE_STAT *qsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom("queue"), + ERL_DRV_TUPLE, 2, + QS_STAT_TUPLE(qsp, qs_magic), /* Magic number. */ + QS_STAT_TUPLE(qsp, version), /* Version number. */ + QS_STAT_TUPLE(qsp, metaflags), /* Metadata flags. */ + QS_STAT_TUPLE(qsp, nkeys), /* Number of unique keys. */ + QS_STAT_TUPLE(qsp, ndata), /* Number of data items. */ + QS_STAT_TUPLE(qsp, pagesize), /* Page size. */ + QS_STAT_TUPLE(qsp, extentsize), /* Pages per extent. */ + QS_STAT_TUPLE(qsp, pages), /* Data pages. */ + QS_STAT_TUPLE(qsp, re_len), /* Fixed-length record length. */ + QS_STAT_TUPLE(qsp, re_pad), /* Fixed-length record pad. */ + QS_STAT_TUPLE(qsp, pgfree), /* Bytes free in data pages. */ + QS_STAT_TUPLE(qsp, first_recno), /* First not deleted record. */ + QS_STAT_TUPLE(qsp, cur_recno), /* Next available record number. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 13+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef QUEUE_STATS_TUPLE +#endif // ENABLE_QUEUE + +#define ST_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->st_##member, \ + ERL_DRV_TUPLE, 2 + +#define ST_STATS_INT_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_INT, (base)->st_##member, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_lock_stats(PortData* d, DB_LOCK_STAT *lsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(lsp, id), /* Last allocated locker ID. */ + ST_STATS_TUPLE(lsp, cur_maxid), /* Current maximum unused ID. */ + ST_STATS_TUPLE(lsp, maxlocks), /* Maximum number of locks in table. */ + ST_STATS_TUPLE(lsp, maxlockers), /* Maximum num of lockers in table. */ + ST_STATS_TUPLE(lsp, maxobjects), /* Maximum num of objects in table. */ + ST_STATS_TUPLE(lsp, partitions), /* number of partitions. */ + ST_STATS_INT_TUPLE(lsp, nmodes), /* Number of lock modes. */ + ST_STATS_TUPLE(lsp, nlockers), /* Current number of lockers. */ + ST_STATS_TUPLE(lsp, nlocks), /* Current number of locks. */ + ST_STATS_TUPLE(lsp, maxnlocks), /* Maximum number of locks so far. */ + ST_STATS_TUPLE(lsp, maxhlocks), /* Maximum number of locks in any bucket. */ + ST_STATS_TUPLE(lsp, locksteals), /* Number of lock steals so far. */ + ST_STATS_TUPLE(lsp, maxlsteals), /* Maximum number steals in any partition. */ + ST_STATS_TUPLE(lsp, maxnlockers), /* Maximum number of lockers so far. */ + ST_STATS_TUPLE(lsp, nobjects), /* Current number of objects. */ + ST_STATS_TUPLE(lsp, maxnobjects), /* Maximum number of objects so far. */ + ST_STATS_TUPLE(lsp, maxhobjects), /* Maximum number of objectsin any bucket. */ + ST_STATS_TUPLE(lsp, objectsteals), /* Number of objects steals so far. */ + ST_STATS_TUPLE(lsp, maxosteals), /* Maximum number of steals in any partition. */ + ST_STATS_TUPLE(lsp, nrequests), /* Number of lock gets. */ + ST_STATS_TUPLE(lsp, nreleases), /* Number of lock puts. */ + ST_STATS_TUPLE(lsp, nupgrade), /* Number of lock upgrades. */ + ST_STATS_TUPLE(lsp, ndowngrade), /* Number of lock downgrades. */ + ST_STATS_TUPLE(lsp, lock_wait), /* Lock conflicts w/ subsequent wait */ + ST_STATS_TUPLE(lsp, lock_nowait), /* Lock conflicts w/o subsequent wait */ + ST_STATS_TUPLE(lsp, ndeadlocks), /* Number of lock deadlocks. */ + ST_STATS_TUPLE(lsp, locktimeout), /* Lock timeout. */ + ST_STATS_TUPLE(lsp, nlocktimeouts), /* Number of lock timeouts. */ + ST_STATS_TUPLE(lsp, txntimeout), /* Transaction timeout. */ + ST_STATS_TUPLE(lsp, ntxntimeouts), /* Number of transaction timeouts. */ + ST_STATS_TUPLE(lsp, part_wait), /* Partition lock granted after wait. */ + ST_STATS_TUPLE(lsp, part_nowait), /* Partition lock granted without wait. */ + ST_STATS_TUPLE(lsp, part_max_wait), /* Max partition lock granted after wait. */ + ST_STATS_TUPLE(lsp, part_max_nowait), /* Max partition lock granted without wait. */ + ST_STATS_TUPLE(lsp, objs_wait), /* Object lock granted after wait. */ + ST_STATS_TUPLE(lsp, objs_nowait), /* Object lock granted without wait. */ + ST_STATS_TUPLE(lsp, lockers_wait), /* Locker lock granted after wait. */ + ST_STATS_TUPLE(lsp, lockers_nowait),/* Locker lock granted without wait. */ + ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(lsp, hash_len), /* Max length of bucket. */ + ST_STATS_TUPLE(lsp, regsize), /* Region size. - will have to cast to uint */ + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 42+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +static void async_cleanup_and_send_log_stats(PortData* d, DB_LOG_STAT *lsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(lsp, magic), /* Log file magic number. */ + ST_STATS_TUPLE(lsp, version), /* Log file version number. */ + ST_STATS_INT_TUPLE(lsp, mode), /* Log file permissions mode. */ + ST_STATS_TUPLE(lsp, lg_bsize), /* Log buffer size. */ + ST_STATS_TUPLE(lsp, lg_size), /* Log file size. */ + ST_STATS_TUPLE(lsp, wc_bytes), /* Bytes to log since checkpoint. */ + ST_STATS_TUPLE(lsp, wc_mbytes), /* Megabytes to log since checkpoint. */ + ST_STATS_TUPLE(lsp, record), /* Records entered into the log. */ + ST_STATS_TUPLE(lsp, w_bytes), /* Bytes to log. */ + ST_STATS_TUPLE(lsp, w_mbytes), /* Megabytes to log. */ + ST_STATS_TUPLE(lsp, wcount), /* Total I/O writes to the log. */ + ST_STATS_TUPLE(lsp, wcount_fill),/* Overflow writes to the log. */ + ST_STATS_TUPLE(lsp, rcount), /* Total I/O reads from the log. */ + ST_STATS_TUPLE(lsp, scount), /* Total syncs to the log. */ + ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(lsp, cur_file), /* Current log file number. */ + ST_STATS_TUPLE(lsp, cur_offset),/* Current log file offset. */ + ST_STATS_TUPLE(lsp, disk_file), /* Known on disk log file number. */ + ST_STATS_TUPLE(lsp, disk_offset), /* Known on disk log file offset. */ + ST_STATS_TUPLE(lsp, maxcommitperflush), /* Max number of commits in a flush. */ + ST_STATS_TUPLE(lsp, mincommitperflush), /* Min number of commits in a flush. */ + ST_STATS_TUPLE(lsp, regsize), /* Region size. */ + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 23+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +static void send_mpool_fstat(ErlDrvPort port, ErlDrvTermData pid, DB_MPOOL_FSTAT *fsp) +{ + char *name = fsp->file_name ? fsp->file_name : ""; + int name_len = strlen(name); + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("fstat"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("name"), + ERL_DRV_STRING, (ErlDrvTermData) name, name_len, + ERL_DRV_TUPLE, 2, + ST_STATS_TUPLE(fsp, map), /* Pages from mapped files. */ + ST_STATS_TUPLE(fsp, cache_hit), /* Pages found in the cache. */ + ST_STATS_TUPLE(fsp, cache_miss), /* Pages not found in the cache. */ + ST_STATS_TUPLE(fsp, page_create), /* Pages created in the cache. */ + ST_STATS_TUPLE(fsp, page_in), /* Pages read in. */ + ST_STATS_TUPLE(fsp, page_out), /* Pages written out. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 7+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp, + DB_MPOOL_FSTAT **fsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + // First send the per-file stats + int i; + for (i = 0; fsp != NULL && fsp[i] != NULL; i++) + { + send_mpool_fstat(port, pid, fsp[i]); + } + + // Then send the global stats + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(gsp, gbytes), /* Total cache size: GB. */ + ST_STATS_TUPLE(gsp, bytes), /* Total cache size: B. */ + ST_STATS_TUPLE(gsp, ncache), /* Number of cache regions. */ + ST_STATS_TUPLE(gsp, max_ncache), /* Maximum number of regions. */ + ST_STATS_INT_TUPLE(gsp, mmapsize), /* Maximum file size for mmap. */ + ST_STATS_INT_TUPLE(gsp, maxopenfd), /* Maximum number of open fd's. */ + ST_STATS_INT_TUPLE(gsp, maxwrite), /* Maximum buffers to write. */ + ST_STATS_TUPLE(gsp, maxwrite_sleep), /* Sleep after writing max buffers. */ + ST_STATS_TUPLE(gsp, pages), /* Total number of pages. */ + ST_STATS_TUPLE(gsp, map), /* Pages from mapped files. */ + ST_STATS_TUPLE(gsp, cache_hit), /* Pages found in the cache. */ + ST_STATS_TUPLE(gsp, cache_miss), /* Pages not found in the cache. */ + ST_STATS_TUPLE(gsp, page_create), /* Pages created in the cache. */ + ST_STATS_TUPLE(gsp, page_in), /* Pages read in. */ + ST_STATS_TUPLE(gsp, page_out), /* Pages written out. */ + ST_STATS_TUPLE(gsp, ro_evict), /* Clean pages forced from the cache. */ + ST_STATS_TUPLE(gsp, rw_evict), /* Dirty pages forced from the cache. */ + ST_STATS_TUPLE(gsp, page_trickle), /* Pages written by memp_trickle. */ + ST_STATS_TUPLE(gsp, page_clean), /* Clean pages. */ + ST_STATS_TUPLE(gsp, page_dirty), /* Dirty pages. */ + ST_STATS_TUPLE(gsp, hash_buckets), /* Number of hash buckets. */ + ST_STATS_TUPLE(gsp, hash_searches), /* Total hash chain searches. */ + ST_STATS_TUPLE(gsp, hash_longest), /* Longest hash chain searched. */ + ST_STATS_TUPLE(gsp, hash_examined), /* Total hash entries searched. */ + ST_STATS_TUPLE(gsp, hash_nowait), /* Hash lock granted with nowait. */ + ST_STATS_TUPLE(gsp, hash_wait), /* Hash lock granted after wait. */ + ST_STATS_TUPLE(gsp, hash_max_nowait), /* Max hash lock granted with nowait. */ + ST_STATS_TUPLE(gsp, hash_max_wait), /* Max hash lock granted after wait. */ + ST_STATS_TUPLE(gsp, region_nowait), /* Region lock granted with nowait. */ + ST_STATS_TUPLE(gsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(gsp, mvcc_frozen), /* Buffers frozen. */ + ST_STATS_TUPLE(gsp, mvcc_thawed), /* Buffers thawed. */ + ST_STATS_TUPLE(gsp, mvcc_freed), /* Frozen buffers freed. */ + ST_STATS_TUPLE(gsp, alloc), /* Number of page allocations. */ + ST_STATS_TUPLE(gsp, alloc_buckets), /* Buckets checked during allocation. */ + ST_STATS_TUPLE(gsp, alloc_max_buckets), /* Max checked during allocation. */ + ST_STATS_TUPLE(gsp, alloc_pages), /* Pages checked during allocation. */ + ST_STATS_TUPLE(gsp, alloc_max_pages), /* Max checked during allocation. */ + ST_STATS_TUPLE(gsp, io_wait), /* Thread waited on buffer I/O. */ + ST_STATS_TUPLE(gsp, regsize), /* Region size. */ + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 40+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + + +static void async_cleanup_and_send_mutex_stats(PortData* d, DB_MUTEX_STAT *msp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(msp, mutex_align), /* Mutex alignment */ + ST_STATS_TUPLE(msp, mutex_tas_spins), /* Mutex test-and-set spins */ + ST_STATS_TUPLE(msp, mutex_cnt), /* Mutex count */ + ST_STATS_TUPLE(msp, mutex_free), /* Available mutexes */ + ST_STATS_TUPLE(msp, mutex_inuse), /* Mutexes in use */ + ST_STATS_TUPLE(msp, mutex_inuse_max), /* Maximum mutexes ever in use */ + ST_STATS_TUPLE(msp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(msp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(msp, regsize), /* Region size. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 9+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +#define STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->member, \ + ERL_DRV_TUPLE, 2 + +#define STATS_LSN_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->member.file, \ + ERL_DRV_UINT, (base)->member.offset, \ + ERL_DRV_TUPLE, 2, \ + ERL_DRV_TUPLE, 2 + +static void send_txn_tstat(ErlDrvPort port, ErlDrvTermData pid, DB_TXN_ACTIVE *tasp) +{ + char *name = tasp->name ? tasp->name : ""; + int name_len = strlen(name); + char tid_str[32]; + char *status_str; + switch (tasp->status) + { + case TXN_ABORTED: + status_str = "aborted"; + break; + case TXN_COMMITTED: + status_str = "committed"; + break; + case TXN_PREPARED: + status_str = "prepared"; + break; + case TXN_RUNNING: + status_str = "running"; + break; + default: + status_str = "undefined"; + break; + } + + int tid_str_len = snprintf(tid_str, sizeof(tid_str), "%lu", (unsigned long) tasp->tid); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("txn"), + STATS_TUPLE(tasp, txnid), /* Transaction ID */ + STATS_TUPLE(tasp, parentid), /* Transaction ID of parent */ + STATS_TUPLE(tasp, pid), /* Process owning txn ID - pid_t */ + ERL_DRV_ATOM, driver_mk_atom("tid"),/* OSX has 32-bit ints in erlang, so return as */ + ERL_DRV_STRING, (ErlDrvTermData) tid_str, tid_str_len, /* a string */ + ERL_DRV_TUPLE, 2, + STATS_LSN_TUPLE(tasp, lsn), /* LSN when transaction began */ + STATS_LSN_TUPLE(tasp, read_lsn), /* Read LSN for MVCC */ + STATS_TUPLE(tasp, mvcc_ref), /* MVCC reference count */ + + // Start of list + ERL_DRV_ATOM, driver_mk_atom("status"), + ERL_DRV_ATOM, driver_mk_atom(status_str), + ERL_DRV_TUPLE, 2, + + ERL_DRV_ATOM, driver_mk_atom("name"), + ERL_DRV_STRING, (ErlDrvTermData) name, name_len, + ERL_DRV_TUPLE, 2, + + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 9+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +#define ST_STATS_LSN_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->st_##member.file, \ + ERL_DRV_UINT, (base)->st_##member.offset, \ + ERL_DRV_TUPLE, 2, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_txn_stats(PortData* d, DB_TXN_STAT *tsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + // First send the array of active transactions */ + int i; + for (i = 0; i < tsp->st_nactive; i++) + { + send_txn_tstat(port, pid, tsp->st_txnarray+i); + } + + // Then send the global stats + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(tsp, nrestores), /* number of restored transactions + after recovery. */ + ST_STATS_LSN_TUPLE(tsp, last_ckp), /* lsn of the last checkpoint */ + ST_STATS_TUPLE(tsp, time_ckp), /* time of last checkpoint (time_t to uint) */ + ST_STATS_TUPLE(tsp, last_txnid), /* last transaction id given out */ + ST_STATS_TUPLE(tsp, maxtxns), /* maximum txns possible */ + ST_STATS_TUPLE(tsp, naborts), /* number of aborted transactions */ + ST_STATS_TUPLE(tsp, nbegins), /* number of begun transactions */ + ST_STATS_TUPLE(tsp, ncommits), /* number of committed transactions */ + ST_STATS_TUPLE(tsp, nactive), /* number of active transactions */ + ST_STATS_TUPLE(tsp, nsnapshot), /* number of snapshot transactions */ + ST_STATS_TUPLE(tsp, maxnactive), /* maximum active transactions */ + ST_STATS_TUPLE(tsp, maxnsnapshot), /* maximum snapshot transactions */ + ST_STATS_TUPLE(tsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(tsp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(tsp, regsize), /* Region size. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 15+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + static void do_async_put(void* arg) { // Payload is: <> @@ -1294,6 +2300,261 @@ static void do_async_truncate(void* arg) } +static void do_async_stat(void* arg) +{ + // Payload is: << DbRef:32, Flags:32 >> + PortData* d = (PortData*)arg; + + // Get the database object, using the provided ref + int dbref = UNPACK_INT(d->work_buffer, 0); + DB* db = G_DATABASES[dbref].db; + DBTYPE type = DB_UNKNOWN; + int rc = db->get_type(db, &type); + if (rc != 0) + { + async_cleanup_and_send_rc(d, rc); + return; + } + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 4); + + void *sp = NULL; + rc = db->stat(db, d->txn, &sp, flags); + if (rc != 0 || sp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + switch(type) + { + case DB_BTREE: /*FALLTHRU*/ + case DB_RECNO: + async_cleanup_and_send_btree_stats(d, type == DB_BTREE ? "btree" :"recno", sp); + break; + case DB_HASH: + async_cleanup_and_send_hash_stats(d, sp); + break; +#ifdef ENABLE_QUEUE + case DB_QUEUE: + async_cleanup_and_send_queue_stats(d, sp); + break; +#endif + default: + async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); + break; + } + } + + // Finally, clean up value buffer (driver_send_term made a copy) + if (NULL != sp) + { + free(sp); + } +} + +static void do_async_lock_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_LOCK_STAT *lsp = NULL; + int rc = G_DB_ENV->lock_stat(G_DB_ENV, &lsp, flags); + if (rc != 0 || lsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_lock_stats(d, lsp); + } + + // Finally, clean up lock stats + if (NULL != lsp) + { + free(lsp); + } +} + +static void do_async_log_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_LOG_STAT *lsp = NULL; + int rc = G_DB_ENV->log_stat(G_DB_ENV, &lsp, flags); + if (rc != 0 || lsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_log_stats(d, lsp); + } + + // Finally, clean up stats + if (NULL != lsp) + { + free(lsp); + } +} + +static void do_async_memp_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_MPOOL_STAT *gsp = NULL; + DB_MPOOL_FSTAT **fsp = NULL; + int rc = G_DB_ENV->memp_stat(G_DB_ENV, &gsp, &fsp, flags); + if (rc != 0 || gsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_memp_stats(d, gsp, fsp); + } + + // Finally, clean up stats + if (NULL != gsp) + { + free(gsp); + } + if (NULL != fsp) + { + free(fsp); + } +} + +static void do_async_mutex_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_MUTEX_STAT *msp = NULL; + int rc = G_DB_ENV->mutex_stat(G_DB_ENV, &msp, flags); + if (rc != 0 || msp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_mutex_stats(d, msp); + } + + // Finally, clean up stats + if (NULL != msp) + { + free(msp); + } +} + + +static void do_async_txn_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_TXN_STAT *tsp = NULL; + int rc = G_DB_ENV->txn_stat(G_DB_ENV, &tsp, flags); + if (rc != 0 || tsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_txn_stats(d, tsp); + } + + // Finally, clean up stats + if (NULL != tsp) + { + free(tsp); + } +} + + +static void do_sync_data_dirs_info(PortData *d) +{ + // Get DB_HOME and find the real path + const char *db_home = NULL; + const char *data_dir = NULL; + const char **data_dirs = NULL; + char db_home_realpath[PATH_MAX+1]; + char data_dir_realpath[PATH_MAX+1]; + int got_db_home = 0; + + // Lookup the environment and add it if not explicitly included in the data_dirs + int rc = G_DB_ENV->get_home(G_DB_ENV, &db_home); + if (rc != 0 || NULL == db_home) + { + // If no db_home we'll have to rely on whatever the global environment is configured with + got_db_home = 1; + } + else + { + if (NULL == realpath(db_home, db_home_realpath)) + rc = errno; + } + + // Get the data first + rc = G_DB_ENV->get_data_dirs(G_DB_ENV, &data_dirs); + int i; + for (i = 0; 0 == rc && NULL != data_dirs && NULL != data_dirs[i]; i++) + { + data_dir = data_dirs[i]; + + if (!got_db_home) + { + // Get the real path of the data dir + if (NULL == realpath(data_dir, data_dir_realpath)) + { + rc = errno; + } + else + { + // Set got_db_home if it matches + if (0 == strcmp(data_dir_realpath, db_home_realpath)) + { + got_db_home = 1; + } + } + } + + if (0 == rc) + { + rc = send_dir_info(d->port, d->port_owner, data_dir); + } + } + + // BDB always searches the environment home too so add it to the list + if (!got_db_home && rc == 0) + { + rc = send_dir_info(d->port, d->port_owner, db_home); + } + + // Send the return code - will termiante the receive loop in bdberl.erl + send_rc(d->port, d->port_owner, rc); +} + + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index a3388d7..e282b0d 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -46,6 +46,21 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_REMOVE_DB 15 #define CMD_TRUNCATE 16 #define CMD_REGISTER_LOGGER 17 +#define CMD_DB_STAT 18 +#define CMD_DB_STAT_PRINT 19 +#define CMD_ENV_STAT_PRINT 20 +#define CMD_LOCK_STAT 21 +#define CMD_LOCK_STAT_PRINT 22 +#define CMD_LOG_STAT 23 +#define CMD_LOG_STAT_PRINT 24 +#define CMD_MEMP_STAT 25 +#define CMD_MEMP_STAT_PRINT 26 +#define CMD_MUTEX_STAT 27 +#define CMD_MUTEX_STAT_PRINT 28 +#define CMD_TXN_STAT 29 +#define CMD_TXN_STAT_PRINT 30 +#define CMD_DATA_DIRS_INFO 31 +#define CMD_LOG_DIR_INFO 32 /** * Command status values @@ -74,7 +89,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define ERROR_CURSOR_OPEN (-29005) /* Cursor already active on this port */ #define ERROR_NO_CURSOR (-29006) /* No cursor open on this port */ #define ERROR_DB_ACTIVE (-29007) /* Database is currently active; operation requires otherwise */ - +#define ERROR_INVALID_CMD (-29008) /* Invalid command code requested */ +#define ERROR_INVALID_DB_TYPE (-29009) /* Invalid database type */ /** * System information ids @@ -82,6 +98,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define SYSP_CACHESIZE_GET 1 #define SYSP_TXN_TIMEOUT_GET 2 #define SYSP_DATA_DIR_GET 3 +#define SYSP_LOG_DIR_GET 4 /** * Driver Entry diff --git a/c_src/bin_helper.c b/c_src/bin_helper.c index beedafb..bc1c0d2 100644 --- a/c_src/bin_helper.c +++ b/c_src/bin_helper.c @@ -44,6 +44,10 @@ void bin_helper_push_int32(BinHelper* bh, int value) void bin_helper_push_string(BinHelper* bh, const char* string) { + if (NULL == string) + { + string = ""; + } int sz = strlen(string); bin_helper_check_size(bh, sz+1); strncpy(bh->bin->orig_bytes+(bh->offset), string, sz+1); diff --git a/include/bdberl.hrl b/include/bdberl.hrl index a306b30..939c741 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -23,13 +23,30 @@ -define(CMD_REMOVE_DB, 15). -define(CMD_TRUNCATE, 16). -define(CMD_REGISTER_LOGGER, 17). +-define(CMD_DB_STAT, 18). +-define(CMD_DB_STAT_PRINT, 19). +-define(CMD_ENV_STAT_PRINT, 20). +-define(CMD_LOCK_STAT, 21). +-define(CMD_LOCK_STAT_PRINT, 22). +-define(CMD_LOG_STAT, 23). +-define(CMD_LOG_STAT_PRINT, 24). +-define(CMD_MEMP_STAT, 25). +-define(CMD_MEMP_STAT_PRINT, 26). +-define(CMD_MUTEX_STAT, 27). +-define(CMD_MUTEX_STAT_PRINT,28). +-define(CMD_TXN_STAT, 29). +-define(CMD_TXN_STAT_PRINT, 30). +-define(CMD_DATA_DIRS_INFO, 31). +-define(CMD_LOG_DIR_INFO, 32). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). +-define(DB_TYPE_UNKNOWN, 5). -define(SYSP_CACHESIZE_GET, 1). -define(SYSP_TXN_TIMEOUT_GET, 2). -define(SYSP_DATA_DIR_GET, 3). +-define(SYSP_LOG_DIR_GET, 4). -define(STATUS_OK, 0). -define(STATUS_ERROR, 1). @@ -43,6 +60,8 @@ -define(ERROR_CURSOR_OPEN, -29005). % Cursor already active on this port -define(ERROR_NO_CURSOR, -29006). % No cursor open on this port -define(ERROR_DB_ACTIVE, -29007). % Database is currently active; operation requires otherwise +-define(ERROR_INVALID_CMD, -29008). % Invalid command +-define(ERROR_INVALID_DB_TYPE,-29009). % Invalid database type %% DB (user visible) error return codes. -define(DB_BUFFER_SMALL, -30999). % User memory too small for return diff --git a/src/bdberl.erl b/src/bdberl.erl index 2a73fe5..05a9cf7 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -13,7 +13,22 @@ txn_commit/0, txn_commit/1, txn_abort/0, get_cache_size/0, get_data_dirs/0, + get_data_dirs_info/0, + get_lg_dir_info/0, get_txn_timeout/0, + stat/1, stat/2, + stat_print/1, stat_print/2, + lock_stat/0, lock_stat/1, + lock_stat_print/0, lock_stat_print/1, + log_stat/0, log_stat/1, + log_stat_print/0, log_stat_print/1, + memp_stat/0, memp_stat/1, + memp_stat_print/0, memp_stat_print/1, + mutex_stat/0, mutex_stat/1, + mutex_stat_print/0, mutex_stat_print/1, + txn_stat/0, txn_stat/1, + txn_stat_print/0, txn_stat_print/1, + env_stat_print/0, env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, put_r/3, put_r/4, @@ -35,10 +50,11 @@ -type db_name() :: [byte(),...]. -type db_type() :: btree | hash. -type db_flags() :: [atom()]. +-type db_fsid() :: binary(). -type db_key() :: term(). +-type db_mbytes() :: non_neg_integer(). -type db_value() :: term(). -type db_ret_value() :: not_found | db_value(). - -type db_error_reason() :: atom() | {unknown, integer()}. -type db_error() :: {error, db_error_reason()}. @@ -127,28 +143,35 @@ open(Name, Type) -> %% @spec open(Name, Type, Opts) -> {ok, Db} | {error, Error} %% where %% Name = string() -%% Type = btree | hash +%% Type = btree | hash | unknown %% Opts = [atom()] %% Db = integer() %% %% @end %%-------------------------------------------------------------------- --spec open(Name :: db_name(), Type :: db_type(), Opts :: db_flags()) -> +-spec open(Name :: db_name(), Type :: db_type() | unknown, Opts :: db_flags()) -> {ok, db()} | {error, integer()}. open(Name, Type, Opts) -> % Map database type into an integer code case Type of btree -> TypeCode = ?DB_TYPE_BTREE; - hash -> TypeCode = ?DB_TYPE_HASH + hash -> TypeCode = ?DB_TYPE_HASH; + unknown -> TypeCode = ?DB_TYPE_UNKNOWN %% BDB automatically determines if file exists end, Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])), Cmd = <>, - case erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd) of - <> -> - {ok, Db}; - <> -> - {error, Errno} + <> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd), + case decode_rc(Rc) of + ok -> + receive + {ok, DbRef} -> + {ok, DbRef}; + {error, Reason} -> + {error, Reason} + end; + Error -> + {error, Error} end. @@ -213,9 +236,14 @@ close(Db, Opts) -> <> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd), case decode_rc(Rc) of ok -> - ok; - Reason -> - {error, Reason} + receive + ok -> + ok; + {error, Reason} -> + {error, Reason} + end; + Error -> + {error, Error} end. @@ -331,6 +359,7 @@ txn_begin(Opts) -> ok -> ok; {error, Reason} -> {error, decode_rc(Reason)} end; + Error -> {error, Error} end. @@ -402,7 +431,7 @@ txn_commit(Opts) -> ok -> receive ok -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Error -> {error, Error} @@ -437,7 +466,8 @@ txn_abort() -> ok -> receive ok -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, no_txn} -> ok; + {error, Reason} -> {error, Reason} end; no_txn -> @@ -514,13 +544,14 @@ transaction(Fun, Retries) -> {ok, db_value()} | db_txn_error(). transaction(_Fun, 0, _Opts) -> - ok = txn_abort(), {error, {transaction_failed, retry_limit_reached}}; + transaction(Fun, Retries, Opts) -> case txn_begin(Opts) of ok -> try Fun() of abort -> + error_logger:info_msg("function requested abort"), ok = txn_abort(), {error, transaction_aborted}; @@ -540,6 +571,7 @@ transaction(Fun, Retries, Opts) -> transaction(Fun, R); _ : Reason -> + error_logger:info_msg("function threw non-lock error - ~p", [Reason]), ok = txn_abort(), {error, {transaction_failed, Reason}} end; @@ -840,7 +872,7 @@ get(Db, Key, Opts) -> receive {ok, _, Bin} -> {ok, binary_to_term(Bin)}; not_found -> not_found; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Error -> {error, Error} @@ -1024,7 +1056,7 @@ truncate(Db) -> ok -> receive ok -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Error -> @@ -1053,7 +1085,11 @@ cursor_open(Db) -> <> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; + Reason -> {error, Reason} end. @@ -1159,7 +1195,10 @@ cursor_close() -> <> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; Reason -> {error, Reason} end. @@ -1192,7 +1231,10 @@ delete_database(Filename) -> <> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; Reason -> {error, Reason} end. @@ -1234,6 +1276,63 @@ get_data_dirs() -> end. +%%-------------------------------------------------------------------- +%% @doc +%% Returns the list of directories that bdberl searches to find databases +%% with the number of megabytes available for each dir +%% +%% @spec get_data_dirs_info() -> {ok, [DirName, Fsid, MbytesAvail]} | {error, Error} +%% where +%% DirName = string() +%% Fsid = binary() +%% MbytesAvail = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec get_data_dirs_info() -> {ok, [{string(), db_fsid(), db_mbytes()}]} | db_error(). + +get_data_dirs_info() -> + % Call into the BDB library and get a list of configured data directories + Cmd = <<>>, + <> = erlang:port_control(get_port(),?CMD_DATA_DIRS_INFO, Cmd), + case decode_rc(Result) of + ok -> + recv_dirs_info([]); + Reason -> + {error, Reason} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Returns the log directory info (name and megabytes available) +%% +%% @spec get_lg_dir_info() -> {ok, DirName, Fsid, MbytesAvail} | {error, Error} +%% where +%% DirName = string() +%% Fsid = binary() +%% MbytesAvail = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec get_lg_dir_info() -> {ok, string(), db_fsid(), db_mbytes()} | db_error(). +get_lg_dir_info() -> + % Call into the BDB library and get the log dir and filesystem info + Cmd = <<>>, + <> = erlang:port_control(get_port(), ?CMD_LOG_DIR_INFO, Cmd), + case decode_rc(Result) of + ok -> + receive + {dirinfo, Path, FsId, MbytesAvail} -> + {ok, Path, FsId, MbytesAvail}; + {error, Reason} -> + {error, Reason} + end; + Reason -> + {error, Reason} + end. + + %%-------------------------------------------------------------------- %% @doc %% Returns the size of the in-memory cache. @@ -1286,6 +1385,670 @@ get_txn_timeout() -> {error, decode_rc(Result)} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve database stats +%% +%% This function retrieves database statistics +%% +%% === Options === +%% +%%
+%%
fast_stat
+%%
Return only the values which do not require traversal of the database. +%% Among other things, this flag makes it possible for applications to +%% request key and record counts without incurring the performance +%% penalty of traversing the entire database.
+%%
read_committed
+%%
Database items read during a transactional call will have degree 2 +%% isolation. This ensures the stability of the data items read during +%% the stat operation but permits that data to be modified or deleted by +%% other transactions prior to the commit of the specified +%% transaction.
+%%
read_uncommitted
+%%
Database items read during a transactional call will have degree 1 +%% isolation, including modified but not yet committed data. Silently +%% ignored if the read_committed flag was not specified when the +%% underlying database was opened.
+%%
+%% +%% @spec stat(Db, Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat(Db :: db(), Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +stat(Db, Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, Reason}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve database stats with empty flags +%% +%% @spec stat(Db) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Db = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). +stat(Db) -> + stat(Db, []). + + +%%-------------------------------------------------------------------- +%% @doc +%% Print database stats +%% +%% This function prints statistics from the database to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
fast_stat
+%%
Return only the values which do not require traversal of the database. +%% Among other things, this flag makes it possible for applications to +%% request key and record counts without incurring the performance +%% penalty of traversing the entire database.
+%%
read_committed
+%%
Database items read during a transactional call will have degree 2 +%% isolation. This ensures the stability of the data items read during +%% the stat operation but permits that data to be modified or deleted by +%% other transactions prior to the commit of the specified transaction. +%%
+%%
read_uncommitted
+%%
Database items read during a transactional call will have degree 1 +%% isolation, including modified but not yet committed data. Silently +%% ignored if the read_committed flag was not specified when the +%% underlying database was opened.
+%%
+%% +%% @spec stat_print(Db, Opts) -> ok | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Db :: db(), Opts :: db_flags()) -> + ok | db_error(). +stat_print(Db, Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_DB_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print database stats with empty flags +%% +%% @spec stat_print(Db) -> ok | {error, Error} +%% where +%% Db = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Db :: db()) -> + ok | db_error(). +stat_print(Db) -> + stat_print(Db, []). + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve lock stats +%% +%% This function retrieves lock statistics from the database. +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec lock_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +lock_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOCK_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, Reason}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve lock stats with no flags +%% +%% @spec lock_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat() -> + {ok, [{atom(), number()}]} | db_error(). +lock_stat() -> + lock_stat([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Print lock stats +%% +%% This function prints lock statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_lock_conf
+%%
Display the lock conflict matrix.
+%%
stat_lock_lockers
+%%
Display the lockers within hash chains.
+%%
stat_lock_objects
+%%
Display the lock objects within hash chains.
+%%
stat_lock_params
+%%
Display the locking subsystem parameters.
+%%
+%% +%% @spec lock_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat_print(Opts :: db_flags()) -> + ok | db_error(). +lock_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOCK_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print lock stats with empty flags +%% +%% @spec lock_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat_print() -> + ok | db_error(). +lock_stat_print() -> + lock_stat_print([]). + + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve log stats +%% +%% This function retrieves bdb log statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec log_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +log_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOG_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, Reason}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve log stats with empty flags +%% +%% @spec log_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat() -> + {ok, [{atom(), number()}]} | db_error(). +log_stat() -> + log_stat([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Print log stats +%% +%% This function prints bdb log statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
+%% +%% @spec log_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat_print(Opts :: db_flags()) -> + ok | db_error(). +log_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOG_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print log stats with empty flags +%% +%% @spec log_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat_print() -> + ok | db_error(). +log_stat_print() -> + log_stat_print([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve memory pool stats +%% +%% This function retrieves bdb mpool statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec memp_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +memp_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MEMP_STAT, Cmd), + case decode_rc(Result) of + ok -> + recv_memp_stat([]); + Error -> + {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve memory pool stats with empty flags +%% +%% @spec memp_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat() -> + {ok, [{atom(), number()}]} | db_error(). +memp_stat() -> + memp_stat([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Print memory pool stats +%% +%% This function prints bdb mpool statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_memp_hash
+%%
Display the buffers with hash chains.
+%%
+%% +%% @spec memp_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat_print(Opts :: db_flags()) -> + ok | db_error(). +memp_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MEMP_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print memory pool stats with empty flags +%% +%% @spec memp_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat_print() -> + ok | db_error(). +memp_stat_print() -> + memp_stat_print([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve mutex stats +%% +%% This function retrieves mutex statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec mutex_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +mutex_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, Reason}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve mutex stats with empty flags +%% +%% @spec mutex_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat() -> + {ok, [{atom(), number()}]} | db_error(). +mutex_stat() -> + mutex_stat([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Print mutex stats +%% +%% This function prints mutex statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
+%% +%% @spec mutex_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat_print(Opts :: db_flags()) -> + ok | db_error(). +mutex_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print mutex stats with empty flags +%% +%% @spec mutex_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat_print() -> + ok | db_error(). +mutex_stat_print() -> + mutex_stat_print([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve transaction stats +%% +%% This function retrieves transaction statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec txn_stat(Opts) -> {ok, [{atom(), number()}], [[{atom(), number()}]]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}], [[{atom(), number()}]]} | db_error(). + +txn_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_TXN_STAT, Cmd), + case decode_rc(Result) of + ok -> + recv_txn_stat([]); + Error -> + {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve transaction stats with empty flags +%% +%% @spec txn_stat() -> {ok, [{atom(), number()}], [[{atom(), number()}]]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat() -> + {ok, [{atom(), number()}], [[{atom(), number()}]]} | db_error(). +txn_stat() -> + txn_stat([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Print transaction stats +%% +%% This function prints transaction statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
+%% +%% @spec txn_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat_print(Opts :: db_flags()) -> + ok | db_error(). +txn_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_TXN_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print transaction stats with empty flags +%% +%% @spec txn_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat_print() -> + ok | db_error(). +txn_stat_print() -> + txn_stat_print([]). + +%%-------------------------------------------------------------------- +%% @doc +%% Print environment stats +%% +%% This function prints environment statistics to wherever +%% BDB messages are being sent. There is no documented way +%% to get this programatically +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_subsystem
+%%
Display information for all configured subsystems.
+%%
+%% +%% @spec env_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec env_stat_print(Opts :: db_flags()) -> + ok | db_error(). +env_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_ENV_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + +%%-------------------------------------------------------------------- +%% @doc +%% Print environment stats with empty flags +%% +%% @spec env_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec env_stat_print() -> + ok | db_error(). +env_stat_print() -> + env_stat_print([]). %%-------------------------------------------------------------------- %% @doc @@ -1337,6 +2100,7 @@ get_port() -> %% %% Decode a integer return value into an atom representation %% +decode_rc(Rc) when is_atom(Rc) -> Rc; decode_rc(?ERROR_NONE) -> ok; decode_rc(?ERROR_ASYNC_PENDING) -> async_pending; decode_rc(?ERROR_INVALID_DBREF) -> invalid_db; @@ -1385,6 +2149,7 @@ flag_value(Flag) -> consume_wait -> ?DB_CONSUME_WAIT; create -> ?DB_CREATE; exclusive -> ?DB_EXCL; + fast_stat -> ?DB_FAST_STAT; get_both -> ?DB_GET_BOTH; ignore_lease -> ?DB_IGNORE_LEASE; multiple -> ?DB_MULTIPLE; @@ -1398,6 +2163,14 @@ flag_value(Flag) -> readonly -> ?DB_RDONLY; rmw -> ?DB_RMW; set_recno -> ?DB_SET_RECNO; + stat_all -> ?DB_STAT_ALL; + stat_clear -> ?DB_STAT_CLEAR; + stat_lock_conf -> ?DB_STAT_LOCK_CONF; + stat_lock_lockers -> ?DB_STAT_LOCK_LOCKERS; + stat_lock_objects -> ?DB_STAT_LOCK_OBJECTS; + stat_lock_params -> ?DB_STAT_LOCK_PARAMS; + stat_memp_hash -> ?DB_STAT_MEMP_HASH; + stat_subsystem -> ?DB_STAT_SUBSYSTEM; threaded -> ?DB_THREAD; truncate -> ?DB_TRUNCATE; txn_no_sync -> ?DB_TXN_NOSYNC; @@ -1462,3 +2235,44 @@ split_bin(Delimiter, <>, ItemAcc, Acc) -> split_bin(Delimiter, Rest, <<>> ,[ItemAcc | Acc]); split_bin(Delimiter, <>, ItemAcc, Acc) -> split_bin(Delimiter, Rest, <>, Acc). + + +%% +%% Receive memory pool stats +%% +recv_memp_stat(Fstats) -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {fstat, Fstat} -> + recv_memp_stat([Fstat|Fstats]); + {ok, Stats} -> + {ok, Stats, Fstats} + end. + +%% +%% Receive transaction stats +%% +recv_txn_stat(Tstats) -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {txn, Tstat} -> + recv_txn_stat([Tstat|Tstats]); + {ok, Stats} -> + {ok, Stats, Tstats} + end. + +%% +%% Receive directory info messages until ok +%% +recv_dirs_info(DirInfos) -> + receive + {dirinfo, Path, FsId, MbytesAvail} -> + recv_dirs_info([{Path, FsId, MbytesAvail} | DirInfos]); + {error, Reason} -> + {error, Reason}; + ok -> + {ok, DirInfos} + end. + diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 14895d6..82b0830 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -31,7 +31,18 @@ all() -> delete_should_remove_file, delete_should_fail_if_db_inuse, truncate_should_empty_database, - truncate_all_should_empty_all_databases]. + truncate_all_should_empty_all_databases, + btree_stat_should_report_on_success, + hash_stat_should_report_on_success, + stat_should_fail_on_bad_dbref, + lock_stat_should_report_on_success, + log_stat_should_report_on_success, + memp_stat_should_report_on_success, + mutex_stat_should_report_on_success, + txn_stat_should_report_on_success, + data_dirs_info_should_report_on_success, + lg_dir_info_should_report_on_success]. + dbconfig(Config) -> @@ -237,3 +248,77 @@ truncate_all_should_empty_all_databases(Config) -> ok = bdberl:put(Db, mykey, avalue), ok = bdberl:truncate(), not_found = bdberl:get(Db, mykey). + +btree_stat_should_report_on_success(_Config) -> + {ok, Db} = bdberl:open("btree_stat.bdb", btree), + {ok, Stat} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 0 = proplists:get_value(nkeys, Stat), + 0 = proplists:get_value(ndata, Stat), + + %% Put a record and check the number of records updates + ok = bdberl:put(Db, mykey, avalue), + + {ok, Stat1} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 1 = proplists:get_value(nkeys, Stat1), + 1 = proplists:get_value(ndata, Stat1), + done. + + +hash_stat_should_report_on_success(_Config) -> + {ok, Db} = bdberl:open("hash_stat.bdb", hash), + {ok, Stat} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 0 = proplists:get_value(nkeys, Stat), + 0 = proplists:get_value(ndata, Stat), + + %% Put a record and check the number of records updates + ok = bdberl:put(Db, mykey, avalue), + + {ok, Stat1} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 1 = proplists:get_value(nkeys, Stat1), + 1 = proplists:get_value(ndata, Stat1), + done. + +stat_should_fail_on_bad_dbref(_Config) -> + {error, invalid_db} = bdberl:stat(10000000, []), + done. + +lock_stat_should_report_on_success(_Config) -> + {ok, Stat} = bdberl:lock_stat([]), + %% Check a lock stat that that probably won't change + 2147483647 = proplists:get_value(cur_maxid, Stat), + done. + +log_stat_should_report_on_success(_Config) -> + {ok, Stat} = bdberl:log_stat([]), + %% Check a log stat that that probably won't change + 264584 = proplists:get_value(magic, Stat), + done. + +memp_stat_should_report_on_success(_Config) -> + {ok, Gstat, Fstat} = bdberl:memp_stat([]), + true = is_list(Fstat), + true = is_list(Gstat), + done. + +mutex_stat_should_report_on_success(_Config) -> + {ok, _Stat} = bdberl:mutex_stat([]), + done. + +txn_stat_should_report_on_success(_Config) -> + {ok, _GStat1, []} = bdberl:txn_stat([]), + bdberl:txn_begin(), + {ok, _GStat2, [_ATxnStat]} = bdberl:txn_stat([]), + bdberl:txn_abort(), + {ok, _GStat3, []} = bdberl:txn_stat([]), + done. + +data_dirs_info_should_report_on_success(_Config) -> + {ok, _DataDirs} = bdberl:get_data_dirs_info(). + +lg_dir_info_should_report_on_success(_Config) -> + {ok, _LgDir, _Fsid, _MBytesAvail} = bdberl:get_lg_dir_info(). +