Added truncate command.
This commit is contained in:
parent
4ba6a03aec
commit
df6222a51c
5 changed files with 104 additions and 1 deletions
|
@ -29,6 +29,7 @@ static void do_async_put(void* arg);
|
||||||
static void do_async_get(void* arg);
|
static void do_async_get(void* arg);
|
||||||
static void do_async_txnop(void* arg);
|
static void do_async_txnop(void* arg);
|
||||||
static void do_async_cursor_get(void* arg);
|
static void do_async_cursor_get(void* arg);
|
||||||
|
static void do_async_truncate(void* arg);
|
||||||
|
|
||||||
static int add_dbref(PortData* data, int dbref);
|
static int add_dbref(PortData* data, int dbref);
|
||||||
static int del_dbref(PortData* data, int dbref);
|
static int del_dbref(PortData* data, int dbref);
|
||||||
|
@ -606,6 +607,42 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
int rc = delete_database(dbname);
|
int rc = delete_database(dbname);
|
||||||
RETURN_INT(rc, outbuf);
|
RETURN_INT(rc, outbuf);
|
||||||
}
|
}
|
||||||
|
case CMD_TRUNCATE:
|
||||||
|
{
|
||||||
|
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||||
|
|
||||||
|
// Fail if a cursor is open
|
||||||
|
if (d->cursor)
|
||||||
|
{
|
||||||
|
RETURN_INT(ERROR_CURSOR_OPEN, outbuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inbuf is: <<DbRef:32>>
|
||||||
|
int dbref = UNPACK_INT(inbuf, 0);
|
||||||
|
|
||||||
|
DBG("Truncating...\n");
|
||||||
|
|
||||||
|
// Make sure this port currently has dbref open -- if it doesn't, error out. Of note,
|
||||||
|
// if it's in our list, we don't need to grab the RWLOCK, as we don't have to worry about
|
||||||
|
// the underlying handle disappearing since we have a reference.
|
||||||
|
if (has_dbref(d, dbref))
|
||||||
|
{
|
||||||
|
DBG("Doing truncate....");
|
||||||
|
// Mark the port as busy and then schedule the appropriate async operation
|
||||||
|
d->async_op = cmd;
|
||||||
|
d->async_pool = G_TPOOL_GENERAL;
|
||||||
|
d->async_job = bdberl_tpool_run(G_TPOOL_GENERAL, &do_async_truncate, d, 0);
|
||||||
|
|
||||||
|
// Let caller know that the operation is in progress
|
||||||
|
// Outbuf is: <<0:32>>
|
||||||
|
RETURN_INT(0, outbuf);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Invalid dbref
|
||||||
|
RETURN_INT(ERROR_INVALID_DBREF, outbuf);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
*outbuf = 0;
|
*outbuf = 0;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1104,6 +1141,48 @@ static void do_async_cursor_get(void* arg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void do_async_truncate(void* arg)
|
||||||
|
{
|
||||||
|
// Payload is: <<DbRef:32>>
|
||||||
|
PortData* d = (PortData*)arg;
|
||||||
|
|
||||||
|
// Get the database reference and flags from the payload
|
||||||
|
int dbref = UNPACK_INT(d->work_buffer, 0);
|
||||||
|
DB* db = G_DATABASES[dbref].db;
|
||||||
|
|
||||||
|
DBG("Truncating dbref %i\n", dbref);
|
||||||
|
|
||||||
|
u_int32_t count = 0;
|
||||||
|
int rc = db->truncate(db, d->txn, &count, 0);
|
||||||
|
|
||||||
|
// If any error occurs while we have a txn action, abort it
|
||||||
|
if (d->txn && rc)
|
||||||
|
{
|
||||||
|
d->txn->abort(d->txn);
|
||||||
|
d->txn = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the port and pid references -- we need copies independent from the PortData
|
||||||
|
// structure. Once we release the port_lock after clearing the cmd, it's possible that
|
||||||
|
// the port could go away without waiting on us to finish. This is acceptable, but we need
|
||||||
|
// to be certain that there is no overlap of data between the two threads. driver_send_term
|
||||||
|
// is safe to use from a thread, even if the port you're sending from has already expired.
|
||||||
|
ErlDrvPort port = d->port;
|
||||||
|
ErlDrvTermData port_owner = d->port_owner;
|
||||||
|
|
||||||
|
// Release the port for another operation
|
||||||
|
d->async_pool = 0;
|
||||||
|
d->async_job = 0;
|
||||||
|
d->work_buffer_offset = 0;
|
||||||
|
erl_drv_mutex_lock(d->port_lock);
|
||||||
|
d->async_op = CMD_NONE;
|
||||||
|
erl_drv_mutex_unlock(d->port_lock);
|
||||||
|
|
||||||
|
// TODO: May need to tag the messages a bit more explicitly so that if another async
|
||||||
|
// job runs to completion before the message gets delivered we don't mis-interpret this
|
||||||
|
// response code.
|
||||||
|
send_ok_or_error(port, port_owner, rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void* zalloc(unsigned int size)
|
static void* zalloc(unsigned int size)
|
||||||
|
|
|
@ -44,6 +44,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
#define CMD_CURSOR_CLOSE 13
|
#define CMD_CURSOR_CLOSE 13
|
||||||
#define CMD_PUT_COMMIT 14
|
#define CMD_PUT_COMMIT 14
|
||||||
#define CMD_REMOVE_DB 15
|
#define CMD_REMOVE_DB 15
|
||||||
|
#define CMD_TRUNCATE 16
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
-define(CMD_CURSOR_CLOSE, 13).
|
-define(CMD_CURSOR_CLOSE, 13).
|
||||||
-define(CMD_PUT_COMMIT, 14).
|
-define(CMD_PUT_COMMIT, 14).
|
||||||
-define(CMD_REMOVE_DB, 15).
|
-define(CMD_REMOVE_DB, 15).
|
||||||
|
-define(CMD_TRUNCATE, 16).
|
||||||
|
|
||||||
-define(DB_TYPE_BTREE, 1).
|
-define(DB_TYPE_BTREE, 1).
|
||||||
-define(DB_TYPE_HASH, 2).
|
-define(DB_TYPE_HASH, 2).
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
get/2, get/3,
|
get/2, get/3,
|
||||||
get_r/2, get_r/3,
|
get_r/2, get_r/3,
|
||||||
update/3, update/4,
|
update/3, update/4,
|
||||||
|
truncate/1,
|
||||||
delete_database/1,
|
delete_database/1,
|
||||||
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0]).
|
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0]).
|
||||||
|
|
||||||
|
@ -209,6 +210,20 @@ update(Db, Key, Fun, Args) ->
|
||||||
end,
|
end,
|
||||||
transaction(F).
|
transaction(F).
|
||||||
|
|
||||||
|
truncate(Db) ->
|
||||||
|
Cmd = <<Db:32/native>>,
|
||||||
|
<<Result:32/native-signed>> = erlang:port_control(get_port(), ?CMD_TRUNCATE, Cmd),
|
||||||
|
case decode_rc(Result) of
|
||||||
|
ok ->
|
||||||
|
receive
|
||||||
|
ok -> ok;
|
||||||
|
{error, Reason} -> {error, {truncate, decode_rc(Reason)}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
Error ->
|
||||||
|
{error, {truncate, decode_rc(Error)}}
|
||||||
|
end.
|
||||||
|
|
||||||
cursor_open(Db) ->
|
cursor_open(Db) ->
|
||||||
Cmd = <<Db:32/native, 0:32/native>>,
|
Cmd = <<Db:32/native, 0:32/native>>,
|
||||||
<<Rc:32/native-signed>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
|
<<Rc:32/native-signed>> = erlang:port_control(get_port(), ?CMD_CURSOR_OPEN, Cmd),
|
||||||
|
|
|
@ -28,7 +28,8 @@ all() ->
|
||||||
put_commit_should_end_txn,
|
put_commit_should_end_txn,
|
||||||
data_dir_should_be_priv_dir,
|
data_dir_should_be_priv_dir,
|
||||||
delete_should_remove_file,
|
delete_should_remove_file,
|
||||||
delete_should_fail_if_db_inuse].
|
delete_should_fail_if_db_inuse,
|
||||||
|
truncate_should_empty_database].
|
||||||
|
|
||||||
|
|
||||||
dbconfig(Config) ->
|
dbconfig(Config) ->
|
||||||
|
@ -212,3 +213,9 @@ delete_should_fail_if_db_inuse(Config) ->
|
||||||
true = filelib:is_file(Fname),
|
true = filelib:is_file(Fname),
|
||||||
{error, _} = bdberl:delete_database(Fname),
|
{error, _} = bdberl:delete_database(Fname),
|
||||||
true = filelib:is_file(Fname).
|
true = filelib:is_file(Fname).
|
||||||
|
|
||||||
|
truncate_should_empty_database(Config) ->
|
||||||
|
Db = ?config(db, Config),
|
||||||
|
ok = bdberl:put(Db, mykey, avalue),
|
||||||
|
ok = bdberl:truncate(Db),
|
||||||
|
not_found = bdberl:get(Db, mykey).
|
||||||
|
|
Loading…
Reference in a new issue