From a71f18ee1b245cc30afe5541e774c82e35a4ab23 Mon Sep 17 00:00:00 2001 From: Dave Smith Date: Wed, 10 Dec 2008 19:47:00 -0700 Subject: [PATCH] Now with 100% more threadiness and 50% less deadlocking --- c_src/bdberl_drv.c | 112 ++++++++++++++++++++++++++---------------- c_src/bdberl_tpool.c | 4 -- src/bdberl_port.erl | 3 +- test/thrash_SUITE.erl | 25 +++++----- 4 files changed, 85 insertions(+), 59 deletions(-) diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 18e89b6..5232e4c 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -38,6 +38,7 @@ static int alloc_dbref(); static void* zalloc(unsigned int size); static void signal_port(PortData* d); +static void* deadlock_check(void* arg); /** * Global instance of DB_ENV; only a single one exists per O/S process. @@ -70,6 +71,12 @@ static int G_DATABASES_SIZE; static ErlDrvRWLock* G_DATABASES_RWLOCK; static hive_hash* G_DATABASES_NAMES; +/** + * G_DEADLOCK_* hold the deadlock checker's thread id, run flag and check interval (ms) + */ +static ErlDrvTid G_DEADLOCK_THREAD; +static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1; +static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /** * @@ -130,12 +137,6 @@ DRIVER_INIT(bdberl_drv) // Make sure we can distiguish between lock timeouts and deadlocks int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); printf("TIME_NOT_GRANTED rc: %d\n", rc); - rc = G_DB_ENV->set_lk_detect(G_DB_ENV, 1); - printf("LOCK DETECT rc: %d\n", rc); - rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_TXN_TIMEOUT); - printf("DB_SET_TXN_TIMEOUT rc: %d\n", rc); - rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_LOCK_TIMEOUT); - printf("DB_SET_LOCK_TIMEOUT rc: %d\n", rc); // BDB is setup -- allocate structures for tracking databases G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); @@ -143,6 +144,10 @@ DRIVER_INIT(bdberl_drv) G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK"); G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); + // Startup deadlock check thread + 
erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD, + &deadlock_check, 0, 0); + // Startup our thread pools // TODO: Make configurable/adjustable G_TPOOL_GENERAL = bdberl_tpool_start(5); @@ -191,7 +196,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) static void bdberl_drv_stop(ErlDrvData handle) { PortData* d = (PortData*)handle; - printf("%p: stop\n", d->port); +// printf("%p: stop\n", d->port); // If there is an async job pending, we need to cancel it. The cancel operation will // block until the job has either been removed or has run @@ -227,11 +232,14 @@ static void bdberl_drv_stop(ErlDrvData handle) static void bdberl_drv_finish() { - printf("finish\n"); // Stop the thread pools bdberl_tpool_stop(G_TPOOL_GENERAL); bdberl_tpool_stop(G_TPOOL_TXNS); + // Signal the deadlock checker to shutdown -- then wait for it + G_DEADLOCK_CHECK_ACTIVE = 0; + erl_drv_thread_join(G_DEADLOCK_THREAD, 0); + // Cleanup and shut down the BDB environment. Note that we assume // all ports have been released and thuse all databases/txns/etc are also gone. 
G_DB_ENV->close(G_DB_ENV, 0); @@ -239,7 +247,7 @@ static void bdberl_drv_finish() erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); hive_hash_destroy(G_DATABASES_NAMES); - printf("finished\n"); + printf("DRIVER_FINISH\n"); } static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, @@ -299,13 +307,11 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // If we already have a txn open, fail if (d->txn != 0) { - printf("ERROR_TXN_OPEN\n"); RETURN_INT(ERROR_TXN_OPEN, outbuf); } // Outbuf is <> int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0); - printf("TXN BEGIN: %d\n", rc); RETURN_INT(rc, outbuf); } case CMD_TXN_COMMIT: @@ -408,7 +414,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) { PortData* d = (PortData*)handle; - printf("%p: ready_input\n", d->port); +// printf("%p: ready_input; cmd = %d; rc = %d\n", d->port, d->async_op, +// ((AsyncData*)d->async_data)->rc); // Empty out the queue int readbuf; @@ -422,17 +429,10 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) case CMD_TXN_COMMIT: case CMD_TXN_ABORT: { - - // If this was a commit/abort, clear out the txn handle - if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT) - { - d->txn = 0; - } - - // Extract return code == if it's zero, send back "ok" to driver process; otherwise - // send a {error, Reason} tuple AsyncData* adata = (AsyncData*)d->async_data; - printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc); + + // Extract return code == if it's zero, send back "ok" to driver process; otherwise + // send a {error, Reason} tuple if (adata->rc == 0) { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") }; @@ -440,19 +440,20 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) } else { - // If we have a transactional PUT and a deadlock occurred, clear out txn ptr - if (d->async_op == CMD_GET && 
adata->port->txn && - (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED)) - { - d->txn = 0; - } - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ERL_DRV_INT, adata->rc, ERL_DRV_TUPLE, 2}; driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); } + // If this was a commit/abort, or a deadlock occurred while in a transaction, + // clear out the handle -- it's already invalid + if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT || + (d->txn && (adata->rc == DB_LOCK_NOTGRANTED || adata->rc == DB_LOCK_DEADLOCK))) + { + d->txn = 0; + } + // Cleanup async data and mark the port as not busy driver_free(d->async_data); d->async_data = 0; @@ -466,7 +467,6 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) // Extract return code == if it's zero, send back {ok, Payload} or not_found to driver // process; otherwise send a {error, Reason} tuple AsyncData* adata = (AsyncData*)d->async_data; - printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc); if (adata->rc == 0) { ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), @@ -481,19 +481,19 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) } else { - printf("GET ERROR: %d\n", adata->rc); - if (adata->port->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED)) - { - // Transaction has been invalidated -- null it out to avoid multiple txn closure - adata->port->txn = 0; - } - ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ERL_DRV_INT, adata->rc, ERL_DRV_TUPLE, 2}; driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); } + // If a deadlock occurred while in a transaction, clear out the handle -- it's + // already invalid + if (d->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED)) + { + d->txn = 0; + } + // Cleanup async data and mark the port as not busy driver_free(d->async_data); d->async_data = 
0; @@ -620,7 +620,7 @@ static int open_database(const char* name, DBTYPE type, PortData* data, int* dbr static int close_database(int dbref, PortData* data) { - printf("Closing %d for port %p\n", dbref, data->port); +// printf("Closing %d for port %p\n", dbref, data->port); // Remove this database from our list if (del_dbref(data, dbref)) @@ -662,7 +662,7 @@ static int close_database(int dbref, PortData* data) static void do_async_put(void* arg) { AsyncData* adata = (AsyncData*)arg; - printf("%p: do_async_put\n", adata->port->port); +// printf("%p: do_async_put\n", adata->port->port); // Setup DBTs DBT key; @@ -681,6 +681,12 @@ static void do_async_put(void* arg) // All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still // be atomic adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0); + + // If any error occurs while we have a txn action, abort it + if (adata->port->txn && adata->rc) + { + adata->port->txn->abort(adata->port->txn); + } // Enqueue a signal for the port to know that the operation is complete signal_port(adata->port); @@ -689,7 +695,7 @@ static void do_async_put(void* arg) static void do_async_get(void* arg) { AsyncData* adata = (AsyncData*)arg; - printf("%p: do_async_get\n", adata->port->port); +// printf("%p: do_async_get\n", adata->port->port); // Setup DBTs DBT key; @@ -721,13 +727,19 @@ static void do_async_get(void* arg) adata->payload_sz = value.size; // Not ulen -- we want the actual data size adata->rc = rc; + // If any error occurs while we have a txn action, abort it + if (adata->rc != DB_NOTFOUND && adata->port->txn && adata->rc) + { + adata->port->txn->abort(adata->port->txn); + } + signal_port(adata->port); } static void do_async_txnop(void* arg) { AsyncData* adata = (AsyncData*)arg; - printf("%p: do_async_txnop\n", adata->port->port); +// printf("%p: do_async_txnop\n", adata->port->port); // Execute the actual commit/abort if (adata->port->async_op == CMD_TXN_COMMIT) @@ -928,3 
+940,21 @@ static void signal_port(PortData* d) int flag = 1; write(d->pipe_fds[1], &flag, sizeof(flag)); } + +/** + * Thread function that runs the deadlock checker periodically + */ +static void* deadlock_check(void* arg) +{ + while(G_DEADLOCK_CHECK_ACTIVE) + { + // Run the lock detection + G_DB_ENV->lock_detect(G_DB_ENV, 0, DB_LOCK_DEFAULT, 0); + + // TODO: Use nanosleep + usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); + } + + printf("Deadlock checker exiting.\n"); + return 0; +} diff --git a/c_src/bdberl_tpool.c b/c_src/bdberl_tpool.c index 9d12276..6dc5a6b 100644 --- a/c_src/bdberl_tpool.c +++ b/c_src/bdberl_tpool.c @@ -176,13 +176,9 @@ static void* bdberl_tpool_main(void* arg) // Unlock to avoid blocking others UNLOCK(tpool); - printf("Invoking job\n"); - // Invoke the function (*(job->main_fn))(job->arg); - printf("Finished invoking job.\n"); - // Relock LOCK(tpool); diff --git a/src/bdberl_port.erl b/src/bdberl_port.erl index 4413f86..ddac33b 100644 --- a/src/bdberl_port.erl +++ b/src/bdberl_port.erl @@ -37,6 +37,7 @@ -define(ERROR_NO_TXN, -29004). % No transaction open on this port -define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted +-define(ERROR_DB_LOCK_DEADLOCK, -30994). % Deadlock occurred new() -> case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of @@ -72,7 +73,6 @@ close_database(Port, DbRef) -> txn_begin(Port) -> <> = erlang:port_control(Port, ?CMD_TXN_BEGIN, <<>>), - io:format("TXN BEGIN erl: ~p\n", [Result]), case decode_rc(Result) of ok -> ok; Error -> {error, {txn_begin, Error}} @@ -148,6 +148,7 @@ decode_rc(?ERROR_ASYNC_PENDING) -> async_pending; decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref; decode_rc(?ERROR_NO_TXN) -> no_txn; decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted; +decode_rc(?ERROR_DB_LOCK_DEADLOCK) -> deadlock; decode_rc(Rc) -> {unknown, Rc}. 
diff --git a/test/thrash_SUITE.erl b/test/thrash_SUITE.erl index b669cdb..8e68032 100644 --- a/test/thrash_SUITE.erl +++ b/test/thrash_SUITE.erl @@ -11,7 +11,7 @@ all() -> [test_thrash]. --define(PROCS, 1). +-define(PROCS, 2). test_thrash(_Config) -> %% Spin up 15 processes (async thread pool is 10) @@ -43,15 +43,16 @@ thrash_run(Owner) -> {ok, 0} = bdberl_port:open_database(P, "thrash", btree), %% Start thrashing - thrash_incr_loop(P, Owner, 5000). + thrash_incr_loop(P, Owner, 1000). thrash_incr_loop(Port, Owner, 0) -> Owner ! {finished, self()}; thrash_incr_loop(Port, Owner, Count) -> + ct:print("~p\n", [Count]), %% Choose random key - Key = random:uniform(12), + Key = random:uniform(1200), %% Start a txn that will read the current value of the key and increment by 1 F = fun() -> @@ -59,15 +60,16 @@ thrash_incr_loop(Port, Owner, Count) -> not_found -> Value = 0; - {ok, Value} -> + Value -> Value end, put_or_die(Port, 0, Key, Value) end, - ok = do_txn(Port, F, 20), + ok = do_txn(Port, F, 0), thrash_incr_loop(Port, Owner, Count-1). + get_or_die(Port, DbRef, Key) -> case bdberl_port:get(Port, DbRef, Key) of not_found -> @@ -76,24 +78,21 @@ get_or_die(Port, DbRef, Key) -> Value end. + put_or_die(Port, DbRef, Key, Value) -> ok = bdberl_port:put(Port, DbRef, Key, Value). -do_txn(Port, F, 0) -> - ct:print("Max retries exceeded for txn; giving up!"), - failed; + do_txn(Port, F, Count) -> case bdberl_port:txn_begin(Port) of ok -> case catch(F()) of {'EXIT', Reason} -> - io:format("Txn failed; retrying. Last error: ~p\n", [Reason]), - do_txn(Port, F, Count-1); + io:format("Txn attempt ~p failed; retrying", [Count]), + do_txn(Port, F, Count+1); Other -> - io:format("Txn success (~p): ~p\n", [Count, Other]), ok = bdberl_port:txn_commit(Port) end; {error, Reason} -> - io:format("Txn failed(2); retrying. Last error: ~p\n", [Reason]), - do_txn(Port, F, Count-1) + do_txn(Port, F, Count+1) end.