diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 82ef185..11757a9 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -34,6 +34,7 @@ static void do_async_get(void* arg); static void do_async_txnop(void* arg); static void do_async_cursor_get(void* arg); static void do_async_truncate(void* arg); +static void do_async_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -765,6 +766,85 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, WRITE_UNLOCK(G_LOG_RWLOCK); } + *outbuf = 0; + return 0; + } + case CMD_DB_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_DB_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + DB* db = G_DATABASES[dbref].db; + unsigned int flags = UNPACK_INT(inbuf, 4); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring + int rc = db->stat_print(db, flags); + RETURN_INT(rc, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_ENV_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + int rc = G_DB_ENV->stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); } } *outbuf = 0; @@ -1074,6 +1154,154 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) } } + + +#define BT_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->bt_##member, \ + ERL_DRV_TUPLE, 2 +static void async_cleanup_and_send_btree_stats(PortData* d, char *type, DB_BTREE_STAT *bsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom(type), + ERL_DRV_TUPLE, 2, + BT_STATS_TUPLE(bsp, magic), /* Magic number. */ + BT_STATS_TUPLE(bsp, version), /* Version number. */ + BT_STATS_TUPLE(bsp, metaflags), /* Metadata flags. */ + BT_STATS_TUPLE(bsp, nkeys), /* Number of unique keys. */ + BT_STATS_TUPLE(bsp, ndata), /* Number of data items. */ + BT_STATS_TUPLE(bsp, pagecnt), /* Page count. */ + BT_STATS_TUPLE(bsp, pagesize), /* Page size. */ + BT_STATS_TUPLE(bsp, minkey), /* Minkey value. */ + BT_STATS_TUPLE(bsp, re_len), /* Fixed-length record length. */ + BT_STATS_TUPLE(bsp, re_pad), /* Fixed-length record pad. */ + BT_STATS_TUPLE(bsp, levels), /* Tree levels. */ + BT_STATS_TUPLE(bsp, int_pg), /* Internal pages. */ + BT_STATS_TUPLE(bsp, leaf_pg), /* Leaf pages. */ + BT_STATS_TUPLE(bsp, dup_pg), /* Duplicate pages. */ + BT_STATS_TUPLE(bsp, over_pg), /* Overflow pages. */ + BT_STATS_TUPLE(bsp, empty_pg), /* Empty pages. */ + BT_STATS_TUPLE(bsp, free), /* Pages on the free list. */ + BT_STATS_TUPLE(bsp, int_pgfree), /* Bytes free in internal pages. */ + BT_STATS_TUPLE(bsp, leaf_pgfree), /* Bytes free in leaf pages. */ + BT_STATS_TUPLE(bsp, dup_pgfree), /* Bytes free in duplicate pages. */ + BT_STATS_TUPLE(bsp, over_pgfree), /* Bytes free in overflow pages. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 21+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef BT_STATS_TUPLE + + +#define HASH_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->hash_##member, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_hash_stats(PortData* d, DB_HASH_STAT *hsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom("hash"), + ERL_DRV_TUPLE, 2, + HASH_STATS_TUPLE(hsp, magic), /* Magic number. */ + HASH_STATS_TUPLE(hsp, version), /* Version number. */ + HASH_STATS_TUPLE(hsp, metaflags), /* Metadata flags. */ + HASH_STATS_TUPLE(hsp, nkeys), /* Number of unique keys. */ + HASH_STATS_TUPLE(hsp, ndata), /* Number of data items. */ + HASH_STATS_TUPLE(hsp, pagecnt), /* Page count. */ + HASH_STATS_TUPLE(hsp, pagesize), /* Page size. */ + HASH_STATS_TUPLE(hsp, ffactor), /* Fill factor specified at create. */ + HASH_STATS_TUPLE(hsp, buckets), /* Number of hash buckets. */ + HASH_STATS_TUPLE(hsp, free), /* Pages on the free list. */ + HASH_STATS_TUPLE(hsp, bfree), /* Bytes free on bucket pages. */ + HASH_STATS_TUPLE(hsp, bigpages), /* Number of big key/data pages. */ + HASH_STATS_TUPLE(hsp, big_bfree), /* Bytes free on big item pages. */ + HASH_STATS_TUPLE(hsp, overflows), /* Number of overflow pages. */ + HASH_STATS_TUPLE(hsp, ovfl_free), /* Bytes free on ovfl pages. */ + HASH_STATS_TUPLE(hsp, dup), /* Number of dup pages. */ + HASH_STATS_TUPLE(hsp, dup_free), /* Bytes free on duplicate pages. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 17+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef HASH_STATS_TUPLE + +#ifdef ENABLE_QUEUE // If we ever decide to support Queues + +#define QS_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->qs_##member, \ + ERL_DRV_TUPLE, 2 +static void async_cleanup_and_send_queue_stats(PortData* d, DB_QUEUE_STAT *qsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom("queue"), + ERL_DRV_TUPLE, 2, + QS_STAT_TUPLE(qsp, qs_magic), /* Magic number. */ + QS_STAT_TUPLE(qsp, version), /* Version number. */ + QS_STAT_TUPLE(qsp, metaflags), /* Metadata flags. */ + QS_STAT_TUPLE(qsp, nkeys), /* Number of unique keys. */ + QS_STAT_TUPLE(qsp, ndata), /* Number of data items. */ + QS_STAT_TUPLE(qsp, pagesize), /* Page size. */ + QS_STAT_TUPLE(qsp, extentsize), /* Pages per extent. */ + QS_STAT_TUPLE(qsp, pages), /* Data pages. */ + QS_STAT_TUPLE(qsp, re_len), /* Fixed-length record length. */ + QS_STAT_TUPLE(qsp, re_pad), /* Fixed-length record pad. */ + QS_STAT_TUPLE(qsp, pgfree), /* Bytes free in data pages. */ + QS_STAT_TUPLE(qsp, first_recno), /* First not deleted record. */ + QS_STAT_TUPLE(qsp, cur_recno), /* Next available record number. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 13+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef QUEUE_STATS_TUPLE +#endif // ENABLE_QUEUE + static void do_async_put(void* arg) { // Payload is: <> @@ -1289,6 +1517,61 @@ static void do_async_truncate(void* arg) } +static void do_async_stat(void* arg) +{ + // Payload is: << DbRef:32, Flags:32 >> + PortData* d = (PortData*)arg; + + // Get the database object, using the provided ref + int dbref = UNPACK_INT(d->work_buffer, 0); + DB* db = G_DATABASES[dbref].db; + DBTYPE type = DB_UNKNOWN; + int rc = db->get_type(db, &type); + if (rc != 0) + { + async_cleanup_and_send_rc(d, rc); + return; + } + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 4); + + void *sp = NULL; + rc = db->stat(db, d->txn, &sp, flags); + if (rc != 0 || sp == NULL) + { + async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); + } + else + { + switch(type) + { + case DB_BTREE: /*FALLTHRU*/ + case DB_RECNO: + async_cleanup_and_send_btree_stats(d, type == DB_BTREE ? "btree" :"recno", sp); + break; + case DB_HASH: + async_cleanup_and_send_hash_stats(d, sp); + break; +#ifdef ENABLE_QUEUE + case DB_QUEUE: + async_cleanup_and_send_queue_stats(d, sp); + break; +#endif + default: + async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); + break; + } + } + + // Finally, clean up value buffer (driver_send_term made a copy) + if (NULL != sp) + { + free(sp); + } +} + + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index a3388d7..c30eaf0 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -46,6 +46,9 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_REMOVE_DB 15 #define CMD_TRUNCATE 16 #define CMD_REGISTER_LOGGER 17 +#define CMD_DB_STAT 18 +#define CMD_DB_STAT_PRINT 19 +#define CMD_ENV_STAT_PRINT 20 /** * Command status values @@ -74,7 +77,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define ERROR_CURSOR_OPEN (-29005) /* Cursor already active on this port */ #define ERROR_NO_CURSOR (-29006) /* No cursor open on this port */ #define ERROR_DB_ACTIVE (-29007) /* Database is currently active; operation requires otherwise */ - +#define ERROR_INVALID_CMD (-29008) /* Invalid command code requested */ +#define ERROR_INVALID_DB_TYPE (-29009) /* Invalid database type */ /** * System information ids diff --git a/include/bdberl.hrl b/include/bdberl.hrl index a306b30..4fdec9f 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -23,6 +23,9 @@ -define(CMD_REMOVE_DB, 15). -define(CMD_TRUNCATE, 16). -define(CMD_REGISTER_LOGGER, 17). +-define(CMD_DB_STAT, 18). +-define(CMD_DB_STAT_PRINT, 19). +-define(CMD_ENV_STAT_PRINT, 20). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). @@ -43,6 +46,8 @@ -define(ERROR_CURSOR_OPEN, -29005). % Cursor already active on this port -define(ERROR_NO_CURSOR, -29006). % No cursor open on this port -define(ERROR_DB_ACTIVE, -29007). % Database is currently active; operation requires otherwise +-define(ERROR_INVALID_CMD, -29008). % Invalid command +-define(ERROR_INVALID_DB_TYPE,-29009). % Invalid database type %% DB (user visible) error return codes. -define(DB_BUFFER_SMALL, -30999). % User memory too small for return diff --git a/src/bdberl.erl b/src/bdberl.erl index 1323032..cc2184a 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -14,6 +14,8 @@ get_cache_size/0, get_data_dirs/0, get_txn_timeout/0, + stat/2, + stat_print/1, stat_print/2, transaction/1, transaction/2, transaction/3, put/3, put/4, put_r/3, put_r/4, @@ -1280,6 +1282,146 @@ get_txn_timeout() -> {error, decode_rc(Result)} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve database stats +%% +%% This function retrieves performance statistics from the database. +%% +%% === Options === +%% +%%
+%%
fast_stat
+%%
Return only the values which do not require traversal of the database. +%% Among other things, this flag makes it possible for applications to +%% request key and record counts without incurring the performance +%% penalty of traversing the entire database.
+%%
read_committed
+%%
Database items read during a transactional call will have degree 2 +%% isolation. This ensures the stability of the data items read during +%% the stat operation but permits that data to be modified or deleted by +%% other transactions prior to the commit of the specified +%% transaction.
+%%
read_uncommitted
+%%
Database items read during a transactional call will have degree 1 +%% isolation, including modified but not yet committed data. Silently +%% ignored if the read_committed flag was not specified when the +%% underlying database was opened.
+%%
+%% +%% @spec stat(Db, Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat(Db :: db(), Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +stat(Db, Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print database stats +%% +%% This function prints performance statistics from the database to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
fast_stat
+%%
Return only the values which do not require traversal of the database. +%% Among other things, this flag makes it possible for applications to +%% request key and record counts without incurring the performance +%% penalty of traversing the entire database.
+%%
read_committed
+%%
Database items read during a transactional call will have degree 2 +%% isolation. This ensures the stability of the data items read during +%% the stat operation but permits that data to be modified or deleted by +%% other transactions prior to the commit of the specified transaction. +%%
+%%
read_uncommitted
+%%
Database items read during a transactional call will have degree 1 +%% isolation, including modified but not yet committed data. Silently +%% ignored if the read_committed flag was not specified when the +%% underlying database was opened.
+%%
+%% +%% @spec stat_print(Db, Opts) -> ok | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Db :: db(), Opts :: db_flags()) -> + ok | db_error(). +stat_print(Db, Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_DB_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print environment stats +%% +%% This function prints environment statistics to wherever +%% BDB messages are being sent. There is no documented way +%% to get this programatically +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_subsystem
+%%
Display information for all configured subsystems.
+%%
+%% +%% @spec stat_print(Opts) -> ok | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Opts :: db_flags()) -> + ok | db_error(). +stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_ENV_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + + + %%-------------------------------------------------------------------- %% @doc @@ -1379,6 +1521,7 @@ flag_value(Flag) -> consume_wait -> ?DB_CONSUME_WAIT; create -> ?DB_CREATE; exclusive -> ?DB_EXCL; + fast_stat -> ?DB_FAST_STAT; get_both -> ?DB_GET_BOTH; ignore_lease -> ?DB_IGNORE_LEASE; multiple -> ?DB_MULTIPLE; @@ -1392,6 +1535,9 @@ flag_value(Flag) -> readonly -> ?DB_RDONLY; rmw -> ?DB_RMW; set_recno -> ?DB_SET_RECNO; + stat_all -> ?DB_STAT_ALL; + stat_clear -> ?DB_STAT_CLEAR; + stat_subsystem -> ?DB_STAT_SUBSYSTEM; threaded -> ?DB_THREAD; truncate -> ?DB_TRUNCATE; txn_no_sync -> ?DB_TXN_NOSYNC; diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 14895d6..a9a2dc9 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -31,7 +31,10 @@ all() -> delete_should_remove_file, delete_should_fail_if_db_inuse, truncate_should_empty_database, - truncate_all_should_empty_all_databases]. + truncate_all_should_empty_all_databases, + btree_stat_should_report_on_success, + hash_stat_should_report_on_success, + stat_should_fail_on_bad_dbref]. dbconfig(Config) -> @@ -237,3 +240,40 @@ truncate_all_should_empty_all_databases(Config) -> ok = bdberl:put(Db, mykey, avalue), ok = bdberl:truncate(), not_found = bdberl:get(Db, mykey). + +btree_stat_should_report_on_success(_Config) -> + {ok, Db} = bdberl:open("btree_stat.bdb", btree), + {ok, Stat} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 0 = proplists:get_value(nkeys, Stat), + 0 = proplists:get_value(ndata, Stat), + + %% Put a record and check the number of records updates + ok = bdberl:put(Db, mykey, avalue), + + {ok, Stat1} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 1 = proplists:get_value(nkeys, Stat1), + 1 = proplists:get_value(ndata, Stat1), + done. + + +hash_stat_should_report_on_success(_Config) -> + {ok, Db} = bdberl:open("hash_stat.bdb", hash), + {ok, Stat} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 0 = proplists:get_value(nkeys, Stat), + 0 = proplists:get_value(ndata, Stat), + + %% Put a record and check the number of records updates + ok = bdberl:put(Db, mykey, avalue), + + {ok, Stat1} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 1 = proplists:get_value(nkeys, Stat1), + 1 = proplists:get_value(ndata, Stat1), + done. + +stat_should_fail_on_bad_dbref(_Config) -> + {error, invalid_db} = bdberl:stat(10000000, []), + done.