Merge branch 'master' of ecommons@wax.hive:bdberl

This commit is contained in:
Dave Smith 2008-12-15 14:35:02 -07:00
commit 61207c1482
4 changed files with 51 additions and 44 deletions

View file

@ -22,7 +22,11 @@ end
rule ".o" => ["%X.c", "%X.h"] do |t| rule ".o" => ["%X.c", "%X.h"] do |t|
puts "compiling #{t.source}..." puts "compiling #{t.source}..."
sh "gcc -g -c -Wall -Werror -fPIC -Ic_src/system/include -I#{erts_dir()}/include #{t.source} -o #{t.name}", :verbose => false sh "gcc -g -c -Wall -Werror -fPIC #{dflag} -Ic_src/system/include -I#{erts_dir()}/include #{t.source} -o #{t.name}", :verbose => false
end
def dflag()
ENV["release"] ? "" : "-DDEBUG"
end end
task :compile_c => ['c_src'] + C_OBJS task :compile_c => ['c_src'] + C_OBJS

View file

@ -134,12 +134,16 @@ static TPool* G_TPOOL_TXNS;
erl_drv_mutex_unlock(d->port_lock); \ erl_drv_mutex_unlock(d->port_lock); \
}} }}
#ifdef DEBUG
# define DBG printf
#else
# define DBG(arg1,...)
#endif
DRIVER_INIT(bdberl_drv) DRIVER_INIT(bdberl_drv)
{ {
printf("DRIVER INIT\n"); DBG("DRIVER INIT\n");
// Setup flags we'll use to init the environment // Setup flags we'll use to init the environment
int flags = int flags =
DB_INIT_LOCK | /* Enable support for locking */ DB_INIT_LOCK | /* Enable support for locking */
@ -254,9 +258,9 @@ static void bdberl_drv_stop(ErlDrvData handle)
// Drop the lock prior to starting the wait for the async process // Drop the lock prior to starting the wait for the async process
erl_drv_mutex_unlock(d->port_lock); erl_drv_mutex_unlock(d->port_lock);
printf("Cancelling async job for port: %p\n", d->port); DBG("Cancelling async job for port: %p\n", d->port);
bdberl_tpool_cancel(d->async_pool, d->async_job); bdberl_tpool_cancel(d->async_pool, d->async_job);
printf("Canceled async job for port: %p\n", d->port); DBG("Canceled async job for port: %p\n", d->port);
} }
else else
{ {
@ -285,7 +289,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
close_database(d->dbrefs->dbref, 0, d); close_database(d->dbrefs->dbref, 0, d);
} }
printf("Stopped port: %p\n", d->port); DBG("Stopped port: %p\n", d->port);
// Release the port instance data // Release the port instance data
driver_free(d->work_buffer); driver_free(d->work_buffer);
@ -313,7 +317,7 @@ static void bdberl_drv_finish()
erl_drv_rwlock_destroy(G_DATABASES_RWLOCK); erl_drv_rwlock_destroy(G_DATABASES_RWLOCK);
hive_hash_destroy(G_DATABASES_NAMES); hive_hash_destroy(G_DATABASES_NAMES);
printf("DRIVER_FINISH\n"); DBG("DRIVER_FINISH\n");
} }
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
@ -697,7 +701,7 @@ static int close_database(int dbref, unsigned flags, PortData* data)
Database* database = &G_DATABASES[dbref]; Database* database = &G_DATABASES[dbref];
if (database->ports == 0) if (database->ports == 0)
{ {
printf("Closing actual database for dbref %d\n", dbref); DBG("Closing actual database for dbref %d\n", dbref);
// Close out the BDB handle // Close out the BDB handle
database->db->close(database->db, flags); database->db->close(database->db, flags);
@ -1229,14 +1233,14 @@ static void* deadlock_check(void* arg)
G_DB_ENV->lock_detect(G_DB_ENV, 0, DB_LOCK_DEFAULT, &count); G_DB_ENV->lock_detect(G_DB_ENV, 0, DB_LOCK_DEFAULT, &count);
if (count > 0) if (count > 0)
{ {
printf("Rejected deadlocks: %d\n", count); DBG("Rejected deadlocks: %d\n", count);
} }
// TODO: Use nanosleep // TODO: Use nanosleep
usleep(G_DEADLOCK_CHECK_INTERVAL * 1000); usleep(G_DEADLOCK_CHECK_INTERVAL * 1000);
} }
printf("Deadlock checker exiting.\n"); DBG("Deadlock checker exiting.\n");
return 0; return 0;
} }
@ -1253,7 +1257,7 @@ static void* trickle_write(void* arg)
// Enough time has passed -- time to run the trickle operation again // Enough time has passed -- time to run the trickle operation again
int pages_wrote = 0; int pages_wrote = 0;
G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
printf("Wrote %d pages to achieve %d trickle\n", pages_wrote, G_TRICKLE_PERCENTAGE); DBG("Wrote %d pages to achieve %d trickle\n", pages_wrote, G_TRICKLE_PERCENTAGE);
// Reset the counter // Reset the counter
elapsed_secs = 0; elapsed_secs = 0;
@ -1266,7 +1270,7 @@ static void* trickle_write(void* arg)
} }
} }
printf("Trickle writer exiting.\n"); DBG("Trickle writer exiting.\n");
return 0; return 0;
} }

View file

@ -12,7 +12,7 @@
txn_commit/0, txn_commit/1, txn_abort/0, txn_commit/0, txn_commit/1, txn_abort/0,
get_cache_size/0, set_cache_size/3, get_cache_size/0, set_cache_size/3,
get_txn_timeout/0, set_txn_timeout/1, get_txn_timeout/0, set_txn_timeout/1,
transaction/1, transaction/1, transaction/2,
put/3, put/4, put/3, put/4,
put_r/3, put_r/4, put_r/3, put_r/4,
put_commit/3, put_commit/4, put_commit/3, put_commit/4,
@ -24,6 +24,8 @@
-include("bdberl.hrl"). -include("bdberl.hrl").
-define(is_lock_error(Error), (Error =:= deadlock orelse Error =:= lock_not_granted)).
open(Name, Type) -> open(Name, Type) ->
open(Name, Type, [create]). open(Name, Type, [create]).
@ -97,18 +99,34 @@ txn_abort() ->
Error -> Error ->
{error, {txn_abort, Error}} {error, {txn_abort, Error}}
end. end.
transaction(Fun) -> transaction(Fun) ->
transaction(Fun, infinity).
transaction(_Fun, 0) ->
txn_abort(),
{error, {transaction_failed, retry_limit_reached}};
transaction(Fun, Retries) ->
txn_begin(), txn_begin(),
try Fun() of try Fun() of
abort -> abort ->
txn_abort(), txn_abort(),
{error, transaction_aborted}; {error, transaction_aborted};
Value -> Value ->
txn_commit(), txn_commit(),
{ok, Value} {ok, Value}
catch catch
_ : Reason -> throw : {error, {_Op, Error}} when ?is_lock_error(Error) ->
txn_abort(),
erlang:yield(),
R = case Retries of
infinity -> infinity;
Retries -> Retries - 1
end,
transaction(Fun, R);
_ : Reason ->
txn_abort(), txn_abort(),
{error, {transaction_failed, Reason}} {error, {transaction_failed, Reason}}
end. end.
@ -142,7 +160,6 @@ put_commit_r(Db, Key, Value, Opts) ->
ok -> ok; ok -> ok;
Error -> throw(Error) Error -> throw(Error)
end. end.
get(Db, Key) -> get(Db, Key) ->
get(Db, Key, []). get(Db, Key, []).
@ -178,7 +195,7 @@ update(Db, Key, Fun) ->
update(Db, Key, Fun, Args) -> update(Db, Key, Fun, Args) ->
F = fun() -> F = fun() ->
Value = case get(Db, Key, [rmw]) of Value = case get_r(Db, Key, [rmw]) of
not_found -> not_found; not_found -> not_found;
{ok, Val} -> Val {ok, Val} -> Val
end, end,
@ -186,7 +203,7 @@ update(Db, Key, Fun, Args) ->
undefined -> Fun(Key, Value); undefined -> Fun(Key, Value);
Args -> Fun(Key, Value, Args) Args -> Fun(Key, Value, Args)
end, end,
ok = put_commit(Db, Key, NewValue), put_commit_r(Db, Key, NewValue),
NewValue NewValue
end, end,
transaction(F). transaction(F).

View file

@ -29,7 +29,7 @@ wait_for_finish(0) ->
wait_for_finish(Count) -> wait_for_finish(Count) ->
receive receive
{finished, Pid} -> {finished, Pid} ->
io:format("~p is done; ~p remaining.\n", [Pid, Count-1]), ct:print("~p is done; ~p remaining.\n", [Pid, Count-1]),
wait_for_finish(Count-1) wait_for_finish(Count-1)
end. end.
@ -47,34 +47,16 @@ thrash_run(Owner) ->
thrash_incr_loop(Owner, 0) -> thrash_incr_loop(Owner, 0) ->
Owner ! {finished, self()}; Owner ! {finished, self()};
thrash_incr_loop(Owner, Count) -> thrash_incr_loop(Owner, Count) ->
ct:print("~p\n", [Count]), ct:print("~p", [Count]),
%% Choose random key %% Choose random key
Key = random:uniform(1200), Key = random:uniform(1200),
%% Start a txn that will read the current value of the key and increment by 1 %% Start a txn that will read the current value of the key and increment by 1
F = fun() -> F = fun(_Key, Value) ->
case bdberl:get(0, Key, [rmw]) of case Value of
not_found -> not_found -> 0;
Value = 0; Value -> Value + 1
end
{ok, Value} ->
Value
end,
ok = bdberl:put(0, Key, Value)
end, end,
ok = do_txn(F, 0), {ok, _} = bdberl:update(0, Key, F),
thrash_incr_loop(Owner, Count-1). thrash_incr_loop(Owner, Count-1).
do_txn(F, Count) ->
case bdberl:txn_begin() of
ok ->
case catch(F()) of
{'EXIT', _Reason} ->
io:format("Txn attempt ~p failed; retrying", [Count]),
do_txn(F, Count+1);
_Other ->
ok = bdberl:txn_commit()
end;
{error, _Reason} ->
do_txn(F, Count+1)
end.