Merge branch 'master' of ecommons@wax.hive:bdberl

Conflicts:
	src/bdberl_port.erl
This commit is contained in:
Phillip Toland 2008-12-11 15:18:10 -06:00
commit a76a2bb5fa
6 changed files with 205 additions and 12 deletions

View file

@ -22,6 +22,8 @@
static int open_database(const char* name, DBTYPE type, unsigned flags, PortData* data, int* errno);
static int close_database(int dbref, unsigned flags, PortData* data);
static void tune_system(int target, void* values, BinHelper* bh);
static void do_async_put(void* arg);
static void do_async_get(void* arg);
static void do_async_txnop(void* arg);
@ -39,6 +41,7 @@ static void* zalloc(unsigned int size);
static void signal_port(PortData* d);
static void* deadlock_check(void* arg);
static void* trickle_write(void* arg);
/**
* Global instance of DB_ENV; only a single one exists per O/S process.
@ -71,12 +74,25 @@ static int G_DATABASES_SIZE;
static ErlDrvRWLock* G_DATABASES_RWLOCK;
static hive_hash* G_DATABASES_NAMES;
/**
* G_DEADLOCK_* a
* Deadlock detector thread variables. We run a single thread per VM to detect deadlocks within
* our global environment. G_DEADLOCK_CHECK_INTERVAL is the time between runs in milliseconds.
*/
static ErlDrvTid G_DEADLOCK_THREAD;
static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1;
static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100;
static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1;
static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between checks */
/**
* Trickle writer for dirty pages. We run a single thread per VM to perform background
* trickling of dirty pages to disk. G_TRICKLE_INTERVAL is the time between runs in seconds.
*/
static ErlDrvTid G_TRICKLE_THREAD;
static unsigned int G_TRICKLE_ACTIVE = 1;
static unsigned int G_TRICKLE_INTERVAL = 60 * 15; /* Seconds between trickle writes */
static unsigned int G_TRICKLE_PERCENTAGE = 10; /* Desired % of clean pages in cache */
/**
*
@ -99,7 +115,7 @@ static TPool* G_TPOOL_TXNS;
#define UNPACK_STRING(_buf, _off) (char*)(_buf+(_off))
#define UNPACK_BLOB(_buf, _off) (void*)(_buf+(_off))
#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.bin->orig_size;
#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.offset;
#define RETURN_INT(val, outbuf) { \
BinHelper bh; \
@ -123,6 +139,7 @@ DRIVER_INIT(bdberl_drv)
// Initialize global environment -- use environment variable DB_HOME to
// specify where the working directory is
db_env_create(&G_DB_ENV, 0);
G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0);
if (G_DB_ENV_ERROR == 0)
{
@ -157,10 +174,14 @@ DRIVER_INIT(bdberl_drv)
erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD,
&deadlock_check, 0, 0);
// Startup trickle write thread
erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD,
&trickle_write, 0, 0);
// Startup our thread pools
// TODO: Make configurable/adjustable
G_TPOOL_GENERAL = bdberl_tpool_start(5);
G_TPOOL_TXNS = bdberl_tpool_start(5);
G_TPOOL_GENERAL = bdberl_tpool_start(10);
G_TPOOL_TXNS = bdberl_tpool_start(10);
}
else
{
@ -245,6 +266,10 @@ static void bdberl_drv_finish()
bdberl_tpool_stop(G_TPOOL_GENERAL);
bdberl_tpool_stop(G_TPOOL_TXNS);
// Signal the trickle write thread to shutdown
G_TRICKLE_ACTIVE = 0;
erl_drv_thread_join(G_TRICKLE_THREAD, 0);
// Signal the deadlock checker to shutdown -- then wait for it
G_DEADLOCK_CHECK_ACTIVE = 0;
erl_drv_thread_join(G_DEADLOCK_THREAD, 0);
@ -427,6 +452,20 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
RETURN_INT(ERROR_INVALID_DBREF, outbuf);
}
}
case CMD_TUNE:
{
// Inbuf is: << Target:32, Values/binary >>
int target = UNPACK_INT(inbuf, 0);
char* values = UNPACK_BLOB(inbuf, 4);
// Setup a binhelper
BinHelper bh;
// Execute the tuning -- the result to send back to the caller is wrapped
// up in the provided binhelper
tune_system(target, values, &bh);
RETURN_BH(bh, outbuf);
}
}
*outbuf = 0;
return 0;
@ -681,6 +720,56 @@ static int close_database(int dbref, unsigned flags, PortData* data)
return 0;
}
/**
 * Given a target system parameter/action adjust/return the requested value.
 *
 * target - one of the SYSP_* constants selecting which parameter to set or get
 * values - argument payload for the target; only the *_SET targets read it
 * bh     - BinHelper supplied by the caller; on return it has always been passed
 *          through bin_helper_init and holds the reply: the result code first,
 *          followed by any values that were requested
 */
static void tune_system(int target, void* values, BinHelper* bh)
{
    switch(target)
    {
    case SYSP_CACHESIZE_SET:
    {
        unsigned int gbytes = UNPACK_INT(values, 0);
        unsigned int bytes = UNPACK_INT(values, 4);
        unsigned int ncache = UNPACK_INT(values, 8);
        int rc = G_DB_ENV->set_cachesize(G_DB_ENV, gbytes, bytes, ncache);
        bin_helper_init(bh, 4);
        bin_helper_push_int32(bh, rc);
        break;
    }
    case SYSP_CACHESIZE_GET:
    {
        unsigned int gbytes = 0;
        unsigned int bytes = 0;
        int caches = 0;
        int rc = G_DB_ENV->get_cachesize(G_DB_ENV, &gbytes, &bytes, &caches);
        bin_helper_init(bh, 16);
        bin_helper_push_int32(bh, rc);
        bin_helper_push_int32(bh, gbytes);
        bin_helper_push_int32(bh, bytes);
        bin_helper_push_int32(bh, caches);
        break;
    }
    case SYSP_TXN_TIMEOUT_SET:
    {
        unsigned int timeout = UNPACK_INT(values, 0);
        int rc = G_DB_ENV->set_timeout(G_DB_ENV, timeout, DB_SET_TXN_TIMEOUT);
        bin_helper_init(bh, 4);
        bin_helper_push_int32(bh, rc);
        break;
    }
    case SYSP_TXN_TIMEOUT_GET:
    {
        unsigned int timeout = 0;
        int rc = G_DB_ENV->get_timeout(G_DB_ENV, &timeout, DB_SET_TXN_TIMEOUT);
        bin_helper_init(bh, 8);
        bin_helper_push_int32(bh, rc);
        bin_helper_push_int32(bh, timeout);
        break;
    }
    default:
    {
        // Unknown target. The CMD_TUNE handler passes a stack-allocated BinHelper
        // and hands it to RETURN_BH unconditionally, so it must never be left
        // uninitialised. Reply with a generic non-zero result code; the Erlang
        // wrappers report any non-zero code as {error, Code}.
        bin_helper_init(bh, 4);
        bin_helper_push_int32(bh, -1);
        break;
    }
    }
}
static void do_async_put(void* arg)
{
// Payload is: <<DbRef:32, Flags:32, KeyLen:32, Key:KeyLen, ValLen:32, Val:ValLen>>
@ -986,3 +1075,33 @@ static void* deadlock_check(void* arg)
printf("Deadlock checker exiting.\n");
return 0;
}
/**
 * Thread function that trickle writes dirty pages to disk.
 *
 * Sleeps in one-second steps so that a shutdown request (G_TRICKLE_ACTIVE set to 0)
 * is noticed without waiting out a whole interval, and runs memp_trickle each time
 * at least G_TRICKLE_INTERVAL seconds have accumulated.
 *
 * arg is unused. Always returns 0.
 */
static void* trickle_write(void* arg)
{
    unsigned int elapsed_secs = 0;
    while(G_TRICKLE_ACTIVE)
    {
        // sleep() rather than usleep(1000 * 1000): POSIX only requires usleep to
        // accept values below one million microseconds.
        sleep(1);
        elapsed_secs++;

        // Compare with >= instead of ==: if the interval is ever lowered below the
        // seconds already counted, an equality test would never fire. Sleeping
        // before the check also keeps an interval of 0 from spinning the CPU.
        // Re-check the active flag so a shutdown request is not delayed by a trickle.
        if (G_TRICKLE_ACTIVE && elapsed_secs >= G_TRICKLE_INTERVAL)
        {
            // Enough time has passed -- time to run the trickle operation again
            int pages_wrote = 0;
            int rc = G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote);
            if (rc == 0)
            {
                printf("Wrote %d pages to achieve %u trickle\n", pages_wrote, G_TRICKLE_PERCENTAGE);
            }
            else
            {
                printf("Trickle write failed: %d\n", rc);
            }

            // Reset the counter
            elapsed_secs = 0;
        }
    }

    printf("Trickle writer exiting.\n");
    return 0;
}

View file

@ -38,7 +38,8 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev);
#define CMD_TXN_ABORT 5
#define CMD_GET 6
#define CMD_PUT 7
#define CMD_PUT_ATOMIC 8
#define CMD_TUNE 8
/**
* Command status values
@ -62,6 +63,19 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev);
#define ERROR_TXN_OPEN (-29003) /* Transaction already active on this port */
#define ERROR_NO_TXN (-29004) /* No transaction open on this port */
/**
* Tunable system parameters/actions
*/
#define SYSP_CACHESIZE_SET 0
#define SYSP_CACHESIZE_GET 1
#define SYSP_TXN_TIMEOUT_SET 2
#define SYSP_TXN_TIMEOUT_GET 3
#define SYSP_TXN_TIMEOUT 1
#define SYSP_DEADLOCK_CHECK_INTERVAL 2
#define SYSP_TRICKLE_INTERVAL 3
#define SYSP_TRICKLE_PERCENTAGE 4
/**
* Driver Entry
*/

View file

@ -13,11 +13,16 @@
-define(CMD_TXN_ABORT, 5).
-define(CMD_GET, 6).
-define(CMD_PUT, 7).
-define(CMD_PUT_ATOMIC, 8).
-define(CMD_TUNE, 8).
-define(DB_TYPE_BTREE, 1).
-define(DB_TYPE_HASH, 2).
-define(SYSP_CACHESIZE_SET, 0).
-define(SYSP_CACHESIZE_GET, 1).
-define(SYSP_TXN_TIMEOUT_SET, 2).
-define(SYSP_TXN_TIMEOUT_GET, 3).
-define(STATUS_OK, 0).
-define(STATUS_ERROR, 1).

View file

@ -11,6 +11,8 @@
close_database/2,
txn_begin/1, txn_begin/2,
txn_commit/1, txn_commit/2, txn_abort/1,
get_cache_size/1, set_cache_size/4,
get_txn_timeout/1, set_txn_timeout/2,
put/4, put/5,
get/3, get/4]).
@ -133,6 +135,49 @@ get(Port, DbRef, Key, Opts) ->
Error ->
{error, {get, decode_rc(Error)}}
end.
%% Ask the driver for the current cache size through a CMD_TUNE control call.
%% Returns {ok, Gbytes, Bytes, Ncaches} when the driver's result code is 0,
%% otherwise {error, ResultCode}. A reply that is not exactly four native
%% 32-bit words fails with badmatch.
get_cache_size(Port) ->
    Request = <<?SYSP_CACHESIZE_GET:32/native>>,
    Reply = erlang:port_control(Port, ?CMD_TUNE, Request),
    <<Rc:32/signed-native, Gbytes:32/native, Bytes:32/native, Ncaches:32/native>> = Reply,
    if
        Rc =:= 0 -> {ok, Gbytes, Bytes, Ncaches};
        true -> {error, Rc}
    end.
%% Send new cache size settings to the driver through a CMD_TUNE control call.
%% Gbytes, Bytes and Ncaches are each encoded as native 32-bit integers.
%% Returns ok when the driver's result code is 0, otherwise {error, ResultCode}.
set_cache_size(Port, Gbytes, Bytes, Ncaches) ->
    Request = <<?SYSP_CACHESIZE_SET:32/native,
                Gbytes:32/native,
                Bytes:32/native,
                Ncaches:32/native>>,
    <<Rc:32/signed-native>> = erlang:port_control(Port, ?CMD_TUNE, Request),
    case Rc =:= 0 of
        true -> ok;
        false -> {error, Rc}
    end.
%% Ask the driver for the current transaction timeout through a CMD_TUNE control call.
%% Returns {ok, Timeout} when the driver's result code is 0, otherwise
%% {error, ResultCode}. A reply that is not exactly two native 32-bit words
%% fails with badmatch.
get_txn_timeout(Port) ->
    Request = <<?SYSP_TXN_TIMEOUT_GET:32/native>>,
    Reply = erlang:port_control(Port, ?CMD_TUNE, Request),
    <<Rc:32/signed-native, Timeout:32/native>> = Reply,
    if
        Rc =:= 0 -> {ok, Timeout};
        true -> {error, Rc}
    end.
%% Send a new transaction timeout to the driver through a CMD_TUNE control call.
%% Timeout is encoded as a native 32-bit integer.
%% Returns ok when the driver's result code is 0, otherwise {error, ResultCode}.
set_txn_timeout(Port, Timeout) ->
    Request = <<?SYSP_TXN_TIMEOUT_SET:32/native, Timeout:32/native>>,
    <<Rc:32/signed-native>> = erlang:port_control(Port, ?CMD_TUNE, Request),
    case Rc =:= 0 of
        true -> ok;
        false -> {error, Rc}
    end.
%% ====================================================================

View file

@ -9,8 +9,8 @@
-compile(export_all).
all() ->
% [test_db].
[test_put].% test_txn].
% [test_db, test_put, test_txn, test_tune].
[test_tune].
init_per_testcase(_TestCase, Config) ->
Config.
@ -71,3 +71,13 @@ test_txn(_Config) ->
ok = bdberl_port:txn_begin(P),
not_found = bdberl_port:get(P, 0, akey),
ok = bdberl_port:txn_commit(P).
test_tune(_Config) ->
    {ok, Port} = bdberl_port:new(),

    % Transaction timeouts: the value read before any change is expected to be
    % 500000, and a value written through set_txn_timeout must read back unchanged.
    InitialTimeout = 500000,
    UpdatedTimeout = 250000,
    {ok, InitialTimeout} = bdberl_port:get_txn_timeout(Port),
    ok = bdberl_port:set_txn_timeout(Port, UpdatedTimeout),
    {ok, UpdatedTimeout} = bdberl_port:get_txn_timeout(Port).

View file

@ -11,7 +11,7 @@
%% List of test cases in this suite; only test_thrash is run.
%% NOTE(review): presumably the Common Test all/0 callback -- confirm.
all() ->
[test_thrash].
-define(PROCS, 2).
-define(PROCS, 10).
test_thrash(_Config) ->
%% Spin up ?PROCS processes (async thread pool is 10)
@ -71,7 +71,7 @@ thrash_incr_loop(Port, Owner, Count) ->
get_or_die(Port, DbRef, Key) ->
case bdberl_port:get(Port, DbRef, Key) of
case bdberl_port:get(Port, DbRef, Key, [rmw]) of
not_found ->
not_found;
{ok, Value} ->