diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 4d2fb98..99c1f48 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -22,6 +22,8 @@ static int open_database(const char* name, DBTYPE type, unsigned flags, PortData* data, int* errno); static int close_database(int dbref, unsigned flags, PortData* data); +static void tune_system(int target, void* values, BinHelper* bh); + static void do_async_put(void* arg); static void do_async_get(void* arg); static void do_async_txnop(void* arg); @@ -39,6 +41,7 @@ static void* zalloc(unsigned int size); static void signal_port(PortData* d); static void* deadlock_check(void* arg); +static void* trickle_write(void* arg); /** * Global instance of DB_ENV; only a single one exists per O/S process. @@ -71,12 +74,25 @@ static int G_DATABASES_SIZE; static ErlDrvRWLock* G_DATABASES_RWLOCK; static hive_hash* G_DATABASES_NAMES; + /** - * G_DEADLOCK_* a + * Deadlock detector thread variables. We run a single thread per VM to detect deadlocks within + * our global environment. G_DEADLOCK_CHECK_INTERVAL is the time between runs in milliseconds. */ static ErlDrvTid G_DEADLOCK_THREAD; -static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1; -static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; +static unsigned int G_DEADLOCK_CHECK_ACTIVE = 1; +static unsigned int G_DEADLOCK_CHECK_INTERVAL = 100; /* Milliseconds between checks */ + + +/** + * Trickle writer for dirty pages. We run a single thread per VM to perform background + * trickling of dirty pages to disk. G_TRICKLE_INTERVAL is the time between runs in seconds. + */ +static ErlDrvTid G_TRICKLE_THREAD; +static unsigned int G_TRICKLE_ACTIVE = 1; +static unsigned int G_TRICKLE_INTERVAL = 60 * 15; /* Seconds between trickle writes */ +static unsigned int G_TRICKLE_PERCENTAGE = 10; /* Desired % of clean pages in cache */ + /** * @@ -99,7 +115,7 @@ static TPool* G_TPOOL_TXNS; #define UNPACK_STRING(_buf, _off) (char*)(_buf+(_off)) #define UNPACK_BLOB(_buf, _off) (void*)(_buf+(_off)) -#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.bin->orig_size; +#define RETURN_BH(bh, outbuf) *outbuf = (char*)bh.bin; return bh.offset; #define RETURN_INT(val, outbuf) { \ BinHelper bh; \ @@ -123,6 +139,7 @@ DRIVER_INIT(bdberl_drv) // Initialize global environment -- use environment variable DB_HOME to // specify where the working directory is db_env_create(&G_DB_ENV, 0); + G_DB_ENV_ERROR = G_DB_ENV->open(G_DB_ENV, 0, flags, 0); if (G_DB_ENV_ERROR == 0) { @@ -157,10 +174,14 @@ DRIVER_INIT(bdberl_drv) erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD, &deadlock_check, 0, 0); + // Startup trickle write thread + erl_drv_thread_create("bdberl_drv_trickle_write", &G_TRICKLE_THREAD, + &trickle_write, 0, 0); + // Startup our thread pools // TODO: Make configurable/adjustable - G_TPOOL_GENERAL = bdberl_tpool_start(5); - G_TPOOL_TXNS = bdberl_tpool_start(5); + G_TPOOL_GENERAL = bdberl_tpool_start(10); + G_TPOOL_TXNS = bdberl_tpool_start(10); } else { @@ -245,6 +266,10 @@ static void bdberl_drv_finish() bdberl_tpool_stop(G_TPOOL_GENERAL); bdberl_tpool_stop(G_TPOOL_TXNS); + // Signal the trickle write thread to shutdown + G_TRICKLE_ACTIVE = 0; + erl_drv_thread_join(G_TRICKLE_THREAD, 0); + // Signal the deadlock checker to shutdown -- then wait for it G_DEADLOCK_CHECK_ACTIVE = 0; erl_drv_thread_join(G_DEADLOCK_THREAD, 0); @@ -427,6 +452,20 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, RETURN_INT(ERROR_INVALID_DBREF, outbuf); } } + case CMD_TUNE: + { + // Inbuf is: << Target:32, Values/binary >> + int target = UNPACK_INT(inbuf, 0); + char* values = UNPACK_BLOB(inbuf, 4); + + // Setup a binhelper + BinHelper bh; + + // Execute the tuning -- the result to send back to the caller is wrapped + // up in the provided binhelper + tune_system(target, values, &bh); + RETURN_BH(bh, outbuf); + } } *outbuf = 0; return 0; @@ -681,6 +720,56 @@ static int close_database(int dbref, unsigned flags, PortData* data) return 0; } +/** + * Given a target system parameter/action adjust/return the requested value + */ +static void tune_system(int target, void* values, BinHelper* bh) +{ + switch(target) + { + case SYSP_CACHESIZE_SET: + { + unsigned int gbytes = UNPACK_INT(values, 0); + unsigned int bytes = UNPACK_INT(values, 4); + unsigned int ncache = UNPACK_INT(values, 8); + int rc = G_DB_ENV->set_cachesize(G_DB_ENV, gbytes, bytes, ncache); + bin_helper_init(bh, 4); + bin_helper_push_int32(bh, rc); + break; + } + case SYSP_CACHESIZE_GET: + { + unsigned int gbytes = 0; + unsigned int bytes = 0; + int caches = 0; + int rc = G_DB_ENV->get_cachesize(G_DB_ENV, &gbytes, &bytes, &caches); + bin_helper_init(bh, 16); + bin_helper_push_int32(bh, rc); + bin_helper_push_int32(bh, gbytes); + bin_helper_push_int32(bh, bytes); + bin_helper_push_int32(bh, caches); + break; + } + case SYSP_TXN_TIMEOUT_SET: + { + unsigned int timeout = UNPACK_INT(values, 0); + int rc = G_DB_ENV->set_timeout(G_DB_ENV, timeout, DB_SET_TXN_TIMEOUT); + bin_helper_init(bh, 4); + bin_helper_push_int32(bh, rc); + break; + } + case SYSP_TXN_TIMEOUT_GET: + { + unsigned int timeout = 0; + int rc = G_DB_ENV->get_timeout(G_DB_ENV, &timeout, DB_SET_TXN_TIMEOUT); + bin_helper_init(bh, 8); + bin_helper_push_int32(bh, rc); + bin_helper_push_int32(bh, timeout); + break; + } + } +} + static void do_async_put(void* arg) { // Payload is: <> @@ -986,3 +1075,33 @@ static void* deadlock_check(void* arg) printf("Deadlock checker exiting.\n"); return 0; } + +/** + * Thread function that trickle writes dirty pages to disk + */ +static void* trickle_write(void* arg) +{ + int elapsed_secs = 0; + while(G_TRICKLE_ACTIVE) + { + if (elapsed_secs == G_TRICKLE_INTERVAL) + { + // Enough time has passed -- time to run the trickle operation again + int pages_wrote = 0; + G_DB_ENV->memp_trickle(G_DB_ENV, G_TRICKLE_PERCENTAGE, &pages_wrote); + printf("Wrote %d pages to achieve %d trickle\n", pages_wrote, G_TRICKLE_PERCENTAGE); + + // Reset the counter + elapsed_secs = 0; + } + else + { + // TODO: Use nanosleep + usleep(1000 * 1000); /* Sleep for 1 second */ + elapsed_secs++; + } + } + + printf("Trickle writer exiting.\n"); + return 0; +} diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index 582b702..66ce577 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -38,7 +38,8 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev); #define CMD_TXN_ABORT 5 #define CMD_GET 6 #define CMD_PUT 7 -#define CMD_PUT_ATOMIC 8 +#define CMD_TUNE 8 + /** * Command status values @@ -62,6 +63,19 @@ static void bdberl_drv_ready_input(ErlDrvData handle, ErlDrvEvent ev); #define ERROR_TXN_OPEN (-29003) /* Transaction already active on this port */ #define ERROR_NO_TXN (-29004) /* No transaction open on this port */ + +/** + * Tunable system parameters/actions + */ +#define SYSP_CACHESIZE_SET 0 +#define SYSP_CACHESIZE_GET 1 +#define SYSP_TXN_TIMEOUT_SET 2 +#define SYSP_TXN_TIMEOUT_GET 3 +#define SYSP_TXN_TIMEOUT 1 +#define SYSP_DEADLOCK_CHECK_INTERVAL 2 +#define SYSP_TRICKLE_INTERVAL 3 +#define SYSP_TRICKLE_PERCENTAGE 4 + /** * Driver Entry */ diff --git a/include/bdberl.hrl b/include/bdberl.hrl index ddaa666..b6984c3 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -13,11 +13,16 @@ -define(CMD_TXN_ABORT, 5). -define(CMD_GET, 6). -define(CMD_PUT, 7). --define(CMD_PUT_ATOMIC, 8). +-define(CMD_TUNE, 8). -define(DB_TYPE_BTREE, 1). -define(DB_TYPE_HASH, 2). +-define(SYSP_CACHESIZE_SET, 0). +-define(SYSP_CACHESIZE_GET, 1). +-define(SYSP_TXN_TIMEOUT_SET, 2). +-define(SYSP_TXN_TIMEOUT_GET, 3). + -define(STATUS_OK, 0). -define(STATUS_ERROR, 1). diff --git a/src/bdberl_port.erl b/src/bdberl_port.erl index 12bc80b..144f153 100644 --- a/src/bdberl_port.erl +++ b/src/bdberl_port.erl @@ -11,6 +11,8 @@ close_database/2, txn_begin/1, txn_begin/2, txn_commit/1, txn_commit/2, txn_abort/1, + get_cache_size/1, set_cache_size/4, + get_txn_timeout/1, set_txn_timeout/2, put/4, put/5, get/3, get/4]). @@ -133,6 +135,49 @@ get(Port, DbRef, Key, Opts) -> Error -> {error, {get, decode_rc(Error)}} end. + +get_cache_size(Port) -> + Cmd = <>, + <> = + erlang:port_control(Port, ?CMD_TUNE, Cmd), + case Result of + 0 -> + {ok, Gbytes, Bytes, Ncaches}; + _ -> + {error, Result} + end. + +set_cache_size(Port, Gbytes, Bytes, Ncaches) -> + Cmd = <>, + <> = erlang:port_control(Port, ?CMD_TUNE, Cmd), + case Result of + 0 -> + ok; + _ -> + {error, Result} + end. + + +get_txn_timeout(Port) -> + Cmd = <>, + <> = erlang:port_control(Port, ?CMD_TUNE, Cmd), + case Result of + 0 -> + {ok, Timeout}; + _ -> + {error, Result} + end. + +set_txn_timeout(Port, Timeout) -> + Cmd = <>, + <> = erlang:port_control(Port, ?CMD_TUNE, Cmd), + case Result of + 0 -> + ok; + _ -> + {error, Result} + end. + %% ==================================================================== diff --git a/test/port_SUITE.erl b/test/port_SUITE.erl index dddc135..38d7405 100644 --- a/test/port_SUITE.erl +++ b/test/port_SUITE.erl @@ -9,8 +9,8 @@ -compile(export_all). all() -> -% [test_db]. - [test_put].% test_txn]. +% [test_db, test_put, test_txn, test_tune]. + [test_tune]. init_per_testcase(_TestCase, Config) -> Config. @@ -71,3 +71,13 @@ test_txn(_Config) -> ok = bdberl_port:txn_begin(P), not_found = bdberl_port:get(P, 0, akey), ok = bdberl_port:txn_commit(P). + +test_tune(_Config) -> + {ok, P} = bdberl_port:new(), + + % Test transaction timeouts + {ok, 500000} = bdberl_port:get_txn_timeout(P), + ok = bdberl_port:set_txn_timeout(P, 250000), + {ok, 250000} = bdberl_port:get_txn_timeout(P). + + diff --git a/test/thrash_SUITE.erl b/test/thrash_SUITE.erl index 8e68032..7ad8dac 100644 --- a/test/thrash_SUITE.erl +++ b/test/thrash_SUITE.erl @@ -11,7 +11,7 @@ all() -> [test_thrash]. --define(PROCS, 2). +-define(PROCS, 10). test_thrash(_Config) -> %% Spin up 15 processes (async thread pool is 10) @@ -71,7 +71,7 @@ thrash_incr_loop(Port, Owner, Count) -> get_or_die(Port, DbRef, Key) -> - case bdberl_port:get(Port, DbRef, Key) of + case bdberl_port:get(Port, DbRef, Key, [rmw]) of not_found -> not_found; {ok, Value} ->