diff --git a/Rakefile b/Rakefile index 54306d0..3254178 100644 --- a/Rakefile +++ b/Rakefile @@ -22,7 +22,11 @@ end rule ".o" => ["%X.c", "%X.h"] do |t| puts "compiling #{t.source}..." - sh "gcc -g -c -Wall -Werror -fPIC -Ic_src/system/include -I#{erts_dir()}/include #{t.source} -o #{t.name}", :verbose => false + sh "gcc -g -c -Wall -Werror -fPIC #{dflag} -Ic_src/system/include -I#{erts_dir()}/include #{t.source} -o #{t.name}", :verbose => false +end + +def dflag() + ENV["release"] ? "" : "-DDEBUG" end task :compile_c => ['c_src'] + C_OBJS diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 0c870d2..6a27fa8 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -134,12 +134,16 @@ static TPool* G_TPOOL_TXNS; erl_drv_mutex_unlock(d->port_lock); \ }} - +#ifdef DEBUG +# define DBG printf +#else +# define DBG(arg1,...) +#endif DRIVER_INIT(bdberl_drv) { - printf("DRIVER INIT\n"); + DBG("DRIVER INIT\n"); // Setup flags we'll use to init the environment int flags = DB_INIT_LOCK | /* Enable support for locking */ @@ -254,9 +258,9 @@ static void bdberl_drv_stop(ErlDrvData handle) // Drop the lock prior to starting the wait for the async process erl_drv_mutex_unlock(d->port_lock); - printf("Cancelling async job for port: %p\n", d->port); + DBG("Cancelling async job for port: %p\n", d->port); bdberl_tpool_cancel(d->async_pool, d->async_job); - printf("Canceled async job for port: %p\n", d->port); + DBG("Canceled async job for port: %p\n", d->port); } else { @@ -285,7 +289,7 @@ static void bdberl_drv_stop(ErlDrvData handle) close_database(d->dbrefs->dbref, 0, d); } - printf("Stopped port: %p\n", d->port); + DBG("Stopped port: %p\n", d->port); // Release the port instance data driver_free(d->work_buffer); @@ -313,7 +317,7 @@ static void bdberl_drv_finish() erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); hive_hash_destroy(G_DATABASES_NAMES); - printf("DRIVER_FINISH\n"); + DBG("DRIVER_FINISH\n"); } static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, @@ -697,7 +701,7 @@ static int close_database(int dbref, unsigned flags, PortData* data) Database* database = &G_DATABASES[dbref]; if (database->ports == 0) { - printf("Closing actual database for dbref %d\n", dbref); + DBG("Closing actual database for dbref %d\n", dbref); // Close out the BDB handle database->db->close(database->db, flags); @@ -1229,14 +1233,14 @@ static void* deadlock_check(void* arg) G_DB_ENV->lock_detect(G_DB_ENV, 0, DB_LOCK_DEFAULT, &count); if (count > 0) { - printf("Rejected deadlocks: %d\n", count); + DBG("Rejected deadlocks: %d\n", count); } // TODO: Use nanosleep usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); } - printf("Deadlock checker exiting.\n"); + DBG("Deadlock checker exiting.\n"); return 0; } @@ -1253,7 +1257,7 @@ static void* trickle_write(void* arg) // Enough time has passed -- time to run the trickle operation again int pages_wrote = 0; G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); - printf("Wrote %d pages to achieve %d trickle\n", pages_wrote, G_TRICKLE_PERCENTAGE); + DBG("Wrote %d pages to achieve %d trickle\n", pages_wrote, G_TRICKLE_PERCENTAGE); // Reset the counter elapsed_secs = 0; @@ -1266,7 +1270,7 @@ static void* trickle_write(void* arg) } } - printf("Trickle writer exiting.\n"); + DBG("Trickle writer exiting.\n"); return 0; } diff --git a/src/bdberl.erl b/src/bdberl.erl index 5163686..2aa0f4a 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -12,7 +12,7 @@ txn_commit/0, txn_commit/1, txn_abort/0, get_cache_size/0, set_cache_size/3, get_txn_timeout/0, set_txn_timeout/1, - transaction/1, + transaction/1, transaction/2, put/3, put/4, put_r/3, put_r/4, put_commit/3, put_commit/4, @@ -24,6 +24,8 @@ -include("bdberl.hrl"). +-define(is_lock_error(Error), (Error =:= deadlock orelse Error =:= lock_not_granted)). + open(Name, Type) -> open(Name, Type, [create]). @@ -97,18 +99,34 @@ txn_abort() -> Error -> {error, {txn_abort, Error}} end. - + transaction(Fun) -> + transaction(Fun, infinity). + +transaction(_Fun, 0) -> + txn_abort(), + {error, {transaction_failed, retry_limit_reached}}; +transaction(Fun, Retries) -> txn_begin(), try Fun() of abort -> txn_abort(), {error, transaction_aborted}; + Value -> txn_commit(), {ok, Value} catch - _ : Reason -> + throw : {error, {_Op, Error}} when ?is_lock_error(Error) -> + txn_abort(), + erlang:yield(), + R = case Retries of + infinity -> infinity; + Retries -> Retries - 1 + end, + transaction(Fun, R); + + _ : Reason -> txn_abort(), {error, {transaction_failed, Reason}} end. @@ -142,7 +160,6 @@ put_commit_r(Db, Key, Value, Opts) -> ok -> ok; Error -> throw(Error) end. - get(Db, Key) -> get(Db, Key, []). @@ -178,7 +195,7 @@ update(Db, Key, Fun) -> update(Db, Key, Fun, Args) -> F = fun() -> - Value = case get(Db, Key, [rmw]) of + Value = case get_r(Db, Key, [rmw]) of not_found -> not_found; {ok, Val} -> Val end, @@ -186,7 +203,7 @@ update(Db, Key, Fun, Args) -> undefined -> Fun(Key, Value); Args -> Fun(Key, Value, Args) end, - ok = put_commit(Db, Key, NewValue), + put_commit_r(Db, Key, NewValue), NewValue end, transaction(F). diff --git a/test/thrash_SUITE.erl b/test/thrash_SUITE.erl index 21cb9d7..5e6b7a3 100644 --- a/test/thrash_SUITE.erl +++ b/test/thrash_SUITE.erl @@ -29,7 +29,7 @@ wait_for_finish(0) -> wait_for_finish(Count) -> receive {finished, Pid} -> - io:format("~p is done; ~p remaining.\n", [Pid, Count-1]), + ct:print("~p is done; ~p remaining.\n", [Pid, Count-1]), wait_for_finish(Count-1) end. @@ -47,34 +47,16 @@ thrash_run(Owner) -> thrash_incr_loop(Owner, 0) -> Owner ! {finished, self()}; thrash_incr_loop(Owner, Count) -> - ct:print("~p\n", [Count]), + ct:print("~p", [Count]), %% Choose random key Key = random:uniform(1200), %% Start a txn that will read the current value of the key and increment by 1 - F = fun() -> - case bdberl:get(0, Key, [rmw]) of - not_found -> - Value = 0; - - {ok, Value} -> - Value - end, - ok = bdberl:put(0, Key, Value) + F = fun(_Key, Value) -> + case Value of + not_found -> 0; + Value -> Value + 1 + end end, - ok = do_txn(F, 0), + {ok, _} = bdberl:update(0, Key, F), thrash_incr_loop(Owner, Count-1). - -do_txn(F, Count) -> - case bdberl:txn_begin() of - ok -> - case catch(F()) of - {'EXIT', _Reason} -> - io:format("Txn attempt ~p failed; retrying", [Count]), - do_txn(F, Count+1); - _Other -> - ok = bdberl:txn_commit() - end; - {error, _Reason} -> - do_txn(F, Count+1) - end.