From 05c1754e7f7e1598013f8a2b5d2f5448622c72ae Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 27 May 2009 14:03:11 -0600 Subject: [PATCH 01/14] Added stat/2 and stat_print/1, stat_print/2 functions for retrieving/displaying stats about the BDB subsystem. --- c_src/bdberl_drv.c | 283 ++++++++++++++++++++++++++++++++++++++++++ c_src/bdberl_drv.h | 6 +- include/bdberl.hrl | 5 + src/bdberl.erl | 146 ++++++++++++++++++++++ test/bdberl_SUITE.erl | 42 ++++++- 5 files changed, 480 insertions(+), 2 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 82ef185..11757a9 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -34,6 +34,7 @@ static void do_async_get(void* arg); static void do_async_txnop(void* arg); static void do_async_cursor_get(void* arg); static void do_async_truncate(void* arg); +static void do_async_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -765,6 +766,85 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, WRITE_UNLOCK(G_LOG_RWLOCK); } + *outbuf = 0; + return 0; + } + case CMD_DB_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_DB_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << DbRef:32, Flags:32 >> + int dbref = UNPACK_INT(inbuf, 0); + + // Make sure this port currently has dbref open -- if it doesn't, error out. Of note, + // if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about + // the underlying handle disappearing since we have a reference. + if (has_dbref(d, dbref)) + { + DB* db = G_DATABASES[dbref].db; + unsigned int flags = UNPACK_INT(inbuf, 4); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring + int rc = db->stat_print(db, flags); + RETURN_INT(rc, outbuf); + } + else + { + // Invalid dbref + RETURN_INT(ERROR_INVALID_DBREF, outbuf); + } + } + case CMD_ENV_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + int rc = G_DB_ENV->stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); } } *outbuf = 0; @@ -1074,6 +1154,154 @@ static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) } } + + +#define BT_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->bt_##member, \ + ERL_DRV_TUPLE, 2 +static void async_cleanup_and_send_btree_stats(PortData* d, char *type, DB_BTREE_STAT *bsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom(type), + ERL_DRV_TUPLE, 2, + BT_STATS_TUPLE(bsp, magic), /* Magic number. */ + BT_STATS_TUPLE(bsp, version), /* Version number. */ + BT_STATS_TUPLE(bsp, metaflags), /* Metadata flags. */ + BT_STATS_TUPLE(bsp, nkeys), /* Number of unique keys. */ + BT_STATS_TUPLE(bsp, ndata), /* Number of data items. */ + BT_STATS_TUPLE(bsp, pagecnt), /* Page count. */ + BT_STATS_TUPLE(bsp, pagesize), /* Page size. */ + BT_STATS_TUPLE(bsp, minkey), /* Minkey value. */ + BT_STATS_TUPLE(bsp, re_len), /* Fixed-length record length. */ + BT_STATS_TUPLE(bsp, re_pad), /* Fixed-length record pad. */ + BT_STATS_TUPLE(bsp, levels), /* Tree levels. */ + BT_STATS_TUPLE(bsp, int_pg), /* Internal pages. */ + BT_STATS_TUPLE(bsp, leaf_pg), /* Leaf pages. */ + BT_STATS_TUPLE(bsp, dup_pg), /* Duplicate pages. */ + BT_STATS_TUPLE(bsp, over_pg), /* Overflow pages. */ + BT_STATS_TUPLE(bsp, empty_pg), /* Empty pages. */ + BT_STATS_TUPLE(bsp, free), /* Pages on the free list. */ + BT_STATS_TUPLE(bsp, int_pgfree), /* Bytes free in internal pages. */ + BT_STATS_TUPLE(bsp, leaf_pgfree), /* Bytes free in leaf pages. */ + BT_STATS_TUPLE(bsp, dup_pgfree), /* Bytes free in duplicate pages. */ + BT_STATS_TUPLE(bsp, over_pgfree), /* Bytes free in overflow pages. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 21+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef BT_STATS_TUPLE + + +#define HASH_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->hash_##member, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_hash_stats(PortData* d, DB_HASH_STAT *hsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom("hash"), + ERL_DRV_TUPLE, 2, + HASH_STATS_TUPLE(hsp, magic), /* Magic number. */ + HASH_STATS_TUPLE(hsp, version), /* Version number. */ + HASH_STATS_TUPLE(hsp, metaflags), /* Metadata flags. */ + HASH_STATS_TUPLE(hsp, nkeys), /* Number of unique keys. */ + HASH_STATS_TUPLE(hsp, ndata), /* Number of data items. */ + HASH_STATS_TUPLE(hsp, pagecnt), /* Page count. */ + HASH_STATS_TUPLE(hsp, pagesize), /* Page size. */ + HASH_STATS_TUPLE(hsp, ffactor), /* Fill factor specified at create. */ + HASH_STATS_TUPLE(hsp, buckets), /* Number of hash buckets. */ + HASH_STATS_TUPLE(hsp, free), /* Pages on the free list. */ + HASH_STATS_TUPLE(hsp, bfree), /* Bytes free on bucket pages. */ + HASH_STATS_TUPLE(hsp, bigpages), /* Number of big key/data pages. */ + HASH_STATS_TUPLE(hsp, big_bfree), /* Bytes free on big item pages. */ + HASH_STATS_TUPLE(hsp, overflows), /* Number of overflow pages. */ + HASH_STATS_TUPLE(hsp, ovfl_free), /* Bytes free on ovfl pages. */ + HASH_STATS_TUPLE(hsp, dup), /* Number of dup pages. */ + HASH_STATS_TUPLE(hsp, dup_free), /* Bytes free on duplicate pages. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 17+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef HASH_STATS_TUPLE + +#ifdef ENABLE_QUEUE // If we ever decide to support Queues + +#define QS_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->qs_##member, \ + ERL_DRV_TUPLE, 2 +static void async_cleanup_and_send_queue_stats(PortData* d, DB_QUEUE_STAT *qsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("type"), + ERL_DRV_ATOM, driver_mk_atom("queue"), + ERL_DRV_TUPLE, 2, + QS_STAT_TUPLE(qsp, qs_magic), /* Magic number. */ + QS_STAT_TUPLE(qsp, version), /* Version number. */ + QS_STAT_TUPLE(qsp, metaflags), /* Metadata flags. */ + QS_STAT_TUPLE(qsp, nkeys), /* Number of unique keys. */ + QS_STAT_TUPLE(qsp, ndata), /* Number of data items. */ + QS_STAT_TUPLE(qsp, pagesize), /* Page size. */ + QS_STAT_TUPLE(qsp, extentsize), /* Pages per extent. */ + QS_STAT_TUPLE(qsp, pages), /* Data pages. */ + QS_STAT_TUPLE(qsp, re_len), /* Fixed-length record length. */ + QS_STAT_TUPLE(qsp, re_pad), /* Fixed-length record pad. */ + QS_STAT_TUPLE(qsp, pgfree), /* Bytes free in data pages. */ + QS_STAT_TUPLE(qsp, first_recno), /* First not deleted record. */ + QS_STAT_TUPLE(qsp, cur_recno), /* Next available record number. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 13+2, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} +#undef QUEUE_STATS_TUPLE +#endif // ENABLE_QUEUE + static void do_async_put(void* arg) { // Payload is: <> @@ -1289,6 +1517,61 @@ static void do_async_truncate(void* arg) } +static void do_async_stat(void* arg) +{ + // Payload is: << DbRef:32, Flags:32 >> + PortData* d = (PortData*)arg; + + // Get the database object, using the provided ref + int dbref = UNPACK_INT(d->work_buffer, 0); + DB* db = G_DATABASES[dbref].db; + DBTYPE type = DB_UNKNOWN; + int rc = db->get_type(db, &type); + if (rc != 0) + { + async_cleanup_and_send_rc(d, rc); + return; + } + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 4); + + void *sp = NULL; + rc = db->stat(db, d->txn, &sp, flags); + if (rc != 0 || sp == NULL) + { + async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); + } + else + { + switch(type) + { + case DB_BTREE: /*FALLTHRU*/ + case DB_RECNO: + async_cleanup_and_send_btree_stats(d, type == DB_BTREE ? "btree" :"recno", sp); + break; + case DB_HASH: + async_cleanup_and_send_hash_stats(d, sp); + break; +#ifdef ENABLE_QUEUE + case DB_QUEUE: + async_cleanup_and_send_queue_stats(d, sp); + break; +#endif + default: + async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); + break; + } + } + + // Finally, clean up value buffer (driver_send_term made a copy) + if (NULL != sp) + { + free(sp); + } +} + + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index a3388d7..c30eaf0 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -46,6 +46,9 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_REMOVE_DB 15 #define CMD_TRUNCATE 16 #define CMD_REGISTER_LOGGER 17 +#define CMD_DB_STAT 18 +#define CMD_DB_STAT_PRINT 19 +#define CMD_ENV_STAT_PRINT 20 /** * Command status values @@ -74,7 +77,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define ERROR_CURSOR_OPEN (-29005) /* Cursor already active on this port */ #define ERROR_NO_CURSOR (-29006) /* No cursor open on this port */ #define ERROR_DB_ACTIVE (-29007) /* Database is currently active; operation requires otherwise */ - +#define ERROR_INVALID_CMD (-29008) /* Invalid command code requested */ +#define ERROR_INVALID_DB_TYPE (-29009) /* Invalid database type */ /** * System information ids diff --git a/include/bdberl.hrl b/include/bdberl.hrl index a306b30..4fdec9f 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -23,6 +23,9 @@ -define(CMD_REMOVE_DB, 15). -define(CMD_TRUNCATE, 16). -define(CMD_REGISTER_LOGGER, 17). +-define(CMD_DB_STAT, 18). +-define(CMD_DB_STAT_PRINT, 19). +-define(CMD_ENV_STAT_PRINT, 20). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). @@ -43,6 +46,8 @@ -define(ERROR_CURSOR_OPEN, -29005). % Cursor already active on this port -define(ERROR_NO_CURSOR, -29006). % No cursor open on this port -define(ERROR_DB_ACTIVE, -29007). % Database is currently active; operation requires otherwise +-define(ERROR_INVALID_CMD, -29008). % Invalid command +-define(ERROR_INVALID_DB_TYPE,-29009). % Invalid database type %% DB (user visible) error return codes. -define(DB_BUFFER_SMALL, -30999). % User memory too small for return diff --git a/src/bdberl.erl b/src/bdberl.erl index 1323032..cc2184a 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -14,6 +14,8 @@ get_cache_size/0, get_data_dirs/0, get_txn_timeout/0, + stat/2, + stat_print/1, stat_print/2, transaction/1, transaction/2, transaction/3, put/3, put/4, put_r/3, put_r/4, @@ -1280,6 +1282,146 @@ get_txn_timeout() -> {error, decode_rc(Result)} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve database stats +%% +%% This function retrieves performance statistics from the database. +%% +%% === Options === +%% +%%
+%%
fast_stat
+%%
Return only the values which do not require traversal of the database. +%% Among other things, this flag makes it possible for applications to +%% request key and record counts without incurring the performance +%% penalty of traversing the entire database.
+%%
read_committed
+%%
Database items read during a transactional call will have degree 2 +%% isolation. This ensures the stability of the data items read during +%% the stat operation but permits that data to be modified or deleted by +%% other transactions prior to the commit of the specified +%% transaction.
+%%
read_uncommitted
+%%
Database items read during a transactional call will have degree 1 +%% isolation, including modified but not yet committed data. Silently +%% ignored if the read_committed flag was not specified when the +%% underlying database was opened.
+%%
+%% +%% @spec stat(Db, Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat(Db :: db(), Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +stat(Db, Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_DB_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print database stats +%% +%% This function prints performance statistics from the database to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
fast_stat
+%%
Return only the values which do not require traversal of the database. +%% Among other things, this flag makes it possible for applications to +%% request key and record counts without incurring the performance +%% penalty of traversing the entire database.
+%%
read_committed
+%%
Database items read during a transactional call will have degree 2 +%% isolation. This ensures the stability of the data items read during +%% the stat operation but permits that data to be modified or deleted by +%% other transactions prior to the commit of the specified transaction. +%%
+%%
read_uncommitted
+%%
Database items read during a transactional call will have degree 1 +%% isolation, including modified but not yet committed data. Silently +%% ignored if the read_committed flag was not specified when the +%% underlying database was opened.
+%%
+%% +%% @spec stat_print(Db, Opts) -> ok | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Db :: db(), Opts :: db_flags()) -> + ok | db_error(). +stat_print(Db, Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_DB_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print environment stats +%% +%% This function prints environment statistics to wherever +%% BDB messages are being sent. There is no documented way +%% to get this programatically +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_subsystem
+%%
Display information for all configured subsystems.
+%%
+%% +%% @spec stat_print(Opts) -> ok | {error, Error} +%% where +%% Db = integer() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Opts :: db_flags()) -> + ok | db_error(). +stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_ENV_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + + + %%-------------------------------------------------------------------- %% @doc @@ -1379,6 +1521,7 @@ flag_value(Flag) -> consume_wait -> ?DB_CONSUME_WAIT; create -> ?DB_CREATE; exclusive -> ?DB_EXCL; + fast_stat -> ?DB_FAST_STAT; get_both -> ?DB_GET_BOTH; ignore_lease -> ?DB_IGNORE_LEASE; multiple -> ?DB_MULTIPLE; @@ -1392,6 +1535,9 @@ flag_value(Flag) -> readonly -> ?DB_RDONLY; rmw -> ?DB_RMW; set_recno -> ?DB_SET_RECNO; + stat_all -> ?DB_STAT_ALL; + stat_clear -> ?DB_STAT_CLEAR; + stat_subsystem -> ?DB_STAT_SUBSYSTEM; threaded -> ?DB_THREAD; truncate -> ?DB_TRUNCATE; txn_no_sync -> ?DB_TXN_NOSYNC; diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 14895d6..a9a2dc9 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -31,7 +31,10 @@ all() -> delete_should_remove_file, delete_should_fail_if_db_inuse, truncate_should_empty_database, - truncate_all_should_empty_all_databases]. + truncate_all_should_empty_all_databases, + btree_stat_should_report_on_success, + hash_stat_should_report_on_success, + stat_should_fail_on_bad_dbref]. dbconfig(Config) -> @@ -237,3 +240,40 @@ truncate_all_should_empty_all_databases(Config) -> ok = bdberl:put(Db, mykey, avalue), ok = bdberl:truncate(), not_found = bdberl:get(Db, mykey). + +btree_stat_should_report_on_success(_Config) -> + {ok, Db} = bdberl:open("btree_stat.bdb", btree), + {ok, Stat} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 0 = proplists:get_value(nkeys, Stat), + 0 = proplists:get_value(ndata, Stat), + + %% Put a record and check the number of records updates + ok = bdberl:put(Db, mykey, avalue), + + {ok, Stat1} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 1 = proplists:get_value(nkeys, Stat1), + 1 = proplists:get_value(ndata, Stat1), + done. + + +hash_stat_should_report_on_success(_Config) -> + {ok, Db} = bdberl:open("hash_stat.bdb", hash), + {ok, Stat} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 0 = proplists:get_value(nkeys, Stat), + 0 = proplists:get_value(ndata, Stat), + + %% Put a record and check the number of records updates + ok = bdberl:put(Db, mykey, avalue), + + {ok, Stat1} = bdberl:stat(Db, []), + %% Check stats are zero on the new db + 1 = proplists:get_value(nkeys, Stat1), + 1 = proplists:get_value(ndata, Stat1), + done. + +stat_should_fail_on_bad_dbref(_Config) -> + {error, invalid_db} = bdberl:stat(10000000, []), + done. From 70dd9ee11a17730352a8621736c3bec1678104c2 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 27 May 2009 15:02:58 -0600 Subject: [PATCH 02/14] Added lock_stat/1 and lock_stat_print/1 functions to get lock stats. --- c_src/bdberl_drv.c | 143 +++++++++++++++++++++++++++++++++++++++++- c_src/bdberl_drv.h | 2 + include/bdberl.hrl | 2 + src/bdberl.erl | 99 +++++++++++++++++++++++++++-- test/bdberl_SUITE.erl | 9 ++- 5 files changed, 248 insertions(+), 7 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 11757a9..e84f2b6 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -35,6 +35,7 @@ static void do_async_txnop(void* arg); static void do_async_cursor_get(void* arg); static void do_async_truncate(void* arg); static void do_async_stat(void* arg); +static void do_async_lock_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -846,6 +847,45 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = G_DB_ENV->stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } + case CMD_LOCK_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_lock_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_LOCK_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->lock_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } } *outbuf = 0; return 0; @@ -1302,6 +1342,81 @@ static void async_cleanup_and_send_queue_stats(PortData* d, DB_QUEUE_STAT *qsp) #undef QUEUE_STATS_TUPLE #endif // ENABLE_QUEUE +#define ST_STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->st_##member, \ + ERL_DRV_TUPLE, 2 + +#define ST_STATS_INT_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_INT, (base)->st_##member, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_lock_stats(PortData* d, DB_LOCK_STAT *lsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(lsp, id), /* Last allocated locker ID. */ + ST_STATS_TUPLE(lsp, cur_maxid), /* Current maximum unused ID. */ + ST_STATS_TUPLE(lsp, maxlocks), /* Maximum number of locks in table. */ + ST_STATS_TUPLE(lsp, maxlockers), /* Maximum num of lockers in table. */ + ST_STATS_TUPLE(lsp, maxobjects), /* Maximum num of objects in table. */ + ST_STATS_TUPLE(lsp, partitions), /* number of partitions. */ + ST_STATS_INT_TUPLE(lsp, nmodes), /* Number of lock modes. */ + ST_STATS_TUPLE(lsp, nlockers), /* Current number of lockers. */ + ST_STATS_TUPLE(lsp, nlocks), /* Current number of locks. */ + ST_STATS_TUPLE(lsp, maxnlocks), /* Maximum number of locks so far. */ + ST_STATS_TUPLE(lsp, maxhlocks), /* Maximum number of locks in any bucket. */ + ST_STATS_TUPLE(lsp, locksteals), /* Number of lock steals so far. */ + ST_STATS_TUPLE(lsp, maxlsteals), /* Maximum number steals in any partition. */ + ST_STATS_TUPLE(lsp, maxnlockers), /* Maximum number of lockers so far. */ + ST_STATS_TUPLE(lsp, nobjects), /* Current number of objects. */ + ST_STATS_TUPLE(lsp, maxnobjects), /* Maximum number of objects so far. */ + ST_STATS_TUPLE(lsp, maxhobjects), /* Maximum number of objectsin any bucket. */ + ST_STATS_TUPLE(lsp, objectsteals), /* Number of objects steals so far. */ + ST_STATS_TUPLE(lsp, maxosteals), /* Maximum number of steals in any partition. */ + ST_STATS_TUPLE(lsp, nrequests), /* Number of lock gets. */ + ST_STATS_TUPLE(lsp, nreleases), /* Number of lock puts. */ + ST_STATS_TUPLE(lsp, nupgrade), /* Number of lock upgrades. */ + ST_STATS_TUPLE(lsp, ndowngrade), /* Number of lock downgrades. */ + ST_STATS_TUPLE(lsp, lock_wait), /* Lock conflicts w/ subsequent wait */ + ST_STATS_TUPLE(lsp, lock_nowait), /* Lock conflicts w/o subsequent wait */ + ST_STATS_TUPLE(lsp, ndeadlocks), /* Number of lock deadlocks. */ + ST_STATS_TUPLE(lsp, locktimeout), /* Lock timeout. */ + ST_STATS_TUPLE(lsp, nlocktimeouts), /* Number of lock timeouts. */ + ST_STATS_TUPLE(lsp, txntimeout), /* Transaction timeout. */ + ST_STATS_TUPLE(lsp, ntxntimeouts), /* Number of transaction timeouts. */ + ST_STATS_TUPLE(lsp, part_wait), /* Partition lock granted after wait. */ + ST_STATS_TUPLE(lsp, part_nowait), /* Partition lock granted without wait. */ + ST_STATS_TUPLE(lsp, part_max_wait), /* Max partition lock granted after wait. */ + ST_STATS_TUPLE(lsp, part_max_nowait), /* Max partition lock granted without wait. */ + ST_STATS_TUPLE(lsp, objs_wait), /* Object lock granted after wait. */ + ST_STATS_TUPLE(lsp, objs_nowait), /* Object lock granted without wait. */ + ST_STATS_TUPLE(lsp, lockers_wait), /* Locker lock granted after wait. */ + ST_STATS_TUPLE(lsp, lockers_nowait),/* Locker lock granted without wait. */ + ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(lsp, hash_len), /* Max length of bucket. */ + ST_STATS_TUPLE(lsp, regsize), /* Region size. - will have to cast to uint */ + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 42+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + static void do_async_put(void* arg) { // Payload is: <> @@ -1540,7 +1655,7 @@ static void do_async_stat(void* arg) rc = db->stat(db, d->txn, &sp, flags); if (rc != 0 || sp == NULL) { - async_cleanup_and_send_rc(d, ERROR_INVALID_DB_TYPE); + async_cleanup_and_send_rc(d, rc); } else { @@ -1571,6 +1686,32 @@ static void do_async_stat(void* arg) } } +static void do_async_lock_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_LOCK_STAT *lsp = NULL; + int rc = G_DB_ENV->lock_stat(G_DB_ENV, &lsp, flags); + if (rc != 0 || lsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_lock_stats(d, lsp); + } + + // Finally, clean up lock stats + if (NULL != lsp) + { + free(lsp); + } +} + static void* zalloc(unsigned int size) { diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index c30eaf0..ddb89d8 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -49,6 +49,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_DB_STAT 18 #define CMD_DB_STAT_PRINT 19 #define CMD_ENV_STAT_PRINT 20 +#define CMD_LOCK_STAT 21 +#define CMD_LOCK_STAT_PRINT 22 /** * Command status values diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 4fdec9f..92aa674 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -26,6 +26,8 @@ -define(CMD_DB_STAT, 18). -define(CMD_DB_STAT_PRINT, 19). -define(CMD_ENV_STAT_PRINT, 20). +-define(CMD_LOCK_STAT, 21). +-define(CMD_LOCK_STAT_PRINT, 22). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index cc2184a..e2127be 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -15,7 +15,10 @@ get_data_dirs/0, get_txn_timeout/0, stat/2, - stat_print/1, stat_print/2, + stat_print/2, + lock_stat/1, + lock_stat_print/1, + env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, put_r/3, put_r/4, @@ -1286,7 +1289,7 @@ get_txn_timeout() -> %% @doc %% Retrieve database stats %% -%% This function retrieves performance statistics from the database. +%% This function retrieves database statistics %% %% === Options === %% @@ -1336,11 +1339,12 @@ stat(Db, Opts) -> end. + %%-------------------------------------------------------------------- %% @doc %% Print database stats %% -%% This function prints performance statistics from the database to wherever +%% This function prints statistics from the database to wherever %% BDB messages are being sent %% %% === Options === @@ -1382,6 +1386,87 @@ stat_print(Db, Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve lock stats +%% +%% This function retrieves lock statistics from the database. +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +lock_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOCK_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + + + +%%-------------------------------------------------------------------- +%% @doc +%% Print lock stats +%% +%% This function prints lock statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_lock_conf
+%%
Display the lock conflict matrix.
+%%
stat_lock_lockers
+%%
Display the lockers within hash chains.
+%%
stat_lock_objects
+%%
Display the lock objects within hash chains.
+%%
stat_lock_params
+%%
Display the locking subsystem parameters.
+%%
+%% +%% @spec lock_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat_print(Opts :: db_flags()) -> + ok | db_error(). +lock_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOCK_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + %%-------------------------------------------------------------------- %% @doc @@ -1409,9 +1494,9 @@ stat_print(Db, Opts) -> %% %% @end %%-------------------------------------------------------------------- --spec stat_print(Opts :: db_flags()) -> +-spec env_stat_print(Opts :: db_flags()) -> ok | db_error(). -stat_print(Opts) -> +env_stat_print(Opts) -> Flags = process_flags(Opts), Cmd = <>, <> = erlang:port_control(get_port(), ?CMD_ENV_STAT_PRINT, Cmd), @@ -1537,6 +1622,10 @@ flag_value(Flag) -> set_recno -> ?DB_SET_RECNO; stat_all -> ?DB_STAT_ALL; stat_clear -> ?DB_STAT_CLEAR; + stat_lock_conf -> ?DB_STAT_LOCK_CONF; + stat_lock_lockers -> ?DB_STAT_LOCK_LOCKERS; + stat_lock_objects -> ?DB_STAT_LOCK_OBJECTS; + stat_lock_params -> ?DB_STAT_LOCK_PARAMS; stat_subsystem -> ?DB_STAT_SUBSYSTEM; threaded -> ?DB_THREAD; truncate -> ?DB_TRUNCATE; diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index a9a2dc9..53ab943 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -34,7 +34,8 @@ all() -> truncate_all_should_empty_all_databases, btree_stat_should_report_on_success, hash_stat_should_report_on_success, - stat_should_fail_on_bad_dbref]. + stat_should_fail_on_bad_dbref, + lock_stat_should_report_on_success]. dbconfig(Config) -> @@ -277,3 +278,9 @@ hash_stat_should_report_on_success(_Config) -> stat_should_fail_on_bad_dbref(_Config) -> {error, invalid_db} = bdberl:stat(10000000, []), done. + +lock_stat_should_report_on_success(_Config) -> + {ok, Stat} = bdberl:lock_stat([]), + %% Check a lock stat that that probably won't change + 2147483647 = proplists:get_value(cur_maxid, Stat), + done. From 62d45225806c2dc867790e35d17562c70a97a206 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 27 May 2009 15:25:50 -0600 Subject: [PATCH 03/14] Added log stats --- c_src/bdberl_drv.c | 112 ++++++++++++++++++++++++++++++++++++++++++ c_src/bdberl_drv.h | 2 + include/bdberl.hrl | 2 + src/bdberl.erl | 78 ++++++++++++++++++++++++++++- test/bdberl_SUITE.erl | 9 +++- 5 files changed, 200 insertions(+), 3 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index e84f2b6..e8310bb 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -36,6 +36,7 @@ static void do_async_cursor_get(void* arg); static void do_async_truncate(void* arg); static void do_async_stat(void* arg); static void do_async_lock_stat(void* arg); +static void do_async_log_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -886,6 +887,45 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = G_DB_ENV->lock_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } + case CMD_LOG_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_log_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_LOG_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->log_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } } *outbuf = 0; return 0; @@ -1417,6 +1457,52 @@ static void async_cleanup_and_send_lock_stats(PortData* d, DB_LOCK_STAT *lsp) driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } +static void async_cleanup_and_send_log_stats(PortData* d, DB_LOG_STAT *lsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(lsp, magic), /* Log file magic number. */ + ST_STATS_TUPLE(lsp, version), /* Log file version number. */ + ST_STATS_INT_TUPLE(lsp, mode), /* Log file permissions mode. */ + ST_STATS_TUPLE(lsp, lg_bsize), /* Log buffer size. */ + ST_STATS_TUPLE(lsp, lg_size), /* Log file size. */ + ST_STATS_TUPLE(lsp, wc_bytes), /* Bytes to log since checkpoint. */ + ST_STATS_TUPLE(lsp, wc_mbytes), /* Megabytes to log since checkpoint. */ + ST_STATS_TUPLE(lsp, record), /* Records entered into the log. */ + ST_STATS_TUPLE(lsp, w_bytes), /* Bytes to log. */ + ST_STATS_TUPLE(lsp, w_mbytes), /* Megabytes to log. */ + ST_STATS_TUPLE(lsp, wcount), /* Total I/O writes to the log. */ + ST_STATS_TUPLE(lsp, wcount_fill),/* Overflow writes to the log. */ + ST_STATS_TUPLE(lsp, rcount), /* Total I/O reads from the log. */ + ST_STATS_TUPLE(lsp, scount), /* Total syncs to the log. */ + ST_STATS_TUPLE(lsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(lsp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(lsp, cur_file), /* Current log file number. */ + ST_STATS_TUPLE(lsp, cur_offset),/* Current log file offset. */ + ST_STATS_TUPLE(lsp, disk_file), /* Known on disk log file number. */ + ST_STATS_TUPLE(lsp, disk_offset), /* Known on disk log file offset. */ + ST_STATS_TUPLE(lsp, maxcommitperflush), /* Max number of commits in a flush. */ + ST_STATS_TUPLE(lsp, mincommitperflush), /* Min number of commits in a flush. */ + ST_STATS_TUPLE(lsp, regsize), /* Region size. */ + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 23+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + static void do_async_put(void* arg) { // Payload is: <> @@ -1712,6 +1798,32 @@ static void do_async_lock_stat(void* arg) } } +static void do_async_log_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_LOG_STAT *lsp = NULL; + int rc = G_DB_ENV->log_stat(G_DB_ENV, &lsp, flags); + if (rc != 0 || lsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_log_stats(d, lsp); + } + + // Finally, clean up lock stats + if (NULL != lsp) + { + free(lsp); + } +} + static void* zalloc(unsigned int size) { diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index ddb89d8..e817c74 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -51,6 +51,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_ENV_STAT_PRINT 20 #define CMD_LOCK_STAT 21 #define CMD_LOCK_STAT_PRINT 22 +#define CMD_LOG_STAT 23 +#define CMD_LOG_STAT_PRINT 24 /** * Command status values diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 92aa674..6fe5137 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -28,6 +28,8 @@ -define(CMD_ENV_STAT_PRINT, 20). -define(CMD_LOCK_STAT, 21). -define(CMD_LOCK_STAT_PRINT, 22). +-define(CMD_LOG_STAT, 23). +-define(CMD_LOG_STAT_PRINT, 24). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index e2127be..520db82 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -18,6 +18,8 @@ stat_print/2, lock_stat/1, lock_stat_print/1, + log_stat/1, + log_stat_print/1, env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, @@ -1399,7 +1401,7 @@ stat_print(Db, Opts) -> %%
Reset statistics after returning their values
%% %% -%% @spec stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% @spec lock_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} %% where %% Opts = [atom()] %% @@ -1425,7 +1427,6 @@ lock_stat(Opts) -> end. - %%-------------------------------------------------------------------- %% @doc %% Print lock stats @@ -1468,6 +1469,79 @@ lock_stat_print(Opts) -> end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve log stats +%% +%% This function retrieves bdb log statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec log_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +log_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOG_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print log stats +%% +%% This function prints bdb log statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
+%% +%% @spec log_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat_print(Opts :: db_flags()) -> + ok | db_error(). +log_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_LOG_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + + %%-------------------------------------------------------------------- %% @doc %% Print environment stats diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 53ab943..dbe546e 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -35,7 +35,8 @@ all() -> btree_stat_should_report_on_success, hash_stat_should_report_on_success, stat_should_fail_on_bad_dbref, - lock_stat_should_report_on_success]. + lock_stat_should_report_on_success, + log_stat_should_report_on_success]. dbconfig(Config) -> @@ -284,3 +285,9 @@ lock_stat_should_report_on_success(_Config) -> %% Check a lock stat that that probably won't change 2147483647 = proplists:get_value(cur_maxid, Stat), done. + +log_stat_should_report_on_success(_Config) -> + {ok, Stat} = bdberl:log_stat([]), + %% Check a log stat that that probably won't change + 264584 = proplists:get_value(magic, Stat), + done. From e80dc97d6d1ba891a2cb57d61f40d46807bf3bc7 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 27 May 2009 16:31:26 -0600 Subject: [PATCH 04/14] Added memory pool statistics. --- c_src/bdberl_drv.c | 169 ++++++++++++++++++++++++++++++++++++++++++ c_src/bdberl_drv.h | 2 + include/bdberl.hrl | 2 + src/bdberl.erl | 87 ++++++++++++++++++++++ test/bdberl_SUITE.erl | 9 ++- 5 files changed, 268 insertions(+), 1 deletion(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index e8310bb..0c9292a 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -37,6 +37,7 @@ static void do_async_truncate(void* arg); static void do_async_stat(void* arg); static void do_async_lock_stat(void* arg); static void do_async_log_stat(void* arg); +static void do_async_memp_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -926,6 +927,45 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = G_DB_ENV->log_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } + case CMD_MEMP_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_memp_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_MEMP_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->memp_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } } *outbuf = 0; return 0; @@ -1503,6 +1543,104 @@ static void async_cleanup_and_send_log_stats(PortData* d, DB_LOG_STAT *lsp) driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } +static void send_mpool_fstat(ErlDrvPort port, ErlDrvTermData pid, DB_MPOOL_FSTAT *fsp) +{ + char *name = fsp->file_name ? fsp->file_name : ""; + int name_len = strlen(name); + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("fstat"), + // Start of list + ERL_DRV_ATOM, driver_mk_atom("name"), + ERL_DRV_STRING, (ErlDrvTermData) name, name_len, + ERL_DRV_TUPLE, 2, + ST_STATS_TUPLE(fsp, map), /* Pages from mapped files. */ + ST_STATS_TUPLE(fsp, cache_hit), /* Pages found in the cache. */ + ST_STATS_TUPLE(fsp, cache_miss), /* Pages not found in the cache. */ + ST_STATS_TUPLE(fsp, page_create), /* Pages created in the cache. */ + ST_STATS_TUPLE(fsp, page_in), /* Pages read in. */ + ST_STATS_TUPLE(fsp, page_out), /* Pages written out. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 7+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp, + DB_MPOOL_FSTAT **fsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + // First send the per-file stats + int i; + for (i = 0; fsp != NULL && fsp[i] != NULL; i++) + { + send_mpool_fstat(port, pid, fsp[i]); + } + + // Then send the global stats + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(gsp, gbytes), /* Total cache size: GB. */ + ST_STATS_TUPLE(gsp, bytes), /* Total cache size: B. */ + ST_STATS_TUPLE(gsp, ncache), /* Number of cache regions. */ + ST_STATS_TUPLE(gsp, max_ncache), /* Maximum number of regions. */ + ST_STATS_INT_TUPLE(gsp, mmapsize), /* Maximum file size for mmap. */ + ST_STATS_INT_TUPLE(gsp, maxopenfd), /* Maximum number of open fd's. */ + ST_STATS_INT_TUPLE(gsp, maxwrite), /* Maximum buffers to write. */ + ST_STATS_TUPLE(gsp, maxwrite_sleep), /* Sleep after writing max buffers. */ + ST_STATS_TUPLE(gsp, pages), /* Total number of pages. */ + ST_STATS_TUPLE(gsp, map), /* Pages from mapped files. */ + ST_STATS_TUPLE(gsp, cache_hit), /* Pages found in the cache. */ + ST_STATS_TUPLE(gsp, cache_miss), /* Pages not found in the cache. */ + ST_STATS_TUPLE(gsp, page_create), /* Pages created in the cache. */ + ST_STATS_TUPLE(gsp, page_in), /* Pages read in. */ + ST_STATS_TUPLE(gsp, page_out), /* Pages written out. */ + ST_STATS_TUPLE(gsp, ro_evict), /* Clean pages forced from the cache. */ + ST_STATS_TUPLE(gsp, rw_evict), /* Dirty pages forced from the cache. */ + ST_STATS_TUPLE(gsp, page_trickle), /* Pages written by memp_trickle. */ + ST_STATS_TUPLE(gsp, page_clean), /* Clean pages. */ + ST_STATS_TUPLE(gsp, page_dirty), /* Dirty pages. */ + ST_STATS_TUPLE(gsp, hash_buckets), /* Number of hash buckets. */ + ST_STATS_TUPLE(gsp, hash_searches), /* Total hash chain searches. */ + ST_STATS_TUPLE(gsp, hash_longest), /* Longest hash chain searched. */ + ST_STATS_TUPLE(gsp, hash_examined), /* Total hash entries searched. */ + ST_STATS_TUPLE(gsp, hash_nowait), /* Hash lock granted with nowait. */ + ST_STATS_TUPLE(gsp, hash_wait), /* Hash lock granted after wait. */ + ST_STATS_TUPLE(gsp, hash_max_nowait), /* Max hash lock granted with nowait. */ + ST_STATS_TUPLE(gsp, hash_max_wait), /* Max hash lock granted after wait. */ + ST_STATS_TUPLE(gsp, region_nowait), /* Region lock granted with nowait. */ + ST_STATS_TUPLE(gsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(gsp, mvcc_frozen), /* Buffers frozen. */ + ST_STATS_TUPLE(gsp, mvcc_thawed), /* Buffers thawed. */ + ST_STATS_TUPLE(gsp, mvcc_freed), /* Frozen buffers freed. */ + ST_STATS_TUPLE(gsp, alloc), /* Number of page allocations. */ + ST_STATS_TUPLE(gsp, alloc_buckets), /* Buckets checked during allocation. */ + ST_STATS_TUPLE(gsp, alloc_max_buckets), /* Max checked during allocation. */ + ST_STATS_TUPLE(gsp, alloc_pages), /* Pages checked during allocation. */ + ST_STATS_TUPLE(gsp, alloc_max_pages), /* Max checked during allocation. */ + ST_STATS_TUPLE(gsp, io_wait), /* Thread waited on buffer I/O. */ + ST_STATS_TUPLE(gsp, regsize), /* Region size. */ + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 40+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + + + static void do_async_put(void* arg) { // Payload is: <> @@ -1824,6 +1962,37 @@ static void do_async_log_stat(void* arg) } } +static void do_async_memp_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_MPOOL_STAT *gsp = NULL; + DB_MPOOL_FSTAT **fsp = NULL; + int rc = G_DB_ENV->memp_stat(G_DB_ENV, &gsp, &fsp, flags); + if (rc != 0 || gsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_memp_stats(d, gsp, fsp); + } + + // Finally, clean up lock stats + if (NULL != gsp) + { + free(gsp); + } + if (NULL != fsp) + { + free(fsp); + } +} + static void* zalloc(unsigned int size) { diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index e817c74..bea6194 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -53,6 +53,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_LOCK_STAT_PRINT 22 #define CMD_LOG_STAT 23 #define CMD_LOG_STAT_PRINT 24 +#define CMD_MEMP_STAT 25 +#define CMD_MEMP_STAT_PRINT 26 /** * Command status values diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 6fe5137..a206c1a 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -30,6 +30,8 @@ -define(CMD_LOCK_STAT_PRINT, 22). -define(CMD_LOG_STAT, 23). -define(CMD_LOG_STAT_PRINT, 24). +-define(CMD_MEMP_STAT, 25). +-define(CMD_MEMP_STAT_PRINT,26). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index 520db82..974bd22 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -20,6 +20,8 @@ lock_stat_print/1, log_stat/1, log_stat_print/1, + memp_stat/1, + memp_stat_print/1, env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, @@ -1542,6 +1544,76 @@ log_stat_print(Opts) -> end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve memory pool stats +%% +%% This function retrieves bdb mpool statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec memp_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +memp_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MEMP_STAT, Cmd), + case decode_rc(Result) of + ok -> + recv_memp_stat([]); + Error -> + {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print memory pool stats +%% +%% This function prints bdb mpool statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
stat_memp_hash
+%% +%%
+%% +%% @spec memp_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat_print(Opts :: db_flags()) -> + ok | db_error(). +memp_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MEMP_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + + %%-------------------------------------------------------------------- %% @doc %% Print environment stats @@ -1700,6 +1772,7 @@ flag_value(Flag) -> stat_lock_lockers -> ?DB_STAT_LOCK_LOCKERS; stat_lock_objects -> ?DB_STAT_LOCK_OBJECTS; stat_lock_params -> ?DB_STAT_LOCK_PARAMS; + stat_memp_hash -> ?DB_STAT_MEMP_HASH; stat_subsystem -> ?DB_STAT_SUBSYSTEM; threaded -> ?DB_THREAD; truncate -> ?DB_TRUNCATE; @@ -1765,3 +1838,17 @@ split_bin(Delimiter, <>, ItemAcc, Acc) -> split_bin(Delimiter, Rest, <<>> ,[ItemAcc | Acc]); split_bin(Delimiter, <>, ItemAcc, Acc) -> split_bin(Delimiter, Rest, <>, Acc). + + +%% +%% Receive memory pool stats +%% +recv_memp_stat(Fstats) -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {fstat, Fstat} -> + recv_memp_stat([Fstat|Fstats]); + {ok, Stats} -> + {ok, Stats, Fstats} + end. diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index dbe546e..1c9dd78 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -36,7 +36,8 @@ all() -> hash_stat_should_report_on_success, stat_should_fail_on_bad_dbref, lock_stat_should_report_on_success, - log_stat_should_report_on_success]. + log_stat_should_report_on_success, + memp_stat_should_report_on_success]. dbconfig(Config) -> @@ -291,3 +292,9 @@ log_stat_should_report_on_success(_Config) -> %% Check a log stat that that probably won't change 264584 = proplists:get_value(magic, Stat), done. + +memp_stat_should_report_on_success(_Config) -> + {ok, Gstat, Fstat} = bdberl:memp_stat([]), + true = is_list(Fstat), + true = is_list(Gstat), + done. From e4757031a2319de524cc4da6c881b8b2d306ee20 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Wed, 27 May 2009 16:49:18 -0600 Subject: [PATCH 05/14] Added mutex stats. --- c_src/bdberl_drv.c | 100 +++++++++++++++++++++++++++++++++++++++++- c_src/bdberl_drv.h | 2 + include/bdberl.hrl | 6 ++- src/bdberl.erl | 74 +++++++++++++++++++++++++++++++ test/bdberl_SUITE.erl | 7 ++- 5 files changed, 184 insertions(+), 5 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 0c9292a..f14ca4f 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -38,6 +38,7 @@ static void do_async_stat(void* arg); static void do_async_lock_stat(void* arg); static void do_async_log_stat(void* arg); static void do_async_memp_stat(void* arg); +static void do_async_mutex_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -966,6 +967,45 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = G_DB_ENV->memp_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } + case CMD_MUTEX_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_mutex_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_MUTEX_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->mutex_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } } *outbuf = 0; return 0; @@ -1640,6 +1680,37 @@ static void async_cleanup_and_send_memp_stats(PortData* d, DB_MPOOL_STAT *gsp, } +static void async_cleanup_and_send_mutex_stats(PortData* d, DB_MUTEX_STAT *msp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(msp, mutex_align), /* Mutex alignment */ + ST_STATS_TUPLE(msp, mutex_tas_spins), /* Mutex test-and-set spins */ + ST_STATS_TUPLE(msp, mutex_cnt), /* Mutex count */ + ST_STATS_TUPLE(msp, mutex_free), /* Available mutexes */ + ST_STATS_TUPLE(msp, mutex_inuse), /* Mutexes in use */ + ST_STATS_TUPLE(msp, mutex_inuse_max), /* Maximum mutexes ever in use */ + ST_STATS_TUPLE(msp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(msp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(msp, regsize), /* Region size. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 9+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + static void do_async_put(void* arg) { @@ -1955,7 +2026,7 @@ static void do_async_log_stat(void* arg) async_cleanup_and_send_log_stats(d, lsp); } - // Finally, clean up lock stats + // Finally, clean up stats if (NULL != lsp) { free(lsp); @@ -1982,7 +2053,7 @@ static void do_async_memp_stat(void* arg) async_cleanup_and_send_memp_stats(d, gsp, fsp); } - // Finally, clean up lock stats + // Finally, clean up stats if (NULL != gsp) { free(gsp); @@ -1993,6 +2064,31 @@ static void do_async_memp_stat(void* arg) } } +static void do_async_mutex_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_MUTEX_STAT *msp = NULL; + int rc = G_DB_ENV->mutex_stat(G_DB_ENV, &msp, flags); + if (rc != 0 || msp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_mutex_stats(d, msp); + } + + // Finally, clean up stats + if (NULL != msp) + { + free(msp); + } +} static void* zalloc(unsigned int size) { diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index bea6194..b30bc5b 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -55,6 +55,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_LOG_STAT_PRINT 24 #define CMD_MEMP_STAT 25 #define CMD_MEMP_STAT_PRINT 26 +#define CMD_MUTEX_STAT 27 +#define CMD_MUTEX_STAT_PRINT 28 /** * Command status values diff --git a/include/bdberl.hrl b/include/bdberl.hrl index a206c1a..c4cee89 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -30,8 +30,10 @@ -define(CMD_LOCK_STAT_PRINT, 22). -define(CMD_LOG_STAT, 23). -define(CMD_LOG_STAT_PRINT, 24). --define(CMD_MEMP_STAT, 25). --define(CMD_MEMP_STAT_PRINT,26). +-define(CMD_MEMP_STAT, 25). +-define(CMD_MEMP_STAT_PRINT, 26). +-define(CMD_MUTEX_STAT, 27). +-define(CMD_MUTEX_STAT_PRINT,28). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index 974bd22..84fb557 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -22,6 +22,8 @@ log_stat_print/1, memp_stat/1, memp_stat_print/1, + mutex_stat/1, + mutex_stat_print/1, env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, @@ -1613,6 +1615,78 @@ memp_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve mutex stats +%% +%% This function retrieves mutex statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec mutex_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). + +mutex_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT, Cmd), + case decode_rc(Result) of + ok -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {ok, Stats} -> + {ok, Stats} + end; + Error -> + {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print mutex stats +%% +%% This function prints mutex statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
+%% +%% @spec mutex_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat_print(Opts :: db_flags()) -> + ok | db_error(). +mutex_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_MUTEX_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. + %%-------------------------------------------------------------------- %% @doc diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 1c9dd78..55e9d5d 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -37,7 +37,8 @@ all() -> stat_should_fail_on_bad_dbref, lock_stat_should_report_on_success, log_stat_should_report_on_success, - memp_stat_should_report_on_success]. + memp_stat_should_report_on_success, + mutex_stat_should_report_on_success]. dbconfig(Config) -> @@ -298,3 +299,7 @@ memp_stat_should_report_on_success(_Config) -> true = is_list(Fstat), true = is_list(Gstat), done. + +mutex_stat_should_report_on_success(_Config) -> + {ok, _Stat} = bdberl:mutex_stat([]), + done. From c74beeba9f2b69d68c8a0e50662025019f43221c Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Thu, 28 May 2009 12:05:52 -0600 Subject: [PATCH 06/14] Added transaction stats. --- c_src/bdberl_drv.c | 189 ++++++++++++++++++++++++++++++++++++++++++ c_src/bdberl_drv.h | 2 + c_src/buildlib.sh | 2 +- include/bdberl.hrl | 2 + src/bdberl.erl | 81 ++++++++++++++++++ test/bdberl_SUITE.erl | 11 ++- 6 files changed, 285 insertions(+), 2 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index f14ca4f..8f45482 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -39,6 +39,7 @@ static void do_async_lock_stat(void* arg); static void do_async_log_stat(void* arg); static void do_async_memp_stat(void* arg); static void do_async_mutex_stat(void* arg); +static void do_async_txn_stat(void* arg); static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); @@ -1006,6 +1007,46 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = G_DB_ENV->mutex_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } + case CMD_TXN_STAT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is <> + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then schedule the appropriate async operation + d->async_op = cmd; + d->async_pool = G_TPOOL_GENERAL; + bdberl_tpool_run(d->async_pool, &do_async_txn_stat, d, 0, &d->async_job); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + case CMD_TXN_STAT_PRINT: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Inbuf is << Flags:32 >> + unsigned int flags = UNPACK_INT(inbuf, 0); + + // Outbuf is <> + // Run the command on the VM thread - this is for debugging only, + // any real monitoring will use the async lock_stat + int rc = G_DB_ENV->txn_stat_print(G_DB_ENV, flags); + RETURN_INT(rc, outbuf); + } + } *outbuf = 0; return 0; @@ -1711,6 +1752,127 @@ static void async_cleanup_and_send_mutex_stats(PortData* d, DB_MUTEX_STAT *msp) driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); } +#define STATS_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->member, \ + ERL_DRV_TUPLE, 2 + +#define STATS_LSN_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->member.file, \ + ERL_DRV_UINT, (base)->member.offset, \ + ERL_DRV_TUPLE, 2, \ + ERL_DRV_TUPLE, 2 + +static void send_txn_tstat(ErlDrvPort port, ErlDrvTermData pid, DB_TXN_ACTIVE *tasp) +{ + char *name = tasp->name ? tasp->name : ""; + int name_len = strlen(name); + char tid_str[32]; + char *status_str; + switch (tasp->status) + { + case TXN_ABORTED: + status_str = "aborted"; + break; + case TXN_COMMITTED: + status_str = "committed"; + break; + case TXN_PREPARED: + status_str = "prepared"; + break; + case TXN_RUNNING: + status_str = "running"; + break; + default: + status_str = "undefined"; + break; + } + + int tid_str_len = snprintf(tid_str, sizeof(tid_str), "%lu", (unsigned long) tasp->tid); + + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("txn"), + STATS_TUPLE(tasp, txnid), /* Transaction ID */ + STATS_TUPLE(tasp, parentid), /* Transaction ID of parent */ + STATS_TUPLE(tasp, pid), /* Process owning txn ID - pid_t */ + ERL_DRV_ATOM, driver_mk_atom("tid"),/* OSX has 32-bit ints in erlang, so return as */ + ERL_DRV_STRING, (ErlDrvTermData) tid_str, tid_str_len, /* a string */ + ERL_DRV_TUPLE, 2, + STATS_LSN_TUPLE(tasp, lsn), /* LSN when transaction began */ + STATS_LSN_TUPLE(tasp, read_lsn), /* Read LSN for MVCC */ + STATS_TUPLE(tasp, mvcc_ref), /* MVCC reference count */ + + // Start of list + ERL_DRV_ATOM, driver_mk_atom("status"), + ERL_DRV_ATOM, driver_mk_atom(status_str), + ERL_DRV_TUPLE, 2, + + ERL_DRV_ATOM, driver_mk_atom("name"), + ERL_DRV_STRING, (ErlDrvTermData) name, name_len, + ERL_DRV_TUPLE, 2, + + + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 9+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} + +#define ST_STATS_LSN_TUPLE(base, member) \ + ERL_DRV_ATOM, driver_mk_atom(#member), \ + ERL_DRV_UINT, (base)->st_##member.file, \ + ERL_DRV_UINT, (base)->st_##member.offset, \ + ERL_DRV_TUPLE, 2, \ + ERL_DRV_TUPLE, 2 + +static void async_cleanup_and_send_txn_stats(PortData* d, DB_TXN_STAT *tsp) +{ + // Save the port and pid references -- we need copies independent from the PortData + // structure. Once we release the port_lock after clearing the cmd, it's possible that + // the port could go away without waiting on us to finish. This is acceptable, but we need + // to be certain that there is no overlap of data between the two threads. driver_send_term + // is safe to use from a thread, even if the port you're sending from has already expired. + ErlDrvPort port = d->port; + ErlDrvTermData pid = d->port_owner; + async_cleanup(d); + + // First send the array of active transactions */ + int i; + for (i = 0; i < tsp->st_nactive; i++) + { + send_txn_tstat(port, pid, tsp->st_txnarray+i); + } + + // Then send the global stats + ErlDrvTermData response[] = { + ERL_DRV_ATOM, driver_mk_atom("ok"), + // Start of list + ST_STATS_TUPLE(tsp, nrestores), /* number of restored transactions + after recovery. */ + ST_STATS_LSN_TUPLE(tsp, last_ckp), /* lsn of the last checkpoint */ + ST_STATS_TUPLE(tsp, time_ckp), /* time of last checkpoint (time_t to uint) */ + ST_STATS_TUPLE(tsp, last_txnid), /* last transaction id given out */ + ST_STATS_TUPLE(tsp, maxtxns), /* maximum txns possible */ + ST_STATS_TUPLE(tsp, naborts), /* number of aborted transactions */ + ST_STATS_TUPLE(tsp, nbegins), /* number of begun transactions */ + ST_STATS_TUPLE(tsp, ncommits), /* number of committed transactions */ + ST_STATS_TUPLE(tsp, nactive), /* number of active transactions */ + ST_STATS_TUPLE(tsp, nsnapshot), /* number of snapshot transactions */ + ST_STATS_TUPLE(tsp, maxnactive), /* maximum active transactions */ + ST_STATS_TUPLE(tsp, maxnsnapshot), /* maximum snapshot transactions */ + ST_STATS_TUPLE(tsp, region_wait), /* Region lock granted after wait. */ + ST_STATS_TUPLE(tsp, region_nowait), /* Region lock granted without wait. */ + ST_STATS_TUPLE(tsp, regsize), /* Region size. */ + // End of list + ERL_DRV_NIL, + ERL_DRV_LIST, 15+1, + ERL_DRV_TUPLE, 2 + }; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); +} static void do_async_put(void* arg) { @@ -2090,6 +2252,33 @@ static void do_async_mutex_stat(void* arg) } } + +static void do_async_txn_stat(void* arg) +{ + // Payload is: <> + PortData* d = (PortData*)arg; + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + DB_TXN_STAT *tsp = NULL; + int rc = G_DB_ENV->txn_stat(G_DB_ENV, &tsp, flags); + if (rc != 0 || tsp == NULL) + { + async_cleanup_and_send_rc(d, rc); + } + else + { + async_cleanup_and_send_txn_stats(d, tsp); + } + + // Finally, clean up stats + if (NULL != tsp) + { + free(tsp); + } +} + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index b30bc5b..e22d60e 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -57,6 +57,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_MEMP_STAT_PRINT 26 #define CMD_MUTEX_STAT 27 #define CMD_MUTEX_STAT_PRINT 28 +#define CMD_TXN_STAT 29 +#define CMD_TXN_STAT_PRINT 30 /** * Command status values diff --git a/c_src/buildlib.sh b/c_src/buildlib.sh index 635f36e..03f723f 100755 --- a/c_src/buildlib.sh +++ b/c_src/buildlib.sh @@ -21,7 +21,7 @@ rm -rf system db-${DB_VER} tar -xzf db-${DB_VER}.tar.gz && \ ##(cd db-${DB_VER} && patch -p0 < ../bdb-align.patch ) && \ (cd db-${DB_VER}/build_unix && \ - ../dist/configure --prefix=$WORKDIR --disable-replication --disable-shared --with-pic && make && ranlib libdb-*.a && make install) && \ + ../dist/configure --prefix=$WORKDIR --enable-debug --disable-replication --disable-shared --with-pic && make && ranlib libdb-*.a && make install) && \ rm -rf db-${DB_VER} diff --git a/include/bdberl.hrl b/include/bdberl.hrl index c4cee89..21f2cc1 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -34,6 +34,8 @@ -define(CMD_MEMP_STAT_PRINT, 26). -define(CMD_MUTEX_STAT, 27). -define(CMD_MUTEX_STAT_PRINT,28). +-define(CMD_TXN_STAT, 29). +-define(CMD_TXN_STAT_PRINT, 30). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index 84fb557..98b1365 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -24,6 +24,8 @@ memp_stat_print/1, mutex_stat/1, mutex_stat_print/1, + txn_stat/1, + txn_stat_print/1, env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, @@ -1687,6 +1689,72 @@ mutex_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve transaction stats +%% +%% This function retrieves transaction statistics +%% +%% === Options === +%% +%%
+%%
stat_clear
+%%
Reset statistics after returning their values
+%%
+%% +%% @spec txn_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}], [any()]} | db_error(). + +txn_stat(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_TXN_STAT, Cmd), + case decode_rc(Result) of + ok -> + recv_txn_stat([]); + Error -> + {error, Error} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Print transaction stats +%% +%% This function prints transaction statistics to wherever +%% BDB messages are being sent +%% +%% === Options === +%% +%%
+%%
stat_all
+%%
Display all available information.
+%%
stat_clear
+%%
Reset statistics after displaying their values.
+%%
+%% +%% @spec txn_stat_print(Opts) -> ok | {error, Error} +%% where +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat_print(Opts :: db_flags()) -> + ok | db_error(). +txn_stat_print(Opts) -> + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_TXN_STAT_PRINT, Cmd), + case decode_rc(Result) of + ok -> ok; + Error -> {error, Error} + end. %%-------------------------------------------------------------------- %% @doc @@ -1926,3 +1994,16 @@ recv_memp_stat(Fstats) -> {ok, Stats} -> {ok, Stats, Fstats} end. + +%% +%% Receive transaction stats +%% +recv_txn_stat(Tstats) -> + receive + {error, Reason} -> + {error, decode_rc(Reason)}; + {txn, Tstat} -> + recv_txn_stat([Tstat|Tstats]); + {ok, Stats} -> + {ok, Stats, Tstats} + end. diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 55e9d5d..57caf4f 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -38,7 +38,8 @@ all() -> lock_stat_should_report_on_success, log_stat_should_report_on_success, memp_stat_should_report_on_success, - mutex_stat_should_report_on_success]. + mutex_stat_should_report_on_success, + txn_stat_should_report_on_success]. dbconfig(Config) -> @@ -303,3 +304,11 @@ memp_stat_should_report_on_success(_Config) -> mutex_stat_should_report_on_success(_Config) -> {ok, _Stat} = bdberl:mutex_stat([]), done. + +txn_stat_should_report_on_success(_Config) -> + {ok, _GStat1, []} = bdberl:txn_stat([]), + bdberl:txn_begin(), + {ok, _GStat2, [_ATxnStat]} = bdberl:txn_stat([]), + bdberl:txn_abort(), + {ok, _GStat3, []} = bdberl:txn_stat([]), + done. From 9d081c0572b431d28888d550d92bd80e5fc9fbb8 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Thu, 28 May 2009 12:39:14 -0600 Subject: [PATCH 07/14] Added empty flag version for each of the stat functions. --- src/bdberl.erl | 202 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 182 insertions(+), 20 deletions(-) diff --git a/src/bdberl.erl b/src/bdberl.erl index 98b1365..3a0a344 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -14,19 +14,19 @@ get_cache_size/0, get_data_dirs/0, get_txn_timeout/0, - stat/2, - stat_print/2, - lock_stat/1, - lock_stat_print/1, - log_stat/1, - log_stat_print/1, - memp_stat/1, - memp_stat_print/1, - mutex_stat/1, - mutex_stat_print/1, - txn_stat/1, - txn_stat_print/1, - env_stat_print/1, + stat/1, stat/2, + stat_print/1, stat_print/2, + lock_stat/0, lock_stat/1, + lock_stat_print/0, lock_stat_print/1, + log_stat/0, log_stat/1, + log_stat_print/0, log_stat_print/1, + memp_stat/0, memp_stat/1, + memp_stat_print/0, memp_stat_print/1, + mutex_stat/0, mutex_stat/1, + mutex_stat_print/0, mutex_stat_print/1, + txn_stat/0, txn_stat/1, + txn_stat_print/0, txn_stat_print/1, + env_stat_print/0, env_stat_print/1, transaction/1, transaction/2, transaction/3, put/3, put/4, put_r/3, put_r/4, @@ -1346,6 +1346,20 @@ stat(Db, Opts) -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve database stats with empty flags +%% +%% @spec stat(Db) -> {ok, [{atom(), number()}]} | {error, Error} +%% where +%% Db = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat(Opts :: db_flags()) -> + {ok, [{atom(), number()}]} | db_error(). +stat(Db) -> + stat(Db, []). %%-------------------------------------------------------------------- @@ -1394,6 +1408,21 @@ stat_print(Db, Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Print database stats with empty flags +%% +%% @spec stat_print(Db) -> ok | {error, Error} +%% where +%% Db = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec stat_print(Db :: db()) -> + ok | db_error(). +stat_print(Db) -> + stat_print(Db, []). + %%-------------------------------------------------------------------- %% @doc %% Retrieve lock stats @@ -1432,6 +1461,18 @@ lock_stat(Opts) -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve lock stats with no flags +%% +%% @spec lock_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat() -> + {ok, [{atom(), number()}]} | db_error(). +lock_stat() -> + lock_stat([]). %%-------------------------------------------------------------------- %% @doc @@ -1474,6 +1515,19 @@ lock_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Print lock stats with empty flags +%% +%% @spec lock_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec lock_stat_print() -> + ok | db_error(). +lock_stat_print() -> + lock_stat_print([]). + %%-------------------------------------------------------------------- %% @doc @@ -1513,6 +1567,18 @@ log_stat(Opts) -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve log stats with empty flags +%% +%% @spec log_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat() -> + {ok, [{atom(), number()}]} | db_error(). +log_stat() -> + log_stat([]). %%-------------------------------------------------------------------- %% @doc @@ -1547,6 +1613,18 @@ log_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Print log stats with empty flags +%% +%% @spec log_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec log_stat_print() -> + ok | db_error(). +log_stat_print() -> + log_stat_print([]). %%-------------------------------------------------------------------- %% @doc @@ -1581,6 +1659,18 @@ memp_stat(Opts) -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve memory pool stats with empty flags +%% +%% @spec memp_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat() -> + {ok, [{atom(), number()}]} | db_error(). +memp_stat() -> + memp_stat([]). %%-------------------------------------------------------------------- %% @doc @@ -1597,7 +1687,7 @@ memp_stat(Opts) -> %%
stat_clear
%%
Reset statistics after displaying their values.
%%
stat_memp_hash
-%% +%%
Display the buffers with hash chains.
%% %% %% @spec memp_stat_print(Opts) -> ok | {error, Error} @@ -1617,6 +1707,19 @@ memp_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Print memory pool stats with empty flags +%% +%% @spec memp_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec memp_stat_print() -> + ok | db_error(). +memp_stat_print() -> + memp_stat_print([]). + %%-------------------------------------------------------------------- %% @doc %% Retrieve mutex stats @@ -1655,6 +1758,18 @@ mutex_stat(Opts) -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve mutex stats with empty flags +%% +%% @spec mutex_stat() -> {ok, [{atom(), number()}]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat() -> + {ok, [{atom(), number()}]} | db_error(). +mutex_stat() -> + mutex_stat([]). %%-------------------------------------------------------------------- %% @doc @@ -1689,6 +1804,19 @@ mutex_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Print mutex stats with empty flags +%% +%% @spec mutex_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec mutex_stat_print() -> + ok | db_error(). +mutex_stat_print() -> + mutex_stat_print([]). + %%-------------------------------------------------------------------- %% @doc %% Retrieve transaction stats @@ -1702,14 +1830,14 @@ mutex_stat_print(Opts) -> %%
Reset statistics after returning their values
%% %% -%% @spec txn_stat(Opts) -> {ok, [{atom(), number()}]} | {error, Error} +%% @spec txn_stat(Opts) -> {ok, [{atom(), number()}], [[{atom(), number()}]]} | {error, Error} %% where %% Opts = [atom()] %% %% @end %%-------------------------------------------------------------------- -spec txn_stat(Opts :: db_flags()) -> - {ok, [{atom(), number()}], [any()]} | db_error(). + {ok, [{atom(), number()}], [[{atom(), number()}]]} | db_error(). txn_stat(Opts) -> Flags = process_flags(Opts), @@ -1722,6 +1850,18 @@ txn_stat(Opts) -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Retrieve transaction stats with empty flags +%% +%% @spec txn_stat() -> {ok, [{atom(), number()}], [[{atom(), number()}]]} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat() -> + {ok, [{atom(), number()}], [[{atom(), number()}]]} | db_error(). +txn_stat() -> + txn_stat([]). %%-------------------------------------------------------------------- %% @doc @@ -1756,6 +1896,19 @@ txn_stat_print(Opts) -> Error -> {error, Error} end. +%%-------------------------------------------------------------------- +%% @doc +%% Print transaction stats with empty flags +%% +%% @spec txn_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec txn_stat_print() -> + ok | db_error(). +txn_stat_print() -> + txn_stat_print([]). + %%-------------------------------------------------------------------- %% @doc %% Print environment stats @@ -1775,9 +1928,8 @@ txn_stat_print(Opts) -> %%
Display information for all configured subsystems.
%% %% -%% @spec stat_print(Opts) -> ok | {error, Error} +%% @spec env_stat_print(Opts) -> ok | {error, Error} %% where -%% Db = integer() %% Opts = [atom()] %% %% @end @@ -1793,8 +1945,18 @@ env_stat_print(Opts) -> Error -> {error, Error} end. - - +%%-------------------------------------------------------------------- +%% @doc +%% Print environment stats with empty flags +%% +%% @spec env_stat_print() -> ok | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec env_stat_print() -> + ok | db_error(). +env_stat_print() -> + env_stat_print([]). %%-------------------------------------------------------------------- %% @doc From 553ed4eda102d4413522c45491a7da203cd12e3c Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 29 May 2009 07:26:59 -0600 Subject: [PATCH 08/14] Enhanced error reporting to return atoms for libc errno, common bdb errors and bdberl errors. --- c_src/bdberl_drv.c | 235 ++++++++++++++++++++++++++++----------------- include/bdberl.hrl | 1 + src/bdberl.erl | 57 ++++++++--- 3 files changed, 188 insertions(+), 105 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 8f45482..d61c016 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -41,6 +41,8 @@ static void do_async_memp_stat(void* arg); static void do_async_mutex_stat(void* arg); static void do_async_txn_stat(void* arg); +static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc); + static int add_dbref(PortData* data, int dbref); static int del_dbref(PortData* data, int dbref); static int has_dbref(PortData* data, int dbref); @@ -177,6 +179,26 @@ static TPool* G_TPOOL_TXNS = NULL; erl_drv_mutex_unlock(d->port_lock); \ }} +#define FAIL_IF_CURSOR_OPEN(d, outbuf) { \ + if (NULL != d->cursor) \ + { \ + send_rc(d->port, d->port_owner, ERROR_CURSOR_OPEN); \ + RETURN_INT(0, outbuf); \ + }} +#define FAIL_IF_NO_CURSOR(d, outbuf) { \ + if (NULL == d->cursor) \ + { \ + send_rc(d->port, d->port_owner, ERROR_NO_CURSOR); \ + RETURN_INT(0, outbuf); \ + }} + +#define FAIL_IF_TXN_OPEN(d, outbuf) { \ + if (d->txn) \ + { \ + send_rc(d->port, d->port_owner, ERROR_TXN_OPEN); \ + RETURN_INT(0, outbuf); \ + }} + #ifdef DEBUG # define DBG printf #else @@ -478,41 +500,29 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, DBTYPE type = (DBTYPE) UNPACK_BYTE(inbuf, 4); char* name = UNPACK_STRING(inbuf, 5); int dbref; - int status; int rc = open_database(name, type, flags, d, &dbref); - if (rc == 0) - { - status = STATUS_OK; - } - else - { - status = STATUS_ERROR; - dbref = rc; - } - // Pack the status and dbref (or errno) into a binary and return it - // Outbuf is: <> - BinHelper bh; - bin_helper_init(&bh); - bin_helper_push_byte(&bh, status); - bin_helper_push_int32(&bh, dbref); - RETURN_BH(bh, outbuf); + // Queue up a message for bdberl:open to process + if (rc == 0) // success: send {ok, DbRef} + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), + ERL_DRV_INT, dbref, + ERL_DRV_TUPLE, 2}; + driver_send_term(d->port, d->port_owner, + response, sizeof(response) / sizeof(response[0])); + } + else // failure: send {error, atom() | {error, {unknown, Rc}} + { + send_rc(d->port, d->port_owner, rc); + } + // Outbuf is: <> - always send 0 and the driver will receive the real value + RETURN_INT(0, outbuf); } case CMD_CLOSE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a cursor is open - if (d->cursor != 0) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } - - // Fail if a txn is open - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); + FAIL_IF_TXN_OPEN(d, outbuf); // Take the provided dbref and attempt to close it // Inbuf is: <> @@ -521,25 +531,24 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = close_database(dbref, flags, d); - // Outbuf is: <> - RETURN_INT(rc, outbuf); + // Queue up a message for bdberl:close to process + send_rc(d->port, d->port_owner, rc); + // Outbuf is: <> - always send 0 and the driver will receive the real value + RETURN_INT(0, outbuf); } case CMD_TXN_BEGIN: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // If we already have a txn open, fail - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_TXN_OPEN(d, outbuf); // Inbuf is <> unsigned flags = UNPACK_INT(inbuf, 0); // Outbuf is <> int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), flags); - RETURN_INT(rc, outbuf); + + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } case CMD_TXN_COMMIT: case CMD_TXN_ABORT: @@ -549,7 +558,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // If we don't already have a txn open, fail if (d->txn == 0) { - RETURN_INT(ERROR_NO_TXN, outbuf); + send_rc(d->port, d->port_owner, ERROR_NO_TXN); + RETURN_INT(0, outbuf); } // Setup async command and schedule it on the txns threadpool @@ -573,7 +583,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Put/commit requires a transaction to be active if (cmd == CMD_PUT_COMMIT && (!d->txn)) { - RETURN_INT(ERROR_NO_TXN, outbuf); + send_rc(d->port, d->port_owner, ERROR_NO_TXN); + RETURN_INT(0, outbuf); } // Inbuf is: << DbRef:32, Rest/binary>> @@ -618,7 +629,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, else { // Invalid dbref - RETURN_INT(ERROR_INVALID_DBREF, outbuf); + send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); + RETURN_INT(0, outbuf); } } case CMD_GETINFO: @@ -636,11 +648,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_OPEN: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - if (d->cursor) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> int dbref = UNPACK_INT(inbuf, 0); @@ -652,11 +660,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Grab the database handle and open the cursor DB* db = G_DATABASES[dbref].db; int rc = db->cursor(db, d->txn, &(d->cursor), flags); - RETURN_INT(rc, outbuf); + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } else { - RETURN_INT(ERROR_INVALID_DBREF, outbuf); + send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); + RETURN_INT(0, outbuf); } } case CMD_CURSOR_CURR: @@ -664,12 +674,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_PREV: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if no cursor currently open - if (!d->cursor) - { - RETURN_INT(ERROR_NO_CURSOR, outbuf); - } + FAIL_IF_NO_CURSOR(d, outbuf); // Schedule the operation d->async_op = cmd; @@ -682,12 +687,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_CURSOR_CLOSE: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if no cursor open - if (!d->cursor) - { - RETURN_INT(ERROR_NO_CURSOR, outbuf); - } + FAIL_IF_NO_CURSOR(d, outbuf); // It's possible to get a deadlock when closing a cursor -- in that situation we also // need to go ahead and abort the txn @@ -700,33 +700,26 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Regardless of what happens, clear out the cursor pointer d->cursor = 0; - + + // Send result code + send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); } case CMD_REMOVE_DB: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a txn is open - if (d->txn != 0) - { - RETURN_INT(ERROR_TXN_OPEN, outbuf); - } + FAIL_IF_TXN_OPEN(d, outbuf); // Inbuf is: << dbname/bytes, 0:8 >> const char* dbname = UNPACK_STRING(inbuf, 0); int rc = delete_database(dbname); - RETURN_INT(rc, outbuf); + send_rc(d->port, d->port_owner, rc); + RETURN_INT(0, outbuf); } case CMD_TRUNCATE: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // Fail if a cursor is open - if (d->cursor) - { - RETURN_INT(ERROR_CURSOR_OPEN, outbuf); - } + FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is: <> int dbref = UNPACK_INT(inbuf, 0); @@ -1232,6 +1225,7 @@ static int delete_database(const char* name) DBG("Attempting to delete database: %s\r\n", name); int rc = G_DB_ENV->dbremove(G_DB_ENV, 0, name, 0, DB_AUTO_COMMIT); WRITE_UNLOCK(G_DATABASES_RWLOCK); + return rc; } @@ -1291,6 +1285,82 @@ static void async_cleanup(PortData* d) erl_drv_mutex_unlock(d->port_lock); } + +// Convert an rc from BDB into a string suitable for driver_mk_atom +// returns NULL on no match +static char *rc_to_atom_str(int rc) +{ + char *error = erl_errno_id(rc); + //fprintf(stderr, "erl_errno_id(%d) = %s db_strerror = %s\n", rc, error, db_strerror(rc)); + if (NULL != error && strcmp("unknown", error) != 0) + { + return error; + } + else + { + switch (rc) + { + // bdberl driver errors + case ERROR_ASYNC_PENDING: return "async_pending"; + case ERROR_INVALID_DBREF: return "invalid_db"; + case ERROR_TXN_OPEN: return "transaction_open"; + case ERROR_NO_TXN: return "no_txn"; + case ERROR_CURSOR_OPEN: return "cursor_open"; + case ERROR_NO_CURSOR: return "no_cursor"; + // bonafide BDB errors + case DB_BUFFER_SMALL: return "buffer_small"; + case DB_DONOTINDEX: return "do_not_index"; + case DB_FOREIGN_CONFLICT: return "foreign_conflict"; + case DB_KEYEMPTY: return "key_empty"; + case DB_KEYEXIST: return "key_exist"; + case DB_LOCK_DEADLOCK: return "deadlock"; + case DB_LOCK_NOTGRANTED: return "lock_not_granted"; + case DB_LOG_BUFFER_FULL: return "log_buffer_full"; + case DB_NOTFOUND: return "not_found"; + case DB_OLD_VERSION: return "old_version"; + case DB_PAGE_NOTFOUND: return "page_not_found"; + case DB_RUNRECOVERY: return "run_recovery"; + case DB_VERIFY_BAD: return "verify_bad"; + case DB_VERSION_MISMATCH: return "version_mismatch"; + default: return NULL; + } + } +} + + +static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) +{ + // TODO: May need to tag the messages a bit more explicitly so that if another async + // job runs to completion before the message gets delivered we don't mis-interpret this + // response code. + if (rc == 0) + { + ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + // See if this is a standard errno that we have an erlang code for + char *error = rc_to_atom_str(rc); + if (NULL != error) + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom(error), + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + else + { + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), + ERL_DRV_ATOM, driver_mk_atom("unknown"), + ERL_DRV_INT, rc, + ERL_DRV_TUPLE, 2, + ERL_DRV_TUPLE, 2}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + } +} + static void async_cleanup_and_send_rc(PortData* d, int rc) { // Save the port and pid references -- we need copies independent from the PortData @@ -1302,22 +1372,7 @@ static void async_cleanup_and_send_rc(PortData* d, int rc) ErlDrvTermData pid = d->port_owner; async_cleanup(d); - - // TODO: May need to tag the messages a bit more explicitly so that if another async - // job runs to completion before the message gets delivered we don't mis-interpret this - // response code. - if (rc == 0) - { - ErlDrvTermData response[] = {ERL_DRV_ATOM, driver_mk_atom("ok")}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } - else - { - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), - ERL_DRV_INT, rc, - ERL_DRV_TUPLE, 2}; - driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); - } + send_rc(port, pid, rc); } static void async_cleanup_and_send_kv(PortData* d, int rc, DBT* key, DBT* value) diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 21f2cc1..e3f9ff8 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -37,6 +37,7 @@ -define(CMD_TXN_STAT, 29). -define(CMD_TXN_STAT_PRINT, 30). +-define(DB_TYPE_UNKNOWN, 0). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). diff --git a/src/bdberl.erl b/src/bdberl.erl index 3a0a344..0950019 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -140,28 +140,35 @@ open(Name, Type) -> %% @spec open(Name, Type, Opts) -> {ok, Db} | {error, Error} %% where %% Name = string() -%% Type = btree | hash +%% Type = btree | hash | unknown %% Opts = [atom()] %% Db = integer() %% %% @end %%-------------------------------------------------------------------- --spec open(Name :: db_name(), Type :: db_type(), Opts :: db_flags()) -> +-spec open(Name :: db_name(), Type :: db_type() | unknown, Opts :: db_flags()) -> {ok, db()} | {error, integer()}. open(Name, Type, Opts) -> % Map database type into an integer code case Type of btree -> TypeCode = ?DB_TYPE_BTREE; - hash -> TypeCode = ?DB_TYPE_HASH + hash -> TypeCode = ?DB_TYPE_HASH; + unknown -> TypeCode = ?DB_TYPE_UNKNOWN %% BDB automatically determines if file exists end, Flags = process_flags(lists:umerge(Opts, [auto_commit, threaded])), Cmd = <>, - case erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd) of - <> -> - {ok, Db}; - <> -> - {error, Errno} + <> = erlang:port_control(get_port(), ?CMD_OPEN_DB, Cmd), + case decode_rc(Rc) of + ok -> + receive + {ok, DbRef} -> + {ok, DbRef}; + {error, Reason} -> + {error, Reason} + end; + Error -> + {error, Error} end. @@ -226,9 +233,14 @@ close(Db, Opts) -> <> = erlang:port_control(get_port(), ?CMD_CLOSE_DB, Cmd), case decode_rc(Rc) of ok -> - ok; - Reason -> - {error, Reason} + receive + ok -> + ok; + {error, Reason} -> + {error, Reason} + end; + Error -> + {error, Error} end. @@ -339,7 +351,11 @@ txn_begin(Opts) -> Cmd = <>, <> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), case decode_rc(Result) of - ok -> ok; + ok -> + receive + ok -> ok; + {error, Reason} -> {error, decode_rc(Reason)} + end; Error -> {error, Error} end. @@ -1060,7 +1076,11 @@ cursor_open(Db) -> <> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; + Reason -> {error, Reason} end. @@ -1166,7 +1186,10 @@ cursor_close() -> <> = erlang:port_control(get_port(), ?CMD_CURSOR_CLOSE, <<>>), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, Reason} + end; Reason -> {error, Reason} end. @@ -1199,7 +1222,10 @@ delete_database(Filename) -> <> = erlang:port_control(get_port(), ?CMD_REMOVE_DB, Cmd), case decode_rc(Rc) of ok -> - ok; + receive + ok -> ok; + {error, Reason} -> {error, decode_rc(Reason)} + end; Reason -> {error, Reason} end. @@ -2008,6 +2034,7 @@ get_port() -> %% %% Decode a integer return value into an atom representation %% +decode_rc(Rc) when is_atom(Rc) -> Rc; decode_rc(?ERROR_NONE) -> ok; decode_rc(?ERROR_ASYNC_PENDING) -> async_pending; decode_rc(?ERROR_INVALID_DBREF) -> invalid_db; From 2c6277bdf4c5d291d09853396f83b6f857162c78 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 29 May 2009 07:42:18 -0600 Subject: [PATCH 09/14] Added unknown db type for opening existing databases. --- c_src/bdberl_drv.c | 16 ++++++++-------- include/bdberl.hrl | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index d61c016..8b53950 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -193,11 +193,17 @@ static TPool* G_TPOOL_TXNS = NULL; }} #define FAIL_IF_TXN_OPEN(d, outbuf) { \ - if (d->txn) \ + if (NULL != d->txn) \ { \ send_rc(d->port, d->port_owner, ERROR_TXN_OPEN); \ RETURN_INT(0, outbuf); \ }} +#define FAIL_IF_NO_TXN(d, outbuf) { \ + if (NULL == d->txn) \ + { \ + send_rc(d->port, d->port_owner, ERROR_NO_TXN); \ + RETURN_INT(0, outbuf); \ + }} #ifdef DEBUG # define DBG printf @@ -554,13 +560,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_TXN_ABORT: { FAIL_IF_ASYNC_PENDING(d, outbuf); - - // If we don't already have a txn open, fail - if (d->txn == 0) - { - send_rc(d->port, d->port_owner, ERROR_NO_TXN); - RETURN_INT(0, outbuf); - } + FAIL_IF_NO_TXN(d, outbuf); // Setup async command and schedule it on the txns threadpool d->async_op = cmd; diff --git a/include/bdberl.hrl b/include/bdberl.hrl index e3f9ff8..4836c56 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -37,9 +37,9 @@ -define(CMD_TXN_STAT, 29). -define(CMD_TXN_STAT_PRINT, 30). --define(DB_TYPE_UNKNOWN, 0). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). +-define(DB_TYPE_UNKNOWN, 5). -define(SYSP_CACHESIZE_GET, 1). -define(SYSP_TXN_TIMEOUT_GET, 2). From 1409096860f185f51adc9632011ecd9112907b70 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 29 May 2009 12:13:19 -0600 Subject: [PATCH 10/14] Renamed get_lg_dir() to get_lg_dir_info and added get_data_dirs_info(). Both new calls return the filesystem id and the number of mbytes available on that filesystem. --- c_src/bdberl_drv.c | 170 ++++++++++++++++++++++++++++++++++++++++++ c_src/bdberl_drv.h | 3 + c_src/bin_helper.c | 4 + include/bdberl.hrl | 3 + src/bdberl.erl | 76 ++++++++++++++++++- test/bdberl_SUITE.erl | 12 ++- 6 files changed, 266 insertions(+), 2 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 8b53950..398fa55 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -9,12 +9,15 @@ #include #include #include +#include +#include #include #include #include #include #include #include +#include #include "hive_hash.h" #include "bdberl_drv.h" @@ -40,7 +43,9 @@ static void do_async_log_stat(void* arg); static void do_async_memp_stat(void* arg); static void do_async_mutex_stat(void* arg); static void do_async_txn_stat(void* arg); +static void do_sync_data_dirs_info(PortData *p); +static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path); static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc); static int add_dbref(PortData* data, int dbref); @@ -1039,6 +1044,44 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, int rc = G_DB_ENV->txn_stat_print(G_DB_ENV, flags); RETURN_INT(rc, outbuf); } + case CMD_DATA_DIRS_INFO: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + do_sync_data_dirs_info(d); + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + + case CMD_LOG_DIR_INFO: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + + // Find the log dir or use DB_HOME - error if not present + const char *lg_dir = NULL; + int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &lg_dir); + if (0 == rc && NULL == lg_dir) + { + rc = G_DB_ENV->get_home(G_DB_ENV, &lg_dir); + } + // Send info if we can get a dir, otherwise return the error + if (0 == rc) + { + // send a dirinfo message - will send an error message on a NULL lg_dir + send_dir_info(d->port, d->port_owner, lg_dir); + } + else + { + send_rc(d->port, d->port_owner, rc); + } + + // Let caller know that the operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } + } *outbuf = 0; @@ -1271,6 +1314,21 @@ static void get_info(int target, void* values, BinHelper* bh) } break; } + case SYSP_LOG_DIR_GET: + { + const char* dir = 0; + // Get the log dir - according to BDB docs, if not set + // the DB_HOME is used. + int rc = G_DB_ENV->get_lg_dir(G_DB_ENV, &dir); + if (NULL == dir) + { + dir = getenv("DB_HOME"); + } + bin_helper_init(bh); + bin_helper_push_int32(bh, rc); + bin_helper_push_string(bh, dir); + break; + } } } @@ -1328,6 +1386,53 @@ static char *rc_to_atom_str(int rc) } +// Send a {dirinfo, Path, FsId, MbyteAvail} message to pid given. +// Send an {errno, Reason} on failure +// returns 0 on success, errno on failure +static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path) +{ + struct statvfs svfs; + int rc; + + if (NULL == path) + { + rc = EINVAL; + } + else if (0 != statvfs(path, &svfs)) + { + rc = errno; + } + else + { + rc = 0; + } + + if (0 != rc) + { + send_rc(port, pid, rc); + } + else + { + fsblkcnt_t blocks_per_mbyte = 1024 * 1024 / svfs.f_frsize; + assert(blocks_per_mbyte > 0); + unsigned int mbyte_avail = (unsigned int) (svfs.f_bavail / blocks_per_mbyte); + int path_len = strlen(path); + + ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("dirinfo"), + ERL_DRV_STRING, (ErlDrvTermData) path, path_len, + // send fsid as a binary as will only be used + // to compare which physical filesystem is on + // and the definintion varies between platforms. + ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid, + sizeof(svfs.f_fsid), + ERL_DRV_UINT, mbyte_avail, + ERL_DRV_TUPLE, 4}; + driver_send_term(port, pid, response, sizeof(response) / sizeof(response[0])); + } + return rc; +} + + static void send_rc(ErlDrvPort port, ErlDrvTermData pid, int rc) { // TODO: May need to tag the messages a bit more explicitly so that if another async @@ -2334,6 +2439,71 @@ static void do_async_txn_stat(void* arg) } } + +static void do_sync_data_dirs_info(PortData *d) +{ + // Get DB_HOME and find the real path + const char *db_home = NULL; + const char *data_dir = NULL; + const char **data_dirs = NULL; + char db_home_realpath[PATH_MAX+1]; + char data_dir_realpath[PATH_MAX+1]; + int got_db_home = 0; + + // Lookup the environment and add it if not explicitly included in the data_dirs + int rc = G_DB_ENV->get_home(G_DB_ENV, &db_home); + if (rc != 0 || NULL == db_home) + { + // If no db_home we'll have to rely on whatever the global environment is configured with + got_db_home = 1; + } + else + { + if (NULL == realpath(db_home, db_home_realpath)) + rc = errno; + } + + // Get the data first + rc = G_DB_ENV->get_data_dirs(G_DB_ENV, &data_dirs); + int i; + for (i = 0; 0 == rc && NULL != data_dirs && NULL != data_dirs[i]; i++) + { + data_dir = data_dirs[i]; + + if (!got_db_home) + { + // Get the real path of the data dir + if (NULL == realpath(data_dir, data_dir_realpath)) + { + rc = errno; + } + else + { + // Set got_db_home if it matches + if (0 == strcmp(data_dir_realpath, db_home_realpath)) + { + got_db_home = 1; + } + } + } + + if (0 == rc) + { + rc = send_dir_info(d->port, d->port_owner, data_dir); + } + } + + // BDB always searches the environment home too so add it to the list + if (!got_db_home && rc == 0) + { + rc = send_dir_info(d->port, d->port_owner, db_home); + } + + // Send the return code - will termiante the receive loop in bdberl.erl + send_rc(d->port, d->port_owner, rc); +} + + static void* zalloc(unsigned int size) { void* res = driver_alloc(size); diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index e22d60e..e282b0d 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -59,6 +59,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define CMD_MUTEX_STAT_PRINT 28 #define CMD_TXN_STAT 29 #define CMD_TXN_STAT_PRINT 30 +#define CMD_DATA_DIRS_INFO 31 +#define CMD_LOG_DIR_INFO 32 /** * Command status values @@ -96,6 +98,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, #define SYSP_CACHESIZE_GET 1 #define SYSP_TXN_TIMEOUT_GET 2 #define SYSP_DATA_DIR_GET 3 +#define SYSP_LOG_DIR_GET 4 /** * Driver Entry diff --git a/c_src/bin_helper.c b/c_src/bin_helper.c index beedafb..bc1c0d2 100644 --- a/c_src/bin_helper.c +++ b/c_src/bin_helper.c @@ -44,6 +44,10 @@ void bin_helper_push_int32(BinHelper* bh, int value) void bin_helper_push_string(BinHelper* bh, const char* string) { + if (NULL == string) + { + string = ""; + } int sz = strlen(string); bin_helper_check_size(bh, sz+1); strncpy(bh->bin->orig_bytes+(bh->offset), string, sz+1); diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 4836c56..939c741 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -36,6 +36,8 @@ -define(CMD_MUTEX_STAT_PRINT,28). -define(CMD_TXN_STAT, 29). -define(CMD_TXN_STAT_PRINT, 30). +-define(CMD_DATA_DIRS_INFO, 31). +-define(CMD_LOG_DIR_INFO, 32). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). @@ -44,6 +46,7 @@ -define(SYSP_CACHESIZE_GET, 1). -define(SYSP_TXN_TIMEOUT_GET, 2). -define(SYSP_DATA_DIR_GET, 3). +-define(SYSP_LOG_DIR_GET, 4). -define(STATUS_OK, 0). -define(STATUS_ERROR, 1). diff --git a/src/bdberl.erl b/src/bdberl.erl index 0950019..35bd6ae 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -13,6 +13,8 @@ txn_commit/0, txn_commit/1, txn_abort/0, get_cache_size/0, get_data_dirs/0, + get_data_dirs_info/0, + get_lg_dir_info/0, get_txn_timeout/0, stat/1, stat/2, stat_print/1, stat_print/2, @@ -48,10 +50,11 @@ -type db_name() :: [byte(),...]. -type db_type() :: btree | hash. -type db_flags() :: [atom()]. +-type db_fsid() :: binary(). -type db_key() :: term(). +-type db_mbytes() :: non_neg_integer(). -type db_value() :: term(). -type db_ret_value() :: not_found | db_value(). - -type db_error_reason() :: atom() | {unknown, integer()}. -type db_error() :: {error, db_error_reason()}. @@ -1267,6 +1270,63 @@ get_data_dirs() -> end. +%%-------------------------------------------------------------------- +%% @doc +%% Returns the list of directories that bdberl searches to find databases +%% with the number of megabytes available for each dir +%% +%% @spec get_data_dirs_info() -> {ok, [DirName, Fsid, MbytesAvail]} | {error, Error} +%% where +%% DirName = string() +%% Fsid = binary() +%% MbytesAvail = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec get_data_dirs_info() -> {ok, [{string(), db_fsid(), db_mbytes()}]} | db_error(). + +get_data_dirs_info() -> + % Call into the BDB library and get a list of configured data directories + Cmd = <<>>, + <> = erlang:port_control(get_port(),?CMD_DATA_DIRS_INFO, Cmd), + case decode_rc(Result) of + ok -> + recv_dirs_info([]); + Reason -> + {error, Reason} + end. + + +%%-------------------------------------------------------------------- +%% @doc +%% Returns the log directory info (name and megabytes available) +%% +%% @spec get_lg_dir_info() -> {ok, DirName, Fsid, MbytesAvail} | {error, Error} +%% where +%% DirName = string() +%% Fsid = binary() +%% MbytesAvail = integer() +%% +%% @end +%%-------------------------------------------------------------------- +-spec get_lg_dir_info() -> {ok, string(), db_fsid(), db_mbytes()} | db_error(). +get_lg_dir_info() -> + % Call into the BDB library and get the log dir and filesystem info + Cmd = <<>>, + <> = erlang:port_control(get_port(), ?CMD_LOG_DIR_INFO, Cmd), + case decode_rc(Result) of + ok -> + receive + {dirinfo, Path, FsId, MbytesAvail} -> + {ok, Path, FsId, MbytesAvail}; + {error, Reason} -> + {error, Reason} + end; + Reason -> + {error, Reason} + end. + + %%-------------------------------------------------------------------- %% @doc %% Returns the size of the in-memory cache. @@ -2196,3 +2256,17 @@ recv_txn_stat(Tstats) -> {ok, Stats} -> {ok, Stats, Tstats} end. + +%% +%% Receive directory info messages until ok +%% +recv_dirs_info(DirInfos) -> + receive + {dirinfo, Path, FsId, MbytesAvail} -> + recv_dirs_info([{Path, FsId, MbytesAvail} | DirInfos]); + {error, Reason} -> + {error, Reason}; + ok -> + {ok, DirInfos} + end. + diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 57caf4f..82b0830 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -39,7 +39,10 @@ all() -> log_stat_should_report_on_success, memp_stat_should_report_on_success, mutex_stat_should_report_on_success, - txn_stat_should_report_on_success]. + txn_stat_should_report_on_success, + data_dirs_info_should_report_on_success, + lg_dir_info_should_report_on_success]. + dbconfig(Config) -> @@ -312,3 +315,10 @@ txn_stat_should_report_on_success(_Config) -> bdberl:txn_abort(), {ok, _GStat3, []} = bdberl:txn_stat([]), done. + +data_dirs_info_should_report_on_success(_Config) -> + {ok, _DataDirs} = bdberl:get_data_dirs_info(). + +lg_dir_info_should_report_on_success(_Config) -> + {ok, _LgDir, _Fsid, _MBytesAvail} = bdberl:get_lg_dir_info(). + From a3462d0fa7b8d466def27dd8bb3dcbf720f35bd5 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 29 May 2009 13:45:58 -0600 Subject: [PATCH 11/14] txn_abort() used to return ok even if no txn. The error code changes broke that behavior. Changed it to return ok if there was no txn. --- src/bdberl.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/bdberl.erl b/src/bdberl.erl index 35bd6ae..e6b2451 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -463,6 +463,7 @@ txn_abort() -> ok -> receive ok -> ok; + {error, no_txn} -> ok; {error, Reason} -> {error, decode_rc(Reason)} end; @@ -540,13 +541,14 @@ transaction(Fun, Retries) -> {ok, db_value()} | db_txn_error(). transaction(_Fun, 0, _Opts) -> - ok = txn_abort(), {error, {transaction_failed, retry_limit_reached}}; + transaction(Fun, Retries, Opts) -> case txn_begin(Opts) of ok -> try Fun() of abort -> + error_logger:info_msg("function requested abort"), ok = txn_abort(), {error, transaction_aborted}; @@ -566,6 +568,7 @@ transaction(Fun, Retries, Opts) -> transaction(Fun, R); _ : Reason -> + error_logger:info_msg("function threw non-lock error - ~p", [Reason]), ok = txn_abort(), {error, {transaction_failed, Reason}} end; From 9361739244efbebb44c4a87e5143b75dd60c85c4 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Fri, 29 May 2009 13:49:49 -0600 Subject: [PATCH 12/14] Removed debug statements left in by accident. --- src/bdberl.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/bdberl.erl b/src/bdberl.erl index e6b2451..f6e773c 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -548,7 +548,6 @@ transaction(Fun, Retries, Opts) -> ok -> try Fun() of abort -> - error_logger:info_msg("function requested abort"), ok = txn_abort(), {error, transaction_aborted}; @@ -568,7 +567,6 @@ transaction(Fun, Retries, Opts) -> transaction(Fun, R); _ : Reason -> - error_logger:info_msg("function threw non-lock error - ~p", [Reason]), ok = txn_abort(), {error, {transaction_failed, Reason}} end; From 40393a8e62c7fc109763f6ecd3405c95f3422c69 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Mon, 1 Jun 2009 12:14:26 -0600 Subject: [PATCH 13/14] Cleaned up driver finish function to make sure resources had been allocated before they were freed. --- c_src/bdberl_drv.c | 72 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 398fa55..ee9394f 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -353,6 +353,10 @@ DRIVER_INIT(bdberl_drv) G_LOG_PORT = 0; G_LOG_PID = 0; } + else + { + DBG("DRIVER INIT FAILED - %s\r\n", db_strerror(G_DB_ENV_ERROR)); + } return &bdberl_drv_entry; } @@ -461,8 +465,17 @@ static void bdberl_drv_stop(ErlDrvData handle) static void bdberl_drv_finish() { // Stop the thread pools - bdberl_tpool_stop(G_TPOOL_GENERAL); - bdberl_tpool_stop(G_TPOOL_TXNS); + if (NULL != G_TPOOL_GENERAL) + { + bdberl_tpool_stop(G_TPOOL_GENERAL); + G_TPOOL_GENERAL = NULL; + } + + if (NULL != G_TPOOL_TXNS) + { + bdberl_tpool_stop(G_TPOOL_TXNS); + G_TPOOL_TXNS = NULL; + } // Signal the utility threads time is up G_TRICKLE_ACTIVE = 0; @@ -470,28 +483,63 @@ static void bdberl_drv_finish() G_CHECKPOINT_ACTIVE = 0; // Close the writer fd on the pipe to signal finish to the utility threads - close(G_BDBERL_PIPE[1]); - G_BDBERL_PIPE[1] = -1; + if (-1 != G_BDBERL_PIPE[1]) + { + close(G_BDBERL_PIPE[1]); + G_BDBERL_PIPE[1] = -1; + } // Wait for the deadlock checker to shutdown -- then wait for it - erl_drv_thread_join(G_DEADLOCK_THREAD, 0); + if (0 != G_DEADLOCK_THREAD) + { + erl_drv_thread_join(G_DEADLOCK_THREAD, 0); + G_DEADLOCK_THREAD = 0; + } // Wait for the checkpointer to shutdown -- then wait for it - erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); + if (0 != G_CHECKPOINT_THREAD) + { + erl_drv_thread_join(G_CHECKPOINT_THREAD, 0); + G_CHECKPOINT_THREAD = 0; + } // Close the reader fd on the pipe now utility threads are closed - close(G_BDBERL_PIPE[0]); + if (-1 != G_BDBERL_PIPE[0]) + { + close(G_BDBERL_PIPE[0]); + } G_BDBERL_PIPE[0] = -1; // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. - G_DB_ENV->close(G_DB_ENV, 0); - driver_free(G_DATABASES); - erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); - hive_hash_destroy(G_DATABASES_NAMES); + if (NULL != G_DB_ENV) + { + G_DB_ENV->close(G_DB_ENV, 0); + G_DB_ENV = NULL; + } + if (NULL != G_DATABASES) + { + driver_free(G_DATABASES); + G_DATABASES = NULL; + } + if (NULL != G_DATABASES_RWLOCK) + { + erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); + G_DATABASES_RWLOCK = NULL; + } + if (NULL != G_DATABASES_NAMES) + { + hive_hash_destroy(G_DATABASES_NAMES); + G_DATABASES_NAMES = NULL; + } + // Release the logging rwlock - erl_drv_rwlock_destroy(G_LOG_RWLOCK); + if (NULL != G_LOG_RWLOCK) + { + erl_drv_rwlock_destroy(G_LOG_RWLOCK); + G_LOG_RWLOCK = NULL; + } DBG("DRIVER_FINISH\n"); } From 58b870d15c14a74e8c095c6fd4ae452115735671 Mon Sep 17 00:00:00 2001 From: Jon Meredith Date: Mon, 1 Jun 2009 15:52:06 -0600 Subject: [PATCH 14/14] Removed decode_rc calls that are now decoded within bdberl_drv.c --- src/bdberl.erl | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/bdberl.erl b/src/bdberl.erl index f6e773c..3a92bed 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -428,7 +428,7 @@ txn_commit(Opts) -> ok -> receive ok -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Error -> {error, Error} @@ -464,7 +464,7 @@ txn_abort() -> receive ok -> ok; {error, no_txn} -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; no_txn -> @@ -548,6 +548,7 @@ transaction(Fun, Retries, Opts) -> ok -> try Fun() of abort -> + error_logger:info_msg("function requested abort"), ok = txn_abort(), {error, transaction_aborted}; @@ -567,6 +568,7 @@ transaction(Fun, Retries, Opts) -> transaction(Fun, R); _ : Reason -> + error_logger:info_msg("function threw non-lock error - ~p", [Reason]), ok = txn_abort(), {error, {transaction_failed, Reason}} end; @@ -867,7 +869,7 @@ get(Db, Key, Opts) -> receive {ok, _, Bin} -> {ok, binary_to_term(Bin)}; not_found -> not_found; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Error -> {error, Error} @@ -1051,7 +1053,7 @@ truncate(Db) -> ok -> receive ok -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Error -> @@ -1228,7 +1230,7 @@ delete_database(Filename) -> ok -> receive ok -> ok; - {error, Reason} -> {error, decode_rc(Reason)} + {error, Reason} -> {error, Reason} end; Reason -> {error, Reason} @@ -1425,7 +1427,7 @@ stat(Db, Opts) -> ok -> receive {error, Reason} -> - {error, decode_rc(Reason)}; + {error, Reason}; {ok, Stats} -> {ok, Stats} end; @@ -1540,7 +1542,7 @@ lock_stat(Opts) -> ok -> receive {error, Reason} -> - {error, decode_rc(Reason)}; + {error, Reason}; {ok, Stats} -> {ok, Stats} end; @@ -1646,7 +1648,7 @@ log_stat(Opts) -> ok -> receive {error, Reason} -> - {error, decode_rc(Reason)}; + {error, Reason}; {ok, Stats} -> {ok, Stats} end; @@ -1837,7 +1839,7 @@ mutex_stat(Opts) -> ok -> receive {error, Reason} -> - {error, decode_rc(Reason)}; + {error, Reason}; {ok, Stats} -> {ok, Stats} end;