Added del()ete to Erlang API.

This commit is contained in:
Gregory Burd 2011-08-10 17:26:19 -04:00
parent d9ece6b7a9
commit 469bf225a2
8 changed files with 317 additions and 90 deletions

View file

@ -108,6 +108,7 @@ static void get_info(int target, void* values, BinHelper* bh);
static void do_async_put(void* arg); static void do_async_put(void* arg);
static void do_async_get(void* arg); static void do_async_get(void* arg);
static void do_async_del(void* arg);
static void do_async_txnop(void* arg); static void do_async_txnop(void* arg);
static void do_async_cursor_get(void* arg); static void do_async_cursor_get(void* arg);
static void do_async_truncate(void* arg); static void do_async_truncate(void* arg);
@ -187,11 +188,11 @@ static unsigned int G_TRICKLE_PERCENTAGE = 50; /* Desired % of clean pa
/** /**
* Transaction checkpoint monitor. We run a single thread per VM to flush transaction * Transaction checkpoint monitor. We run a single thread per VM to flush transaction
* logs into the backing data store. G_CHECKPOINT_INTERVAL is the time between runs in seconds. * logs into the backing data store. G_CHECKPOINT_INTERVAL is the time between runs in seconds.
* TODO The interval should be configurable. * TODO: The checkpoint interval should be configurable.
*/ */
static ErlDrvTid G_CHECKPOINT_THREAD = 0; static ErlDrvTid G_CHECKPOINT_THREAD = 0;
static unsigned int G_CHECKPOINT_ACTIVE = 1; static unsigned int G_CHECKPOINT_ACTIVE = 1;
static unsigned int G_CHECKPOINT_INTERVAL = 60 * 60; /* Seconds between checkpoints */ static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between checkpoints */
/** /**
* Pipe to used to wake up the various monitors. Instead of just sleeping * Pipe to used to wake up the various monitors. Instead of just sleeping
@ -214,11 +215,11 @@ static ErlDrvPort G_LOG_PORT;
*/ */
static unsigned int G_PAGE_SIZE = 0; static unsigned int G_PAGE_SIZE = 0;
/** Thread pools /**
* * Thread pools
*/ */
static unsigned int G_NUM_GENERAL_THREADS = 10; static unsigned int G_NUM_GENERAL_THREADS = 16;
static unsigned int G_NUM_TXN_THREADS = 10; static unsigned int G_NUM_TXN_THREADS = 16;
static TPool* G_TPOOL_GENERAL = NULL; static TPool* G_TPOOL_GENERAL = NULL;
static TPool* G_TPOOL_TXNS = NULL; static TPool* G_TPOOL_TXNS = NULL;
@ -360,7 +361,7 @@ DRIVER_INIT(bdberl_drv)
&deadlock_check, 0, 0); &deadlock_check, 0, 0);
// Use the BDBERL_CHECKPOINT_TIME environment value to determine the // Use the BDBERL_CHECKPOINT_TIME environment value to determine the
// interval between transaction checkpoints. Defaults to 1 hour. // interval between transaction checkpoints. Defaults to 1 minute.
check_pos_env("BDBERL_CHECKPOINT_TIME", &G_CHECKPOINT_INTERVAL); check_pos_env("BDBERL_CHECKPOINT_TIME", &G_CHECKPOINT_INTERVAL);
// Startup checkpoint thread // Startup checkpoint thread
@ -670,6 +671,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
} }
case CMD_PUT: case CMD_PUT:
case CMD_GET: case CMD_GET:
case CMD_DEL:
case CMD_PUT_COMMIT: case CMD_PUT_COMMIT:
{ {
FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_ASYNC_PENDING(d, outbuf);
@ -681,7 +683,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
RETURN_INT(0, outbuf); RETURN_INT(0, outbuf);
} }
// Inbuf is: << DbRef:32, Rest/binary>> // Inbuf is: <<DbRef:32, Rest/binary>>
int dbref = UNPACK_INT(inbuf, 0); int dbref = UNPACK_INT(inbuf, 0);
// Make sure this port currently has dbref open -- if it doesn't, error out. Of note, // Make sure this port currently has dbref open -- if it doesn't, error out. Of note,
@ -705,15 +707,25 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
d->async_op = cmd; d->async_op = cmd;
d->async_dbref = dbref; d->async_dbref = dbref;
TPoolJobFunc fn; TPoolJobFunc fn;
if (cmd == CMD_PUT || cmd == CMD_PUT_COMMIT) switch(cmd) {
case CMD_PUT: case CMD_PUT_COMMIT:
{ {
fn = &do_async_put; fn = &do_async_put;
} }
else break;
case CMD_DEL:
{
fn = &do_async_del;
}
break;
case CMD_GET:
{ {
assert(cmd == CMD_GET);
fn = &do_async_get; fn = &do_async_get;
} }
break;
default:
assert(cmd);
}
bdberl_general_tpool_run(fn, d, 0, &d->async_job); bdberl_general_tpool_run(fn, d, 0, &d->async_job);
// Let caller know that the operation is in progress // Let caller know that the operation is in progress
@ -1608,6 +1620,38 @@ static void do_async_get(void* arg)
free(value.data); free(value.data);
} }
static void do_async_del(void* arg)
{
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
PortData* d = (PortData*)arg;
// Get the database object, using the provided ref
int dbref = UNPACK_INT(d->work_buffer, 0);
DB* db = bdberl_lookup_dbref(dbref);
// Extract operation flags
unsigned flags = UNPACK_INT(d->work_buffer, 4);
// Setup DBTs
DBT key;
memset(&key, '\0', sizeof(DBT));
// Parse payload into DBT
key.size = UNPACK_INT(d->work_buffer, 8);
key.data = UNPACK_BLOB(d->work_buffer, 12);
int rc = db->del(db, d->txn, &key, flags);
// Cleanup transaction as necessary
if (rc && rc != DB_NOTFOUND && d->txn)
{
d->txn->abort(d->txn);
d->txn = 0;
}
bdberl_async_cleanup_and_send_rc(d, rc);
}
static void do_async_txnop(void* arg) static void do_async_txnop(void* arg)
{ {
PortData* d = (PortData*)arg; PortData* d = (PortData*)arg;

View file

@ -46,32 +46,33 @@
#define CMD_TXN_ABORT 5 #define CMD_TXN_ABORT 5
#define CMD_GET 6 #define CMD_GET 6
#define CMD_PUT 7 #define CMD_PUT 7
#define CMD_GETINFO 8 #define CMD_DEL 8
#define CMD_CURSOR_OPEN 9 #define CMD_GETINFO 9
#define CMD_CURSOR_CURR 10 #define CMD_CURSOR_OPEN 10
#define CMD_CURSOR_NEXT 11 #define CMD_CURSOR_CURR 11
#define CMD_CURSOR_PREV 12 #define CMD_CURSOR_NEXT 12
#define CMD_CURSOR_CLOSE 13 #define CMD_CURSOR_PREV 13
#define CMD_PUT_COMMIT 14 #define CMD_CURSOR_CLOSE 14
#define CMD_REMOVE_DB 15 #define CMD_PUT_COMMIT 15
#define CMD_TRUNCATE 16 #define CMD_REMOVE_DB 16
#define CMD_REGISTER_LOGGER 17 #define CMD_TRUNCATE 17
#define CMD_DB_STAT 18 #define CMD_REGISTER_LOGGER 18
#define CMD_DB_STAT_PRINT 19 #define CMD_DB_STAT 19
#define CMD_ENV_STAT_PRINT 20 #define CMD_DB_STAT_PRINT 20
#define CMD_LOCK_STAT 21 #define CMD_ENV_STAT_PRINT 21
#define CMD_LOCK_STAT_PRINT 22 #define CMD_LOCK_STAT 22
#define CMD_LOG_STAT 23 #define CMD_LOCK_STAT_PRINT 23
#define CMD_LOG_STAT_PRINT 24 #define CMD_LOG_STAT 24
#define CMD_MEMP_STAT 25 #define CMD_LOG_STAT_PRINT 25
#define CMD_MEMP_STAT_PRINT 26 #define CMD_MEMP_STAT 26
#define CMD_MUTEX_STAT 27 #define CMD_MEMP_STAT_PRINT 27
#define CMD_MUTEX_STAT_PRINT 28 #define CMD_MUTEX_STAT 28
#define CMD_TXN_STAT 29 #define CMD_MUTEX_STAT_PRINT 29
#define CMD_TXN_STAT_PRINT 30 #define CMD_TXN_STAT 30
#define CMD_DATA_DIRS_INFO 31 #define CMD_TXN_STAT_PRINT 31
#define CMD_LOG_DIR_INFO 32 #define CMD_DATA_DIRS_INFO 32
#define CMD_DRIVER_INFO 33 #define CMD_LOG_DIR_INFO 33
#define CMD_DRIVER_INFO 34
/** /**
* Command status values * Command status values

View file

@ -39,38 +39,36 @@
-define(CMD_TXN_ABORT, 5). -define(CMD_TXN_ABORT, 5).
-define(CMD_GET, 6). -define(CMD_GET, 6).
-define(CMD_PUT, 7). -define(CMD_PUT, 7).
-define(CMD_GETINFO, 8). -define(CMD_DEL, 8).
-define(CMD_CURSOR_OPEN, 9). -define(CMD_GETINFO, 9).
-define(CMD_CURSOR_CURR, 10). -define(CMD_CURSOR_OPEN, 10).
-define(CMD_CURSOR_NEXT, 11). -define(CMD_CURSOR_CURR, 11).
-define(CMD_CURSOR_PREV, 12). -define(CMD_CURSOR_NEXT, 12).
-define(CMD_CURSOR_CLOSE, 13). -define(CMD_CURSOR_PREV, 13).
-define(CMD_PUT_COMMIT, 14). -define(CMD_CURSOR_CLOSE, 14).
-define(CMD_REMOVE_DB, 15). -define(CMD_PUT_COMMIT, 15).
-define(CMD_TRUNCATE, 16). -define(CMD_REMOVE_DB, 16).
-define(CMD_REGISTER_LOGGER, 17). -define(CMD_TRUNCATE, 17).
-define(CMD_DB_STAT, 18). -define(CMD_REGISTER_LOGGER, 18).
-define(CMD_DB_STAT_PRINT, 19). -define(CMD_DB_STAT, 19).
-define(CMD_ENV_STAT_PRINT, 20). -define(CMD_DB_STAT_PRINT, 20).
-define(CMD_LOCK_STAT, 21). -define(CMD_ENV_STAT_PRINT, 21).
-define(CMD_LOCK_STAT_PRINT, 22). -define(CMD_LOCK_STAT, 22).
-define(CMD_LOG_STAT, 23). -define(CMD_LOCK_STAT_PRINT, 23).
-define(CMD_LOG_STAT_PRINT, 24). -define(CMD_LOG_STAT, 24).
-define(CMD_MEMP_STAT, 25). -define(CMD_LOG_STAT_PRINT, 25).
-define(CMD_MEMP_STAT_PRINT, 26). -define(CMD_MEMP_STAT, 26).
-define(CMD_MUTEX_STAT, 27). -define(CMD_MEMP_STAT_PRINT, 27).
-define(CMD_MUTEX_STAT_PRINT,28). -define(CMD_MUTEX_STAT, 28).
-define(CMD_TXN_STAT, 29). -define(CMD_MUTEX_STAT_PRINT,29).
-define(CMD_TXN_STAT_PRINT, 30). -define(CMD_TXN_STAT, 30).
-define(CMD_DATA_DIRS_INFO, 31). -define(CMD_TXN_STAT_PRINT, 31).
-define(CMD_LOG_DIR_INFO, 32). -define(CMD_DATA_DIRS_INFO, 32).
-define(CMD_DRIVER_INFO, 33). -define(CMD_LOG_DIR_INFO, 33).
-define(CMD_DRIVER_INFO, 34).
-define(DB_TYPE_BTREE, 1). -define(DB_TYPE_BTREE, 1).
-define(DB_TYPE_HASH, 2). -define(DB_TYPE_HASH, 2).
%%-define(DB_TYPE_QUEUE, 3).
%%-define(DB_TYPE_RECNO, 4).
%%-define(DB_TYPE_HEAP, 5).
-define(DB_TYPE_UNKNOWN, 6). -define(DB_TYPE_UNKNOWN, 6).
-define(SYSP_CACHESIZE_GET, 1). -define(SYSP_CACHESIZE_GET, 1).

View file

@ -2,4 +2,4 @@ set_flags DB_TXN_WRITE_NOSYNC
set_cachesize 0 536870912 1 set_cachesize 0 536870912 1
set_lg_max 104857600 set_lg_max 104857600
set_lg_bsize 536870912 set_lg_bsize 536870912
set_log_config DB_LOG_IN_MEMORY log_set_config DB_LOG_IN_MEMORY

View file

@ -0,0 +1,134 @@
--- Test run on 2011/08/10 17:16:52 ---
Common Test v1.5.4 starting (cwd is /Users/gburd/Projects/bdberl)
Common Test: Running make in test directories...
Including the following directories:
"/Users/gburd/Projects/bdberl/include"
CWD set to: "/Users/gburd/Projects/bdberl/logs/ct_run.test@tautog.local.2011-08-10_17.16.52"
TEST INFO: 1 test(s), 1 case(s) in 1 suite(s)
Cover compiling 2 modules - this may take some time... done
Testing Projects.bdberl: Starting test, 1 test cases
DRIVER INIT
db_env_create(0x1994bb30, 0) = 0
G_DB_ENV->open(0x1994bb30, 0, 00042517, 0) = 0
Using "BDBERL_DEADLOCK_CHECK_INTERVAL" value 1
Using "BDBERL_NUM_GENERAL_THREADS" value 10
Using "BDBERL_NUM_TXN_THREADS" value 10
threadid 0x7f9c31934048 port 0x1ad: BDB DRIVER STARTING
threadid 0x7f9c31934048 port 0x1ad: BDB DRIVER STARTED
threadid 0x7f9c31959b80 port 0x1ae: BDB DRIVER STARTING
threadid 0x7f9c31959b80 port 0x1ae: BDB DRIVER STARTED
=ERROR REPORT==== 10-Aug-2011::17:16:52 ===
SNMP is not running; bdberl stats will not be published.
----------------------------------------------------
2011-08-10 17:16:52.616
Driver running with:
{ok,[{databases_size,1024},
{deadlock_interval,1},
{trickle_interval,300},
{trickle_percentage,50},
{checkpoint_interval,60},
{num_general_threads,10},
{num_txn_threads,10},
{general_jobs_pending,0},
{general_jobs_active,0},
{txn_jobs_pending,0},
{txn_jobs_active,0}]}
threadid 0x7f9c31942a30 port 0x1f2: BDB DRIVER STARTING
threadid 0x7f9c31942a30 port 0x1f2: BDB DRIVER STARTED
threadid 0x7f9c31942a30 port 0x1f2: locking G_DATABASES
threadid 0x7f9c31942a30 port 0x1f2: locked G_DATABASES
threadid 0x7f9c31963230 port 0x1f3: BDB DRIVER STARTING
threadid 0x7f9c31942a30 port 0x1f2: db_create(&db, 0x7f9c35000000, 0);
threadid 0x7f9c31934048 port 0x1f4: BDB DRIVER STARTING
threadid 0x7f9c31963230 port 0x1f3: BDB DRIVER STARTED
threadid 0x7f9c3195b030 port 0x1f5: BDB DRIVER STARTING
threadid 0x7f9c31942a30 port 0x1f2: rc = ok (0) db = 0x7f9c33000600
threadid 0x7f9c31934048 port 0x1f4: BDB DRIVER STARTED
threadid 0x7f9c31963230 port 0x1f3: locking G_DATABASES
threadid 0x7f9c3195b030 port 0x1f5: BDB DRIVER STARTED
threadid 0x7f9c31942a30 port 0x1f2: db->open(0x7f9c33000600, 0, 'bug200_13.bdb', 0, 1, 00000315, 0);
threadid 0x7f9c31934048 port 0x1f4: locking G_DATABASES
threadid 0x7f9c3195b030 port 0x1f5: locking G_DATABASES
threadid 0x7f9c31942a30 port 0x1f2: rc = ok (0)
threadid 0x7f9c31942a30 port 0x1f2: unlocking G_DATABASES
threadid 0x7f9c31942a30 port 0x1f2: unlocked G_DATABASES
Adding dbref 0 to port 0x1f2
threadid 0x7f9c31963230 port 0x1f3: locked G_DATABASES
Adding dbref 0 to port 0x1f3
Adding port 0x1f3 to dbref 0
threadid 0x7f9c31963230 port 0x1f3: unlocking G_DATABASES
threadid 0x7f9c31963230 port 0x1f3: unlocked G_DATABASES
threadid 0x7f9c31934048 port 0x1f4: locked G_DATABASES
Adding dbref 0 to port 0x1f4
Adding port 0x1f4 to dbref 0
threadid 0x7f9c31934048 port 0x1f4: unlocking G_DATABASES
threadid 0x7f9c31934048 port 0x1f4: unlocked G_DATABASES
threadid 0x7f9c3195b030 port 0x1f5: locked G_DATABASES
Adding dbref 0 to port 0x1f5
Adding port 0x1f5 to dbref 0
threadid 0x7f9c3195b030 port 0x1f5: unlocking G_DATABASES
threadid 0x7f9c3195b030 port 0x1f5: unlocked G_DATABASES
----------------------------------------------------
2011-08-10 17:16:52.631
----------------------------------------------------
2011-08-10 17:16:52.631
----------------------------------------------------
2011-08-10 17:16:52.631
<0.74.0> starting for 250 requests
<0.73.0> starting for 250 requests
----------------------------------------------------
2011-08-10 17:16:52.631
<0.71.0> starting for 250 requests
<0.72.0> starting for 250 requests
----------------------------------------------------
2011-08-10 17:16:52.631
----------------------------------------------------
2011-08-10 17:16:52.631
----------------------------------------------------
2011-08-10 17:16:52.631
----------------------------------------------------
2011-08-10 17:16:52.631
Pid <0.74.0> count 250
Pid <0.73.0> count 250
Pid <0.71.0> count 250
Pid <0.72.0> count 250
threadid 0x7f9c319597e0 port 0x1f5: G_DB_ENV->txn_begin(0x7f9c35000000, 0, 0x0, 00000010)
threadid 0x7f9c319597e0 port 0x1f5: rc = ok (0) d->txn = 0x7f9c31500540
threadid 0x7f9c319599c0 port 0x1f3: G_DB_ENV->txn_begin(0x7f9c35000000, 0, 0x0, 00000010)
threadid 0x7f9c319599c0 port 0x1f3: rc = ok (0) d->txn = 0x7f9c329005d0
threadid 0x7f9c31959a80 port 0x1f4: G_DB_ENV->txn_begin(0x7f9c35000000, 0, 0x0, 00000010)
threadid 0x7f9c319597e0 port 0x1f2: G_DB_ENV->txn_begin(0x7f9c35000000, 0, 0x0, 00000010)
threadid 0x7f9c31959a80 port 0x1f4: rc = ok (0) d->txn = 0x7f9c31500690
threadid 0x7f9c319597e0 port 0x1f2: rc = ok (0) d->txn = 0x7f9c32c00000
threadid 0x7f9c31959470 port 0x1f5: db->put(0x7f9c33000600, 0x7f9c31500540, 0xb171ae18, 0xb171adf0, 00000000) dbref 0 key=0x7f9c3195c1e4(6) value=0x7f9c3195c1ee(9084)
threadid 0x7f9c31959470 port 0x1f5: rc = ok (0)
threadid 0x7f9c31959720 port 0x1f5: d->txn->txn_commit(0x7f9c31500540, 00000000)
threadid 0x7f9c31959720 port 0x1f5: rc = ok (0)

View file

@ -43,7 +43,7 @@ bug200(_Config) ->
os:putenv("BDBERL_DEADLOCK_CHECK_INTERVAL", "1"), os:putenv("BDBERL_DEADLOCK_CHECK_INTERVAL", "1"),
%% Copy in the DB_CONFIG %% Copy in the DB_CONFIG
{ok, _} = file:copy("../../int_test/PROD_DB_CONFIG","DB_CONFIG"), {ok, _} = file:copy("../../int_test/DB_CONFIG","DB_CONFIG"),
crypto:start(), crypto:start(),
Data = crypto:rand_bytes(?DATA_SIZE), Data = crypto:rand_bytes(?DATA_SIZE),

View file

@ -63,6 +63,7 @@
get/2, get/3, get/2, get/3,
get_r/2, get_r/3, get_r/2, get_r/3,
update/3, update/4, update/5, update/6, update/7, update/3, update/4, update/5, update/6, update/7,
del/2,
truncate/0, truncate/1, truncate/0, truncate/1,
delete_database/1, delete_database/1,
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0, cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
@ -958,6 +959,47 @@ get_r(Db, Key, Opts) ->
end. end.
%%--------------------------------------------------------------------
%% @doc
%% Delete a value based on key.
%%
%% This function removes key/data pairs from the database. The key/data
%% pair associated with the specified key is discarded from the database.
%% In the presence of duplicate key values, all records associated with
%% the designated key will be discarded.
%%
%% This function will return `not_found' if the specified key is not in
%% the database.
%%
%% @spec del(Db, Key) -> not_found | {ok, Value} | {error, Error}
%% where
%% Db = integer()
%% Key = term()
%%
%% @equiv del(Db, Key, [])
%% @end
%%--------------------------------------------------------------------
-spec del(Db :: db(), Key :: db_key()) ->
not_found | ok | db_error().
del(Db, Key) ->
{KeyLen, KeyBin} = to_binary(Key),
Cmd = <<Db:32/signed-native, 0:32/native, KeyLen:32/native, KeyBin/bytes>>,
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_DEL, Cmd),
case decode_rc(Result) of
ok ->
receive
{ok, _, _} -> ok;
not_found -> not_found;
{error, Reason} -> {error, Reason}
end;
Error ->
{error, Error}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Updates the value of a key by executing a fun. %% Updates the value of a key by executing a fun.

View file

@ -45,6 +45,7 @@ all() ->
get_should_return_a_value_when_getting_a_valid_record, get_should_return_a_value_when_getting_a_valid_record,
put_should_succeed_with_manual_transaction, put_should_succeed_with_manual_transaction,
put_should_rollback_with_failed_manual_transaction, put_should_rollback_with_failed_manual_transaction,
% del_should_remove_a_value,
transaction_should_commit_on_success, transaction_should_commit_on_success,
transaction_should_abort_on_exception, transaction_should_abort_on_exception,
transaction_should_abort_on_user_abort, transaction_should_abort_on_user_abort,
@ -131,6 +132,13 @@ put_should_rollback_with_failed_manual_transaction(Config) ->
ok = bdberl:txn_abort(), ok = bdberl:txn_abort(),
not_found = bdberl:get(Db, mykey). not_found = bdberl:get(Db, mykey).
del_should_remove_a_value(Config) ->
Db = ?config(db, Config),
ok = bdberl:put(Db, mykey, avalue),
{ok, avalue} = bdberl:get(Db, mykey),
ok = bdberl:del(Db, mykey),
not_found = bdberl:get(Db, mykey).
transaction_should_commit_on_success(Config) -> transaction_should_commit_on_success(Config) ->
Db = ?config(db, Config), Db = ?config(db, Config),
F = fun() -> bdberl:put(Db, mykey, avalue) end, F = fun() -> bdberl:put(Db, mykey, avalue) end,