Now with 100% more threadiness and 50% less deadlocking

This commit is contained in:
Dave Smith 2008-12-10 19:47:00 -07:00
parent ed0752019f
commit a71f18ee1b
4 changed files with 85 additions and 59 deletions

View file

@ -38,6 +38,7 @@ static int alloc_dbref();
static void* zalloc(unsigned int size); static void* zalloc(unsigned int size);
static void signal_port(PortData* d); static void signal_port(PortData* d);
static void* deadlock_check(void* arg);
/** /**
* Global instance of DB_ENV; only a single one exists per O/S process. * Global instance of DB_ENV; only a single one exists per O/S process.
@ -70,6 +71,12 @@ static int G_DATABASES_SIZE;
static ErlDrvRWLock* G_DATABASES_RWLOCK; static ErlDrvRWLock* G_DATABASES_RWLOCK;
static hive_hash* G_DATABASES_NAMES; static hive_hash* G_DATABASES_NAMES;
/**
* G_DEADLOCK_* a
*/
static ErlDrvTid G_DEADLOCK_THREAD;
static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1;
static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100;
/** /**
* *
@ -130,12 +137,6 @@ DRIVER_INIT(bdberl_drv)
// Make sure we can distiguish between lock timeouts and deadlocks // Make sure we can distiguish between lock timeouts and deadlocks
int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1); int rc = G_DB_ENV->set_flags(G_DB_ENV, DB_TIME_NOTGRANTED, 1);
printf("TIME_NOT_GRANTED rc: %d\n", rc); printf("TIME_NOT_GRANTED rc: %d\n", rc);
rc = G_DB_ENV->set_lk_detect(G_DB_ENV, 1);
printf("LOCK DETECT rc: %d\n", rc);
rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_TXN_TIMEOUT);
printf("DB_SET_TXN_TIMEOUT rc: %d\n", rc);
rc = G_DB_ENV->set_timeout(G_DB_ENV, 50000000, DB_SET_LOCK_TIMEOUT);
printf("DB_SET_LOCK_TIMEOUT rc: %d\n", rc);
// BDB is setup -- allocate structures for tracking databases // BDB is setup -- allocate structures for tracking databases
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
@ -143,6 +144,10 @@ DRIVER_INIT(bdberl_drv)
G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK"); G_DATABASES_RWLOCK = erl_drv_rwlock_create("bdberl_drv: G_DATABASES_RWLOCK");
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
// Startup deadlock check thread
erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD,
&deadlock_check, 0, 0);
// Startup our thread pools // Startup our thread pools
// TODO: Make configurable/adjustable // TODO: Make configurable/adjustable
G_TPOOL_GENERAL = bdberl_tpool_start(5); G_TPOOL_GENERAL = bdberl_tpool_start(5);
@ -191,7 +196,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
static void bdberl_drv_stop(ErlDrvData handle) static void bdberl_drv_stop(ErlDrvData handle)
{ {
PortData* d = (PortData*)handle; PortData* d = (PortData*)handle;
printf("%p: stop\n", d->port); // printf("%p: stop\n", d->port);
// If there is an async job pending, we need to cancel it. The cancel operation will // If there is an async job pending, we need to cancel it. The cancel operation will
// block until the job has either been removed or has run // block until the job has either been removed or has run
@ -227,11 +232,14 @@ static void bdberl_drv_stop(ErlDrvData handle)
static void bdberl_drv_finish() static void bdberl_drv_finish()
{ {
printf("finish\n");
// Stop the thread pools // Stop the thread pools
bdberl_tpool_stop(G_TPOOL_GENERAL); bdberl_tpool_stop(G_TPOOL_GENERAL);
bdberl_tpool_stop(G_TPOOL_TXNS); bdberl_tpool_stop(G_TPOOL_TXNS);
// Signal the deadlock checker to shutdown -- then wait for it
G_DEADLOCK_CHECK_ACTIVE = 0;
erl_drv_thread_join(G_DEADLOCK_THREAD, 0);
// Cleanup and shut down the BDB environment. Note that we assume // Cleanup and shut down the BDB environment. Note that we assume
// all ports have been released and thuse all databases/txns/etc are also gone. // all ports have been released and thuse all databases/txns/etc are also gone.
G_DB_ENV->close(G_DB_ENV, 0); G_DB_ENV->close(G_DB_ENV, 0);
@ -239,7 +247,7 @@ static void bdberl_drv_finish()
erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); erl_drv_rwlock_destroy(G_DATABASES_RWLOCK);
hive_hash_destroy(G_DATABASES_NAMES); hive_hash_destroy(G_DATABASES_NAMES);
printf("finished\n"); printf("DRIVER_FINISH\n");
} }
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
@ -299,13 +307,11 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
// If we already have a txn open, fail // If we already have a txn open, fail
if (d->txn != 0) if (d->txn != 0)
{ {
printf("ERROR_TXN_OPEN\n");
RETURN_INT(ERROR_TXN_OPEN, outbuf); RETURN_INT(ERROR_TXN_OPEN, outbuf);
} }
// Outbuf is <<Rc:32>> // Outbuf is <<Rc:32>>
int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0); int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), 0);
printf("TXN BEGIN: %d\n", rc);
RETURN_INT(rc, outbuf); RETURN_INT(rc, outbuf);
} }
case CMD_TXN_COMMIT: case CMD_TXN_COMMIT:
@ -408,7 +414,8 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event) static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
{ {
PortData* d = (PortData*)handle; PortData* d = (PortData*)handle;
printf("%p: ready_input\n", d->port); // printf("%p: ready_input; cmd = %d; rc = %d\n", d->port, d->async_op,
// ((AsyncData*)d->async_data)->rc);
// Empty out the queue // Empty out the queue
int readbuf; int readbuf;
@ -422,17 +429,10 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
case CMD_TXN_COMMIT: case CMD_TXN_COMMIT:
case CMD_TXN_ABORT: case CMD_TXN_ABORT:
{ {
// If this was a commit/abort, clear out the txn handle
if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT)
{
d->txn = 0;
}
// Extract return code == if it's zero, send back "ok" to driver process; otherwise
// send a {error, Reason} tuple
AsyncData* adata = (AsyncData*)d->async_data; AsyncData* adata = (AsyncData*)d->async_data;
printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc);
// Extract return code == if it's zero, send back "ok" to driver process; otherwise
// send a {error, Reason} tuple
if (adata->rc == 0) if (adata->rc == 0)
{ {
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") }; ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok") };
@ -440,19 +440,20 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
} }
else else
{ {
// If we have a transactional PUT and a deadlock occurred, clear out txn ptr
if (d->async_op == CMD_GET && adata->port->txn &&
(adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED))
{
d->txn = 0;
}
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, adata->rc, ERL_DRV_INT, adata->rc,
ERL_DRV_TUPLE, 2}; ERL_DRV_TUPLE, 2};
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
} }
// If this was a commit/abort, or a deadlock occurred while in a transaction,
// clear out the handle -- it's already invalid
if (d->async_op == CMD_TXN_COMMIT || d->async_op == CMD_TXN_ABORT ||
(d->txn && (adata->rc == DB_LOCK_NOTGRANTED || adata->rc == DB_LOCK_DEADLOCK)))
{
d->txn = 0;
}
// Cleanup async data and mark the port as not busy // Cleanup async data and mark the port as not busy
driver_free(d->async_data); driver_free(d->async_data);
d->async_data = 0; d->async_data = 0;
@ -466,7 +467,6 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
// Extract return code == if it's zero, send back {ok, Payload} or not_found to driver // Extract return code == if it's zero, send back {ok, Payload} or not_found to driver
// process; otherwise send a {error, Reason} tuple // process; otherwise send a {error, Reason} tuple
AsyncData* adata = (AsyncData*)d->async_data; AsyncData* adata = (AsyncData*)d->async_data;
printf("%p: %d op; rc = %d\n", d->port, d->async_op, adata->rc);
if (adata->rc == 0) if (adata->rc == 0)
{ {
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"), ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("ok"),
@ -481,19 +481,19 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent event)
} }
else else
{ {
printf("GET ERROR: %d\n", adata->rc);
if (adata->port->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED))
{
// Transaction has been invalidated -- null it out to avoid multiple txn closure
adata->port->txn = 0;
}
ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"), ErlDrvTermData response[] = { ERL_DRV_ATOM, driver_mk_atom("error"),
ERL_DRV_INT, adata->rc, ERL_DRV_INT, adata->rc,
ERL_DRV_TUPLE, 2}; ERL_DRV_TUPLE, 2};
driver_output_term(d->port, response, sizeof(response) / sizeof(response[0])); driver_output_term(d->port, response, sizeof(response) / sizeof(response[0]));
} }
// If a deadlock occurred while in a transaction, clear out the handle -- it's
// already invalid
if (d->txn && (adata->rc == DB_LOCK_DEADLOCK || adata->rc == DB_LOCK_NOTGRANTED))
{
d->txn = 0;
}
// Cleanup async data and mark the port as not busy // Cleanup async data and mark the port as not busy
driver_free(d->async_data); driver_free(d->async_data);
d->async_data = 0; d->async_data = 0;
@ -620,7 +620,7 @@ static int open_database(const char* name, DBTYPE type, PortData* data, int* dbr
static int close_database(int dbref, PortData* data) static int close_database(int dbref, PortData* data)
{ {
printf("Closing %d for port %p\n", dbref, data->port); // printf("Closing %d for port %p\n", dbref, data->port);
// Remove this database from our list // Remove this database from our list
if (del_dbref(data, dbref)) if (del_dbref(data, dbref))
@ -662,7 +662,7 @@ static int close_database(int dbref, PortData* data)
static void do_async_put(void* arg) static void do_async_put(void* arg)
{ {
AsyncData* adata = (AsyncData*)arg; AsyncData* adata = (AsyncData*)arg;
printf("%p: do_async_put\n", adata->port->port); // printf("%p: do_async_put\n", adata->port->port);
// Setup DBTs // Setup DBTs
DBT key; DBT key;
@ -681,6 +681,12 @@ static void do_async_put(void* arg)
// All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still // All databases are opened with AUTO_COMMIT, so if msg->port->txn is NULL, the put will still
// be atomic // be atomic
adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0); adata->rc = adata->db->put(adata->db, adata->port->txn, &key, &value, 0);
// If any error occurs while we have a txn action, abort it
if (adata->port->txn && adata->rc)
{
adata->port->txn->abort(adata->port->txn);
}
// Enqueue a signal for the port to know that the operation is complete // Enqueue a signal for the port to know that the operation is complete
signal_port(adata->port); signal_port(adata->port);
@ -689,7 +695,7 @@ static void do_async_put(void* arg)
static void do_async_get(void* arg) static void do_async_get(void* arg)
{ {
AsyncData* adata = (AsyncData*)arg; AsyncData* adata = (AsyncData*)arg;
printf("%p: do_async_get\n", adata->port->port); // printf("%p: do_async_get\n", adata->port->port);
// Setup DBTs // Setup DBTs
DBT key; DBT key;
@ -721,13 +727,19 @@ static void do_async_get(void* arg)
adata->payload_sz = value.size; // Not ulen -- we want the actual data size adata->payload_sz = value.size; // Not ulen -- we want the actual data size
adata->rc = rc; adata->rc = rc;
// If any error occurs while we have a txn action, abort it
if (adata->rc != DB_NOTFOUND && adata->port->txn && adata->rc)
{
adata->port->txn->abort(adata->port->txn);
}
signal_port(adata->port); signal_port(adata->port);
} }
static void do_async_txnop(void* arg) static void do_async_txnop(void* arg)
{ {
AsyncData* adata = (AsyncData*)arg; AsyncData* adata = (AsyncData*)arg;
printf("%p: do_async_txnop\n", adata->port->port); // printf("%p: do_async_txnop\n", adata->port->port);
// Execute the actual commit/abort // Execute the actual commit/abort
if (adata->port->async_op == CMD_TXN_COMMIT) if (adata->port->async_op == CMD_TXN_COMMIT)
@ -928,3 +940,21 @@ static void signal_port(PortData* d)
int flag = 1; int flag = 1;
write(d->pipe_fds[1], &flag, sizeof(flag)); write(d->pipe_fds[1], &flag, sizeof(flag));
} }
/**
* Thread function that runs the deadlock checker periodically
*/
static void* deadlock_check(void* arg)
{
while(G_DEADLOCK_CHECK_ACTIVE)
{
// Run the lock detection
G_DB_ENV->lock_detect(G_DB_ENV, 0, DB_LOCK_DEFAULT, 0);
// TODO: Use nanosleep
usleep(G_DEADLOCK_CHECK_INTERVAL * 1000);
}
printf("Deadlock checker exiting.\n");
return 0;
}

View file

@ -176,13 +176,9 @@ static void* bdberl_tpool_main(void* arg)
// Unlock to avoid blocking others // Unlock to avoid blocking others
UNLOCK(tpool); UNLOCK(tpool);
printf("Invoking job\n");
// Invoke the function // Invoke the function
(*(job->main_fn))(job->arg); (*(job->main_fn))(job->arg);
printf("Finished invoking job.\n");
// Relock // Relock
LOCK(tpool); LOCK(tpool);

View file

@ -37,6 +37,7 @@
-define(ERROR_NO_TXN, -29004). % No transaction open on this port -define(ERROR_NO_TXN, -29004). % No transaction open on this port
-define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted -define(ERROR_DB_LOCK_NOTGRANTED, -30993). % Lock was busy and not granted
-define(ERROR_DB_LOCK_DEADLOCK, -30994). % Deadlock occurred
new() -> new() ->
case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of case erl_ddll:load_driver(code:priv_dir(bdberl), bdberl_drv) of
@ -72,7 +73,6 @@ close_database(Port, DbRef) ->
txn_begin(Port) -> txn_begin(Port) ->
<<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_BEGIN, <<>>), <<Result:32/native>> = erlang:port_control(Port, ?CMD_TXN_BEGIN, <<>>),
io:format("TXN BEGIN erl: ~p\n", [Result]),
case decode_rc(Result) of case decode_rc(Result) of
ok -> ok; ok -> ok;
Error -> {error, {txn_begin, Error}} Error -> {error, {txn_begin, Error}}
@ -148,6 +148,7 @@ decode_rc(?ERROR_ASYNC_PENDING) -> async_pending;
decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref; decode_rc(?ERROR_INVALID_DBREF) -> invalid_dbref;
decode_rc(?ERROR_NO_TXN) -> no_txn; decode_rc(?ERROR_NO_TXN) -> no_txn;
decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted; decode_rc(?ERROR_DB_LOCK_NOTGRANTED) -> lock_not_granted;
decode_rc(?ERROR_DB_LOCK_DEADLOCK) -> deadlock;
decode_rc(Rc) -> {unknown, Rc}. decode_rc(Rc) -> {unknown, Rc}.

View file

@ -11,7 +11,7 @@
all() -> all() ->
[test_thrash]. [test_thrash].
-define(PROCS, 1). -define(PROCS, 2).
test_thrash(_Config) -> test_thrash(_Config) ->
%% Spin up 15 processes (async thread pool is 10) %% Spin up 15 processes (async thread pool is 10)
@ -43,15 +43,16 @@ thrash_run(Owner) ->
{ok, 0} = bdberl_port:open_database(P, "thrash", btree), {ok, 0} = bdberl_port:open_database(P, "thrash", btree),
%% Start thrashing %% Start thrashing
thrash_incr_loop(P, Owner, 5000). thrash_incr_loop(P, Owner, 1000).
thrash_incr_loop(Port, Owner, 0) -> thrash_incr_loop(Port, Owner, 0) ->
Owner ! {finished, self()}; Owner ! {finished, self()};
thrash_incr_loop(Port, Owner, Count) -> thrash_incr_loop(Port, Owner, Count) ->
ct:print("~p\n", [Count]),
%% Choose random key %% Choose random key
Key = random:uniform(12), Key = random:uniform(1200),
%% Start a txn that will read the current value of the key and increment by 1 %% Start a txn that will read the current value of the key and increment by 1
F = fun() -> F = fun() ->
@ -59,15 +60,16 @@ thrash_incr_loop(Port, Owner, Count) ->
not_found -> not_found ->
Value = 0; Value = 0;
{ok, Value} -> Value ->
Value Value
end, end,
put_or_die(Port, 0, Key, Value) put_or_die(Port, 0, Key, Value)
end, end,
ok = do_txn(Port, F, 20), ok = do_txn(Port, F, 0),
thrash_incr_loop(Port, Owner, Count-1). thrash_incr_loop(Port, Owner, Count-1).
get_or_die(Port, DbRef, Key) -> get_or_die(Port, DbRef, Key) ->
case bdberl_port:get(Port, DbRef, Key) of case bdberl_port:get(Port, DbRef, Key) of
not_found -> not_found ->
@ -76,24 +78,21 @@ get_or_die(Port, DbRef, Key) ->
Value Value
end. end.
put_or_die(Port, DbRef, Key, Value) -> put_or_die(Port, DbRef, Key, Value) ->
ok = bdberl_port:put(Port, DbRef, Key, Value). ok = bdberl_port:put(Port, DbRef, Key, Value).
do_txn(Port, F, 0) ->
ct:print("Max retries exceeded for txn; giving up!"),
failed;
do_txn(Port, F, Count) -> do_txn(Port, F, Count) ->
case bdberl_port:txn_begin(Port) of case bdberl_port:txn_begin(Port) of
ok -> ok ->
case catch(F()) of case catch(F()) of
{'EXIT', Reason} -> {'EXIT', Reason} ->
io:format("Txn failed; retrying. Last error: ~p\n", [Reason]), io:format("Txn attempt ~p failed; retrying", [Count]),
do_txn(Port, F, Count-1); do_txn(Port, F, Count+1);
Other -> Other ->
io:format("Txn success (~p): ~p\n", [Count, Other]),
ok = bdberl_port:txn_commit(Port) ok = bdberl_port:txn_commit(Port)
end; end;
{error, Reason} -> {error, Reason} ->
io:format("Txn failed(2); retrying. Last error: ~p\n", [Reason]), do_txn(Port, F, Count+1)
do_txn(Port, F, Count-1)
end. end.