Add support for cursor_get() which is used to position the cursor as
well as to get items based on its location.
This commit is contained in:
parent
a29ccf2f22
commit
27438453bb
|
@ -59,33 +59,33 @@ static void bdberl_drv_stop(ErlDrvData handle);
|
||||||
|
|
||||||
static void bdberl_drv_finish();
|
static void bdberl_drv_finish();
|
||||||
|
|
||||||
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
char* inbuf, int inbuf_sz,
|
char* inbuf, int inbuf_sz,
|
||||||
char** outbuf, int outbuf_sz);
|
char** outbuf, int outbuf_sz);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Driver Entry
|
* Driver Entry
|
||||||
*/
|
*/
|
||||||
ErlDrvEntry bdberl_drv_entry =
|
ErlDrvEntry bdberl_drv_entry =
|
||||||
{
|
{
|
||||||
NULL, /* F_PTR init, N/A */
|
NULL, /* F_PTR init, N/A */
|
||||||
bdberl_drv_start, /* L_PTR start, called when port is opened */
|
bdberl_drv_start, /* L_PTR start, called when port is opened */
|
||||||
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
|
bdberl_drv_stop, /* F_PTR stop, called when port is closed */
|
||||||
NULL, /* F_PTR output, called when erlang has sent */
|
NULL, /* F_PTR output, called when erlang has sent */
|
||||||
NULL, /* F_PTR ready_input, called when input descriptor ready */
|
NULL, /* F_PTR ready_input, called when input descriptor ready */
|
||||||
NULL, /* F_PTR ready_output, called when output descriptor ready */
|
NULL, /* F_PTR ready_output, called when output descriptor ready */
|
||||||
"bdberl_drv", /* driver_name */
|
"bdberl_drv", /* driver_name */
|
||||||
bdberl_drv_finish, /* F_PTR finish, called when unloaded */
|
bdberl_drv_finish, /* F_PTR finish, called when unloaded */
|
||||||
NULL, /* handle */
|
NULL, /* handle */
|
||||||
bdberl_drv_control, /* F_PTR control, port_command callback */
|
bdberl_drv_control, /* F_PTR control, port_command callback */
|
||||||
NULL, /* F_PTR timeout, reserved */
|
NULL, /* F_PTR timeout, reserved */
|
||||||
NULL, /* F_PTR outputv, reserved */
|
NULL, /* F_PTR outputv, reserved */
|
||||||
NULL, /* F_PTR ready_async */
|
NULL, /* F_PTR ready_async */
|
||||||
NULL, /* F_PTR flush */
|
NULL, /* F_PTR flush */
|
||||||
NULL, /* F_PTR call */
|
NULL, /* F_PTR call */
|
||||||
NULL, /* F_PTR event */
|
NULL, /* F_PTR event */
|
||||||
ERL_DRV_EXTENDED_MARKER,
|
ERL_DRV_EXTENDED_MARKER,
|
||||||
ERL_DRV_EXTENDED_MAJOR_VERSION,
|
ERL_DRV_EXTENDED_MAJOR_VERSION,
|
||||||
ERL_DRV_EXTENDED_MINOR_VERSION,
|
ERL_DRV_EXTENDED_MINOR_VERSION,
|
||||||
ERL_DRV_FLAG_USE_PORT_LOCKING,
|
ERL_DRV_FLAG_USE_PORT_LOCKING,
|
||||||
NULL, /* Reserved */
|
NULL, /* Reserved */
|
||||||
|
@ -110,7 +110,10 @@ static void do_async_put(void* arg);
|
||||||
static void do_async_get(void* arg);
|
static void do_async_get(void* arg);
|
||||||
static void do_async_del(void* arg);
|
static void do_async_del(void* arg);
|
||||||
static void do_async_txnop(void* arg);
|
static void do_async_txnop(void* arg);
|
||||||
|
static void do_async_cursor_put(void* arg);
|
||||||
static void do_async_cursor_get(void* arg);
|
static void do_async_cursor_get(void* arg);
|
||||||
|
static void do_async_cursor_del(void* arg);
|
||||||
|
static void do_async_cursor_cnp(void* arg);
|
||||||
static void do_async_truncate(void* arg);
|
static void do_async_truncate(void* arg);
|
||||||
static void do_sync_data_dirs_info(PortData *p);
|
static void do_sync_data_dirs_info(PortData *p);
|
||||||
static void do_sync_driver_info(PortData *d);
|
static void do_sync_driver_info(PortData *d);
|
||||||
|
@ -153,10 +156,10 @@ static int G_DB_ENV_ERROR = 0;
|
||||||
* G_DATABASES is a global array of Database structs. Used to track currently opened DB*
|
* G_DATABASES is a global array of Database structs. Used to track currently opened DB*
|
||||||
* handles and ensure that they get cleaned up when all ports which were using them exit or
|
* handles and ensure that they get cleaned up when all ports which were using them exit or
|
||||||
* explicitly close them.
|
* explicitly close them.
|
||||||
*
|
*
|
||||||
* This array is allocated when the driver is first initialized and does not grow/shrink
|
* This array is allocated when the driver is first initialized and does not grow/shrink
|
||||||
* dynamically. G_DATABASES_SIZE contains the size of the array. G_DATABASES_NAMES is a hash of
|
* dynamically. G_DATABASES_SIZE contains the size of the array. G_DATABASES_NAMES is a hash of
|
||||||
* filenames to array index for an opened Database.
|
* filenames to array index for an opened Database.
|
||||||
*
|
*
|
||||||
* All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock
|
* All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock
|
||||||
* G_DATABASES_MUTEX.
|
* G_DATABASES_MUTEX.
|
||||||
|
@ -196,7 +199,7 @@ static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between check
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pipe to used to wake up the various monitors. Instead of just sleeping
|
* Pipe to used to wake up the various monitors. Instead of just sleeping
|
||||||
* they wait for an exceptional condition on the read fd of the pipe. When it is time to
|
* they wait for an exceptional condition on the read fd of the pipe. When it is time to
|
||||||
* shutdown, the driver closes the write fd and waits for the threads to be joined.
|
* shutdown, the driver closes the write fd and waits for the threads to be joined.
|
||||||
*/
|
*/
|
||||||
static int G_BDBERL_PIPE[2] = {-1, -1};
|
static int G_BDBERL_PIPE[2] = {-1, -1};
|
||||||
|
@ -204,7 +207,7 @@ static int G_BDBERL_PIPE[2] = {-1, -1};
|
||||||
/**
|
/**
|
||||||
* Lock, port and pid reference for relaying BDB output into the SASL logger. READ lock
|
* Lock, port and pid reference for relaying BDB output into the SASL logger. READ lock
|
||||||
* is required to log data. WRITE lock is used when replacing the pid/port reference. If
|
* is required to log data. WRITE lock is used when replacing the pid/port reference. If
|
||||||
* no pid/port is available, no callback is registered with BDB.
|
* no pid/port is available, no callback is registered with BDB.
|
||||||
*/
|
*/
|
||||||
static ErlDrvRWLock* G_LOG_RWLOCK = 0;
|
static ErlDrvRWLock* G_LOG_RWLOCK = 0;
|
||||||
static ErlDrvTermData G_LOG_PID;
|
static ErlDrvTermData G_LOG_PID;
|
||||||
|
@ -254,7 +257,7 @@ static void bdberl_dbgcmdrc(PortData *d, int rc);
|
||||||
DBG("threadid %p port %p: unlocking G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
DBG("threadid %p port %p: unlocking G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
||||||
erl_drv_mutex_unlock(G_DATABASES_MUTEX); \
|
erl_drv_mutex_unlock(G_DATABASES_MUTEX); \
|
||||||
DBG("threadid %p port %p: unlocked G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
DBG("threadid %p port %p: unlocked G_DATABASES\r\n", erl_drv_thread_self(), P); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
|
#define READ_LOCK(L) erl_drv_rwlock_rlock(L)
|
||||||
|
@ -263,11 +266,11 @@ static void bdberl_dbgcmdrc(PortData *d, int rc);
|
||||||
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
|
#define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L)
|
||||||
|
|
||||||
|
|
||||||
DRIVER_INIT(bdberl_drv)
|
DRIVER_INIT(bdberl_drv)
|
||||||
{
|
{
|
||||||
DBG("DRIVER INIT\r\n");
|
DBG("DRIVER INIT\r\n");
|
||||||
// Setup flags we'll use to init the environment
|
// Setup flags we'll use to init the environment
|
||||||
int flags =
|
int flags =
|
||||||
DB_INIT_LOCK | /* Enable support for locking */
|
DB_INIT_LOCK | /* Enable support for locking */
|
||||||
DB_INIT_TXN | /* Enable support for transactions */
|
DB_INIT_TXN | /* Enable support for transactions */
|
||||||
DB_INIT_MPOOL | /* Enable support for memory pools */
|
DB_INIT_MPOOL | /* Enable support for memory pools */
|
||||||
|
@ -285,10 +288,10 @@ DRIVER_INIT(bdberl_drv)
|
||||||
flags |= DB_SYSTEM_MEM;
|
flags |= DB_SYSTEM_MEM;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize global environment -- use environment variable DB_HOME to
|
// Initialize global environment -- use environment variable DB_HOME to
|
||||||
// specify where the working directory is
|
// specify where the working directory is
|
||||||
DBG("db_env_create(%p, 0)", &G_DB_ENV);
|
DBG("db_env_create(%p, 0)", &G_DB_ENV);
|
||||||
G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0);
|
G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0);
|
||||||
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
||||||
if (G_DB_ENV_ERROR != 0)
|
if (G_DB_ENV_ERROR != 0)
|
||||||
{
|
{
|
||||||
|
@ -301,7 +304,7 @@ DRIVER_INIT(bdberl_drv)
|
||||||
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
DBG(" = %d\r\n", G_DB_ENV_ERROR);
|
||||||
if (G_DB_ENV_ERROR != 0)
|
if (G_DB_ENV_ERROR != 0)
|
||||||
{
|
{
|
||||||
// Something bad happened while initializing BDB; in this situation we
|
// Something bad happened while initializing BDB; in this situation we
|
||||||
// cleanup and set the environment to zero. Attempts to open ports will
|
// cleanup and set the environment to zero. Attempts to open ports will
|
||||||
// fail and the user will have to sort out how to resolve the issue.
|
// fail and the user will have to sort out how to resolve the issue.
|
||||||
DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV);
|
DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV);
|
||||||
|
@ -354,10 +357,10 @@ DRIVER_INIT(bdberl_drv)
|
||||||
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE);
|
||||||
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE);
|
||||||
G_DATABASES_MUTEX = erl_drv_mutex_create("bdberl_drv: G_DATABASES_MUTEX");
|
G_DATABASES_MUTEX = erl_drv_mutex_create("bdberl_drv: G_DATABASES_MUTEX");
|
||||||
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
|
G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE);
|
||||||
|
|
||||||
// Startup deadlock check thread
|
// Startup deadlock check thread
|
||||||
erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD,
|
erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD,
|
||||||
&deadlock_check, 0, 0);
|
&deadlock_check, 0, 0);
|
||||||
|
|
||||||
// Use the BDBERL_CHECKPOINT_TIME environment value to determine the
|
// Use the BDBERL_CHECKPOINT_TIME environment value to determine the
|
||||||
|
@ -390,7 +393,7 @@ DRIVER_INIT(bdberl_drv)
|
||||||
|
|
||||||
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||||
{
|
{
|
||||||
DBG("threadid %p port %p: BDB DRIVER STARTING\r\n",
|
DBG("threadid %p port %p: BDB DRIVER STARTING\r\n",
|
||||||
erl_drv_thread_self(), port);
|
erl_drv_thread_self(), port);
|
||||||
|
|
||||||
// Make sure we have a functional environment -- if we don't,
|
// Make sure we have a functional environment -- if we don't,
|
||||||
|
@ -399,7 +402,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer)
|
||||||
{
|
{
|
||||||
return ERL_DRV_ERROR_BADARG;
|
return ERL_DRV_ERROR_BADARG;
|
||||||
}
|
}
|
||||||
|
|
||||||
PortData* d = (PortData*)driver_alloc(sizeof(PortData));
|
PortData* d = (PortData*)driver_alloc(sizeof(PortData));
|
||||||
memset(d, '\0', sizeof(PortData));
|
memset(d, '\0', sizeof(PortData));
|
||||||
|
|
||||||
|
@ -450,7 +453,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
// If there was no async job, drop the lock -- not needed
|
// If there was no async job, drop the lock -- not needed
|
||||||
erl_drv_mutex_unlock(d->port_lock);
|
erl_drv_mutex_unlock(d->port_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup the port lock
|
// Cleanup the port lock
|
||||||
erl_drv_mutex_destroy(d->port_lock);
|
erl_drv_mutex_destroy(d->port_lock);
|
||||||
|
|
||||||
|
@ -463,7 +466,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
d->cursor->close(d->cursor);
|
d->cursor->close(d->cursor);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If a txn is currently active, terminate it.
|
// If a txn is currently active, terminate it.
|
||||||
abort_txn(d);
|
abort_txn(d);
|
||||||
|
|
||||||
// Close all the databases we previously opened
|
// Close all the databases we previously opened
|
||||||
|
@ -477,7 +480,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this port was registered as the endpoint for logging, go ahead and
|
// If this port was registered as the endpoint for logging, go ahead and
|
||||||
// remove it. Note that we don't need to lock to check this since we only
|
// remove it. Note that we don't need to lock to check this since we only
|
||||||
// unregister if it's already initialized to this port.
|
// unregister if it's already initialized to this port.
|
||||||
if (G_LOG_PORT == d->port)
|
if (G_LOG_PORT == d->port)
|
||||||
|
@ -489,7 +492,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
// Remove the references
|
// Remove the references
|
||||||
G_LOG_PORT = 0;
|
G_LOG_PORT = 0;
|
||||||
G_LOG_PID = 0;
|
G_LOG_PID = 0;
|
||||||
|
|
||||||
// Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD!
|
// Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD!
|
||||||
G_DB_ENV->set_msgcall(G_DB_ENV, 0);
|
G_DB_ENV->set_msgcall(G_DB_ENV, 0);
|
||||||
G_DB_ENV->set_errcall(G_DB_ENV, 0);
|
G_DB_ENV->set_errcall(G_DB_ENV, 0);
|
||||||
|
@ -498,7 +501,7 @@ static void bdberl_drv_stop(ErlDrvData handle)
|
||||||
}
|
}
|
||||||
|
|
||||||
DBG("Stopped port: %p\r\n", d->port);
|
DBG("Stopped port: %p\r\n", d->port);
|
||||||
|
|
||||||
// Release the port instance data
|
// Release the port instance data
|
||||||
driver_free(d->work_buffer);
|
driver_free(d->work_buffer);
|
||||||
driver_free(handle);
|
driver_free(handle);
|
||||||
|
@ -577,7 +580,7 @@ static void bdberl_drv_finish()
|
||||||
hive_hash_destroy(G_DATABASES_NAMES);
|
hive_hash_destroy(G_DATABASES_NAMES);
|
||||||
G_DATABASES_NAMES = NULL;
|
G_DATABASES_NAMES = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release the logging rwlock
|
// Release the logging rwlock
|
||||||
if (G_LOG_RWLOCK != NULL)
|
if (G_LOG_RWLOCK != NULL)
|
||||||
{
|
{
|
||||||
|
@ -588,12 +591,12 @@ static void bdberl_drv_finish()
|
||||||
DBG("BDB DRIVER FINISHED\r\n");
|
DBG("BDB DRIVER FINISHED\r\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
char* inbuf, int inbuf_sz,
|
char* inbuf, int inbuf_sz,
|
||||||
char** outbuf, int outbuf_sz)
|
char** outbuf, int outbuf_sz)
|
||||||
{
|
{
|
||||||
PortData* d = (PortData*)handle;
|
PortData* d = (PortData*)handle;
|
||||||
switch(cmd)
|
switch(cmd)
|
||||||
{
|
{
|
||||||
case CMD_OPEN_DB:
|
case CMD_OPEN_DB:
|
||||||
{
|
{
|
||||||
|
@ -707,27 +710,27 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
d->async_op = cmd;
|
d->async_op = cmd;
|
||||||
d->async_dbref = dbref;
|
d->async_dbref = dbref;
|
||||||
TPoolJobFunc fn;
|
TPoolJobFunc fn;
|
||||||
switch(cmd) {
|
switch(cmd) {
|
||||||
case CMD_PUT: case CMD_PUT_COMMIT:
|
case CMD_PUT: case CMD_PUT_COMMIT:
|
||||||
{
|
{
|
||||||
fn = &do_async_put;
|
fn = &do_async_put;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CMD_DEL:
|
case CMD_DEL:
|
||||||
{
|
{
|
||||||
fn = &do_async_del;
|
fn = &do_async_del;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case CMD_GET:
|
case CMD_GET:
|
||||||
{
|
{
|
||||||
fn = &do_async_get;
|
fn = &do_async_get;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
assert(cmd);
|
assert(cmd);
|
||||||
}
|
}
|
||||||
bdberl_general_tpool_run(fn, d, 0, &d->async_job);
|
bdberl_general_tpool_run(fn, d, 0, &d->async_job);
|
||||||
|
|
||||||
// Let caller know that the operation is in progress
|
// Let caller know that the operation is in progress
|
||||||
// Outbuf is: <<0:32>>
|
// Outbuf is: <<0:32>>
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -738,7 +741,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
bdberl_send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF);
|
bdberl_send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF);
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case CMD_GETINFO:
|
case CMD_GETINFO:
|
||||||
{
|
{
|
||||||
// Inbuf is: << Target:32, Values/binary >>
|
// Inbuf is: << Target:32, Values/binary >>
|
||||||
|
@ -753,7 +756,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
}
|
}
|
||||||
case CMD_CURSOR_OPEN:
|
case CMD_CURSOR_OPEN:
|
||||||
{
|
{
|
||||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||||
FAIL_IF_CURSOR_OPEN(d, outbuf);
|
FAIL_IF_CURSOR_OPEN(d, outbuf);
|
||||||
|
|
||||||
// Inbuf is << DbRef:32, Flags:32 >>
|
// Inbuf is << DbRef:32, Flags:32 >>
|
||||||
|
@ -775,16 +778,66 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case CMD_CURSOR_GET:
|
||||||
|
case CMD_CURSOR_PUT:
|
||||||
|
case CMD_CURSOR_DEL:
|
||||||
|
{
|
||||||
|
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||||
|
FAIL_IF_NO_CURSOR(d, outbuf);
|
||||||
|
|
||||||
|
// Inbuf is <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>,
|
||||||
|
|
||||||
|
// If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc
|
||||||
|
// until it is large enough
|
||||||
|
if (d->work_buffer_sz < inbuf_sz)
|
||||||
|
{
|
||||||
|
d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz);
|
||||||
|
d->work_buffer_sz = inbuf_sz;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the payload into place
|
||||||
|
memcpy(d->work_buffer, inbuf, inbuf_sz);
|
||||||
|
d->work_buffer_offset = inbuf_sz;
|
||||||
|
|
||||||
|
// Mark the port as busy and then choose the appropriate async operation
|
||||||
|
d->async_op = cmd;
|
||||||
|
TPoolJobFunc fn;
|
||||||
|
switch(cmd) {
|
||||||
|
case CMD_CURSOR_PUT:
|
||||||
|
{
|
||||||
|
fn = &do_async_cursor_put;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case CMD_CURSOR_DEL:
|
||||||
|
{
|
||||||
|
fn = &do_async_cursor_del;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case CMD_CURSOR_GET:
|
||||||
|
{
|
||||||
|
fn = &do_async_cursor_get;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
assert(cmd);
|
||||||
|
}
|
||||||
|
// Now schedule the operation to run
|
||||||
|
bdberl_general_tpool_run(fn, d, 0, &d->async_job);
|
||||||
|
|
||||||
|
// Let caller know operation is in progress
|
||||||
|
// Outbuf is: <<0:32>>
|
||||||
|
RETURN_INT(0, outbuf);
|
||||||
|
}
|
||||||
case CMD_CURSOR_CURR:
|
case CMD_CURSOR_CURR:
|
||||||
case CMD_CURSOR_NEXT:
|
case CMD_CURSOR_NEXT:
|
||||||
case CMD_CURSOR_PREV:
|
case CMD_CURSOR_PREV:
|
||||||
{
|
{
|
||||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||||
FAIL_IF_NO_CURSOR(d, outbuf);
|
FAIL_IF_NO_CURSOR(d, outbuf);
|
||||||
|
|
||||||
// Schedule the operation
|
// Schedule the operation
|
||||||
d->async_op = cmd;
|
d->async_op = cmd;
|
||||||
bdberl_general_tpool_run(&do_async_cursor_get, d, 0, &d->async_job);
|
bdberl_general_tpool_run(&do_async_cursor_cnp, d, 0, &d->async_job);
|
||||||
|
|
||||||
// Let caller know operation is in progress
|
// Let caller know operation is in progress
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -804,7 +857,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
|
|
||||||
// Regardless of what happens, clear out the cursor pointer
|
// Regardless of what happens, clear out the cursor pointer
|
||||||
d->cursor = 0;
|
d->cursor = 0;
|
||||||
|
|
||||||
// Send result code
|
// Send result code
|
||||||
bdberl_send_rc(d->port, d->port_owner, rc);
|
bdberl_send_rc(d->port, d->port_owner, rc);
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -851,13 +904,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
case CMD_REGISTER_LOGGER:
|
case CMD_REGISTER_LOGGER:
|
||||||
{
|
{
|
||||||
// If this port is not the current logger, make it so. Only one logger can be registered
|
// If this port is not the current logger, make it so. Only one logger can be registered
|
||||||
// at a time.
|
// at a time.
|
||||||
if (G_LOG_PORT != d->port)
|
if (G_LOG_PORT != d->port)
|
||||||
{
|
{
|
||||||
// Grab the write lock and update the global vars; also make sure to update BDB callbacks
|
// Grab the write lock and update the global vars; also make sure to update BDB callbacks
|
||||||
// within the write lock to avoid race conditions.
|
// within the write lock to avoid race conditions.
|
||||||
WRITE_LOCK(G_LOG_RWLOCK);
|
WRITE_LOCK(G_LOG_RWLOCK);
|
||||||
|
|
||||||
G_LOG_PORT = d->port;
|
G_LOG_PORT = d->port;
|
||||||
G_LOG_PID = driver_connected(d->port);
|
G_LOG_PID = driver_connected(d->port);
|
||||||
|
|
||||||
|
@ -891,7 +944,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||||
|
|
||||||
do_sync_data_dirs_info(d);
|
do_sync_data_dirs_info(d);
|
||||||
|
|
||||||
// Let caller know that the operation is in progress
|
// Let caller know that the operation is in progress
|
||||||
// Outbuf is: <<0:32>>
|
// Outbuf is: <<0:32>>
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -927,7 +980,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd,
|
||||||
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
FAIL_IF_ASYNC_PENDING(d, outbuf);
|
||||||
|
|
||||||
do_sync_driver_info(d);
|
do_sync_driver_info(d);
|
||||||
|
|
||||||
// Let caller know that the operation is in progress
|
// Let caller know that the operation is in progress
|
||||||
// Outbuf is: <<0:32>>
|
// Outbuf is: <<0:32>>
|
||||||
RETURN_INT(0, outbuf);
|
RETURN_INT(0, outbuf);
|
||||||
|
@ -1062,7 +1115,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Database hasn't been created while we were waiting on write lock, so
|
// Database hasn't been created while we were waiting on write lock, so
|
||||||
// create/open it
|
// create/open it
|
||||||
|
|
||||||
|
@ -1071,7 +1124,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
int dbref = alloc_dbref();
|
int dbref = alloc_dbref();
|
||||||
if (dbref < 0)
|
if (dbref < 0)
|
||||||
{
|
{
|
||||||
// No more slots available
|
// No more slots available
|
||||||
UNLOCK_DATABASES(data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return ERROR_MAX_DBS;
|
return ERROR_MAX_DBS;
|
||||||
}
|
}
|
||||||
|
@ -1083,7 +1136,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
|
DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
{
|
{
|
||||||
// Failure while creating the database handle -- drop our lock and return
|
// Failure while creating the database handle -- drop our lock and return
|
||||||
// the code
|
// the code
|
||||||
UNLOCK_DATABASES(data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
return rc;
|
return rc;
|
||||||
|
@ -1124,7 +1177,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
// Drop the write lock
|
// Drop the write lock
|
||||||
UNLOCK_DATABASES(data->port);
|
UNLOCK_DATABASES(data->port);
|
||||||
|
|
||||||
// Add the dbref to the port list
|
// Add the dbref to the port list
|
||||||
add_dbref(data, dbref);
|
add_dbref(data, dbref);
|
||||||
*dbref_res = dbref;
|
*dbref_res = dbref;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1133,7 +1186,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port
|
||||||
|
|
||||||
static int close_database(int dbref, unsigned flags, PortData* data)
|
static int close_database(int dbref, unsigned flags, PortData* data)
|
||||||
{
|
{
|
||||||
// Remove this database from our list
|
// Remove this database from our list
|
||||||
if (del_dbref(data, dbref))
|
if (del_dbref(data, dbref))
|
||||||
{
|
{
|
||||||
// Something was actually deleted from our list -- now we need to disassociate the
|
// Something was actually deleted from our list -- now we need to disassociate the
|
||||||
|
@ -1243,7 +1296,7 @@ static int delete_database(const char* name, PortData *data)
|
||||||
*/
|
*/
|
||||||
static void get_info(int target, void* values, BinHelper* bh)
|
static void get_info(int target, void* values, BinHelper* bh)
|
||||||
{
|
{
|
||||||
switch(target)
|
switch(target)
|
||||||
{
|
{
|
||||||
case SYSP_CACHESIZE_GET:
|
case SYSP_CACHESIZE_GET:
|
||||||
{
|
{
|
||||||
|
@ -1397,7 +1450,7 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path)
|
||||||
// send fsid as a binary as will only be used
|
// send fsid as a binary as will only be used
|
||||||
// to compare which physical filesystem is on
|
// to compare which physical filesystem is on
|
||||||
// and the definintion varies between platforms.
|
// and the definintion varies between platforms.
|
||||||
ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid,
|
ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid,
|
||||||
sizeof(svfs.f_fsid),
|
sizeof(svfs.f_fsid),
|
||||||
ERL_DRV_UINT, mbyte_avail,
|
ERL_DRV_UINT, mbyte_avail,
|
||||||
ERL_DRV_TUPLE, 4};
|
ERL_DRV_TUPLE, 4};
|
||||||
|
@ -1514,7 +1567,7 @@ static void do_async_put(void* arg)
|
||||||
DB* db = bdberl_lookup_dbref(dbref);
|
DB* db = bdberl_lookup_dbref(dbref);
|
||||||
unsigned int flags = UNPACK_INT(d->work_buffer, 4);
|
unsigned int flags = UNPACK_INT(d->work_buffer, 4);
|
||||||
|
|
||||||
// Setup DBTs
|
// Setup DBTs
|
||||||
DBT key;
|
DBT key;
|
||||||
DBT value;
|
DBT value;
|
||||||
memset(&key, '\0', sizeof(DBT));
|
memset(&key, '\0', sizeof(DBT));
|
||||||
|
@ -1530,7 +1583,7 @@ static void do_async_put(void* arg)
|
||||||
assert(value.size >= 4);
|
assert(value.size >= 4);
|
||||||
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
||||||
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
||||||
|
|
||||||
int rc;
|
int rc;
|
||||||
if (calc_crc32 != buf_crc32)
|
if (calc_crc32 != buf_crc32)
|
||||||
{
|
{
|
||||||
|
@ -1569,36 +1622,36 @@ static void do_async_get(void* arg)
|
||||||
{
|
{
|
||||||
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
||||||
PortData* d = (PortData*)arg;
|
PortData* d = (PortData*)arg;
|
||||||
|
|
||||||
// Get the database object, using the provided ref
|
// Get the database object, using the provided ref
|
||||||
int dbref = UNPACK_INT(d->work_buffer, 0);
|
int dbref = UNPACK_INT(d->work_buffer, 0);
|
||||||
DB* db = bdberl_lookup_dbref(dbref);
|
DB* db = bdberl_lookup_dbref(dbref);
|
||||||
|
|
||||||
// Extract operation flags
|
// Extract operation flags
|
||||||
unsigned flags = UNPACK_INT(d->work_buffer, 4);
|
unsigned flags = UNPACK_INT(d->work_buffer, 4);
|
||||||
|
|
||||||
// Setup DBTs
|
// Setup DBTs
|
||||||
DBT key;
|
DBT key;
|
||||||
DBT value;
|
DBT value;
|
||||||
memset(&key, '\0', sizeof(DBT));
|
memset(&key, '\0', sizeof(DBT));
|
||||||
memset(&value, '\0', sizeof(DBT));
|
memset(&value, '\0', sizeof(DBT));
|
||||||
|
|
||||||
// Parse payload into DBT
|
// Parse payload into DBT
|
||||||
key.size = UNPACK_INT(d->work_buffer, 8);
|
key.size = UNPACK_INT(d->work_buffer, 8);
|
||||||
key.data = UNPACK_BLOB(d->work_buffer, 12);
|
key.data = UNPACK_BLOB(d->work_buffer, 12);
|
||||||
|
|
||||||
// Allocate a buffer for the output value
|
// Allocate a buffer for the output value
|
||||||
value.flags = DB_DBT_MALLOC;
|
value.flags = DB_DBT_MALLOC;
|
||||||
|
|
||||||
int rc = db->get(db, d->txn, &key, &value, flags);
|
int rc = db->get(db, d->txn, &key, &value, flags);
|
||||||
|
|
||||||
// Check CRC - first 4 bytes are CRC of rest of bytes
|
// Check CRC - first 4 bytes are CRC of rest of bytes
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
{
|
{
|
||||||
assert(value.size >= 4);
|
assert(value.size >= 4);
|
||||||
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
||||||
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
||||||
|
|
||||||
if (calc_crc32 != buf_crc32)
|
if (calc_crc32 != buf_crc32)
|
||||||
{
|
{
|
||||||
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||||
|
@ -1606,16 +1659,16 @@ static void do_async_get(void* arg)
|
||||||
rc = ERROR_INVALID_VALUE;
|
rc = ERROR_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup transaction as necessary
|
// Cleanup transaction as necessary
|
||||||
if (rc && rc != DB_NOTFOUND && d->txn)
|
if (rc && rc != DB_NOTFOUND && d->txn)
|
||||||
{
|
{
|
||||||
d->txn->abort(d->txn);
|
d->txn->abort(d->txn);
|
||||||
d->txn = 0;
|
d->txn = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
async_cleanup_and_send_kv(d, rc, &key, &value);
|
async_cleanup_and_send_kv(d, rc, &key, &value);
|
||||||
|
|
||||||
// Finally, clean up value buffer (driver_send_term made a copy)
|
// Finally, clean up value buffer (driver_send_term made a copy)
|
||||||
free(value.data);
|
free(value.data);
|
||||||
}
|
}
|
||||||
|
@ -1624,31 +1677,31 @@ static void do_async_del(void* arg)
|
||||||
{
|
{
|
||||||
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
||||||
PortData* d = (PortData*)arg;
|
PortData* d = (PortData*)arg;
|
||||||
|
|
||||||
// Get the database object, using the provided ref
|
// Get the database object, using the provided ref
|
||||||
int dbref = UNPACK_INT(d->work_buffer, 0);
|
int dbref = UNPACK_INT(d->work_buffer, 0);
|
||||||
DB* db = bdberl_lookup_dbref(dbref);
|
DB* db = bdberl_lookup_dbref(dbref);
|
||||||
|
|
||||||
// Extract operation flags
|
// Extract operation flags
|
||||||
unsigned flags = UNPACK_INT(d->work_buffer, 4);
|
unsigned flags = UNPACK_INT(d->work_buffer, 4);
|
||||||
|
|
||||||
// Setup DBTs
|
// Setup DBTs
|
||||||
DBT key;
|
DBT key;
|
||||||
memset(&key, '\0', sizeof(DBT));
|
memset(&key, '\0', sizeof(DBT));
|
||||||
|
|
||||||
// Parse payload into DBT
|
// Parse payload into DBT
|
||||||
key.size = UNPACK_INT(d->work_buffer, 8);
|
key.size = UNPACK_INT(d->work_buffer, 8);
|
||||||
key.data = UNPACK_BLOB(d->work_buffer, 12);
|
key.data = UNPACK_BLOB(d->work_buffer, 12);
|
||||||
|
|
||||||
int rc = db->del(db, d->txn, &key, flags);
|
int rc = db->del(db, d->txn, &key, flags);
|
||||||
|
|
||||||
// Cleanup transaction as necessary
|
// Cleanup transaction as necessary
|
||||||
if (rc && rc != DB_NOTFOUND && d->txn)
|
if (rc && rc != DB_NOTFOUND && d->txn)
|
||||||
{
|
{
|
||||||
d->txn->abort(d->txn);
|
d->txn->abort(d->txn);
|
||||||
d->txn = 0;
|
d->txn = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bdberl_async_cleanup_and_send_rc(d, rc);
|
bdberl_async_cleanup_and_send_rc(d, rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1673,7 +1726,7 @@ static void do_async_txnop(void* arg)
|
||||||
DBGCMDRC(d, rc);
|
DBGCMDRC(d, rc);
|
||||||
d->txn = 0;
|
d->txn = 0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
assert(d->async_op == CMD_TXN_ABORT);
|
assert(d->async_op == CMD_TXN_ABORT);
|
||||||
abort_txn(d);
|
abort_txn(d);
|
||||||
|
@ -1683,13 +1736,90 @@ static void do_async_txnop(void* arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void do_async_cursor_put(void* arg)
|
||||||
|
{
|
||||||
|
PortData* d = (PortData*)arg;
|
||||||
|
assert(d->cursor != NULL);
|
||||||
|
DBGCMD(d, "cursor_put/2 not yet implemented..."); /* TODO: implement this. */
|
||||||
|
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void do_async_cursor_get(void* arg)
|
static void do_async_cursor_get(void* arg)
|
||||||
|
{
|
||||||
|
// Payload is: << Flags:32, KeyLen:32, Key:KeyLen >>
|
||||||
|
PortData* d = (PortData*)arg;
|
||||||
|
assert(d->cursor != NULL);
|
||||||
|
|
||||||
|
// Extract operation flags
|
||||||
|
unsigned flags = UNPACK_INT(d->work_buffer, 0);
|
||||||
|
|
||||||
|
// Setup DBTs
|
||||||
|
DBT key;
|
||||||
|
DBT value;
|
||||||
|
memset(&key, '\0', sizeof(DBT));
|
||||||
|
memset(&value, '\0', sizeof(DBT));
|
||||||
|
|
||||||
|
// Parse payload into DBT
|
||||||
|
key.size = UNPACK_INT(d->work_buffer, 4);
|
||||||
|
key.data = UNPACK_BLOB(d->work_buffer, 8);
|
||||||
|
|
||||||
|
// Allocate a buffer for the output value
|
||||||
|
value.flags = DB_DBT_MALLOC;
|
||||||
|
|
||||||
|
// Execute the operation
|
||||||
|
DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags);
|
||||||
|
int rc = d->cursor->get(d->cursor, &key, &value, flags);
|
||||||
|
DBGCMDRC(d, rc);
|
||||||
|
|
||||||
|
// Check CRC - first 4 bytes are CRC of rest of bytes
|
||||||
|
if (rc == 0)
|
||||||
|
{
|
||||||
|
assert(value.size >= 4);
|
||||||
|
uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4);
|
||||||
|
uint32_t buf_crc32 = *(uint32_t*) value.data;
|
||||||
|
|
||||||
|
if (calc_crc32 != buf_crc32)
|
||||||
|
{
|
||||||
|
DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.",
|
||||||
|
buf_crc32, calc_crc32);
|
||||||
|
rc = ERROR_INVALID_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup cursor as necessary
|
||||||
|
if (rc && rc != DB_NOTFOUND && d->txn)
|
||||||
|
{
|
||||||
|
DBG("cursor flags=%d rc=%d\n", flags, rc);
|
||||||
|
|
||||||
|
d->cursor->close(d->cursor);
|
||||||
|
d->cursor = 0;
|
||||||
|
abort_txn(d);
|
||||||
|
}
|
||||||
|
|
||||||
|
async_cleanup_and_send_kv(d, rc, &key, &value);
|
||||||
|
|
||||||
|
// Finally, clean up value buffer (driver_send_term made a copy)
|
||||||
|
free(value.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void do_async_cursor_del(void* arg)
|
||||||
|
{
|
||||||
|
PortData* d = (PortData*)arg;
|
||||||
|
assert(d->cursor != NULL);
|
||||||
|
DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */
|
||||||
|
bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void do_async_cursor_cnp(void* arg)
|
||||||
{
|
{
|
||||||
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
// Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >>
|
||||||
PortData* d = (PortData*)arg;
|
PortData* d = (PortData*)arg;
|
||||||
assert(d->cursor != NULL);
|
assert(d->cursor != NULL);
|
||||||
|
|
||||||
// Setup DBTs
|
// Setup DBTs
|
||||||
DBT key;
|
DBT key;
|
||||||
DBT value;
|
DBT value;
|
||||||
memset(&key, '\0', sizeof(DBT));
|
memset(&key, '\0', sizeof(DBT));
|
||||||
|
@ -1699,12 +1829,12 @@ static void do_async_cursor_get(void* arg)
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
switch (d->async_op)
|
switch (d->async_op)
|
||||||
{
|
{
|
||||||
case CMD_CURSOR_NEXT:
|
case CMD_CURSOR_NEXT:
|
||||||
flags = DB_NEXT; break;
|
flags = DB_NEXT; break;
|
||||||
case CMD_CURSOR_PREV:
|
case CMD_CURSOR_PREV:
|
||||||
flags = DB_PREV; break;
|
flags = DB_PREV; break;
|
||||||
default:
|
default:
|
||||||
flags = DB_CURRENT;
|
flags = DB_CURRENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute the operation
|
// Execute the operation
|
||||||
|
@ -1763,7 +1893,7 @@ static void do_async_truncate(void* arg)
|
||||||
|
|
||||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, i);
|
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, i);
|
||||||
rc = db->truncate(db, d->txn, &count, 0);
|
rc = db->truncate(db, d->txn, &count, 0);
|
||||||
DBGCMD(d, "rc = %s (%d) count=%d",
|
DBGCMD(d, "rc = %s (%d) count=%d",
|
||||||
rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
|
rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count);
|
||||||
|
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
|
@ -1779,7 +1909,7 @@ static void do_async_truncate(void* arg)
|
||||||
u_int32_t count = 0;
|
u_int32_t count = 0;
|
||||||
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref);
|
DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref);
|
||||||
rc = db->truncate(db, d->txn, &count, 0);
|
rc = db->truncate(db, d->txn, &count, 0);
|
||||||
DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
|
DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc),
|
||||||
rc, count);
|
rc, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1838,7 +1968,7 @@ static void do_sync_data_dirs_info(PortData *d)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
{
|
{
|
||||||
rc = send_dir_info(d->port, d->port_owner, data_dir);
|
rc = send_dir_info(d->port, d->port_owner, data_dir);
|
||||||
|
@ -1905,7 +2035,7 @@ static void do_sync_driver_info(PortData *d)
|
||||||
ERL_DRV_UINT, txn_active,
|
ERL_DRV_UINT, txn_active,
|
||||||
ERL_DRV_TUPLE, 2,
|
ERL_DRV_TUPLE, 2,
|
||||||
// End of list
|
// End of list
|
||||||
ERL_DRV_NIL,
|
ERL_DRV_NIL,
|
||||||
ERL_DRV_LIST, 11+1,
|
ERL_DRV_LIST, 11+1,
|
||||||
ERL_DRV_TUPLE, 2
|
ERL_DRV_TUPLE, 2
|
||||||
};
|
};
|
||||||
|
@ -1938,7 +2068,7 @@ static int add_portref(int dbref, ErlDrvPort port)
|
||||||
}
|
}
|
||||||
|
|
||||||
last = current;
|
last = current;
|
||||||
current = current->next;
|
current = current->next;
|
||||||
} while (current != 0);
|
} while (current != 0);
|
||||||
|
|
||||||
// At the end of the list -- allocate a new entry for this port
|
// At the end of the list -- allocate a new entry for this port
|
||||||
|
@ -2011,7 +2141,7 @@ static int add_dbref(PortData* data, int dbref)
|
||||||
current = current->next;
|
current = current->next;
|
||||||
} while (current != 0);
|
} while (current != 0);
|
||||||
|
|
||||||
// At the end of the list -- allocate a new entry
|
// At the end of the list -- allocate a new entry
|
||||||
current = zalloc(sizeof(DbRefList));
|
current = zalloc(sizeof(DbRefList));
|
||||||
current->dbref = dbref;
|
current->dbref = dbref;
|
||||||
last->next = current;
|
last->next = current;
|
||||||
|
@ -2028,7 +2158,7 @@ static int add_dbref(PortData* data, int dbref)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete a db reference from a port's DbRefList. Returns 1 if deleted; 0 if not
|
* Delete a db reference from a port's DbRefList. Returns 1 if deleted; 0 if not
|
||||||
*/
|
*/
|
||||||
static int del_dbref(PortData* data, int dbref)
|
static int del_dbref(PortData* data, int dbref)
|
||||||
{
|
{
|
||||||
|
@ -2068,7 +2198,7 @@ static int del_dbref(PortData* data, int dbref)
|
||||||
/**
|
/**
|
||||||
* Validate that a provided dbref is currently opened by a port. Return 1 if true; 0 if false.
|
* Validate that a provided dbref is currently opened by a port. Return 1 if true; 0 if false.
|
||||||
*/
|
*/
|
||||||
int bdberl_has_dbref(PortData* data, int dbref)
|
int bdberl_has_dbref(PortData* data, int dbref)
|
||||||
{
|
{
|
||||||
DbRefList* current = data->dbrefs;
|
DbRefList* current = data->dbrefs;
|
||||||
while (current)
|
while (current)
|
||||||
|
@ -2085,7 +2215,7 @@ int bdberl_has_dbref(PortData* data, int dbref)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a Database structure; find first available slot in G_DATABASES and return the
|
* Allocate a Database structure; find first available slot in G_DATABASES and return the
|
||||||
* index of it. If no free slots are available, return -1
|
* index of it. If no free slots are available, return -1
|
||||||
*/
|
*/
|
||||||
static int alloc_dbref()
|
static int alloc_dbref()
|
||||||
{
|
{
|
||||||
|
|
|
@ -53,6 +53,9 @@
|
||||||
#define CMD_CURSOR_NEXT 12
|
#define CMD_CURSOR_NEXT 12
|
||||||
#define CMD_CURSOR_PREV 13
|
#define CMD_CURSOR_PREV 13
|
||||||
#define CMD_CURSOR_CLOSE 14
|
#define CMD_CURSOR_CLOSE 14
|
||||||
|
#define CMD_CURSOR_GET 35 /* TODO: renumber these next 3 and match them with bdberl.hrl */
|
||||||
|
#define CMD_CURSOR_PUT 36
|
||||||
|
#define CMD_CURSOR_DEL 37
|
||||||
#define CMD_PUT_COMMIT 15
|
#define CMD_PUT_COMMIT 15
|
||||||
#define CMD_REMOVE_DB 16
|
#define CMD_REMOVE_DB 16
|
||||||
#define CMD_TRUNCATE 17
|
#define CMD_TRUNCATE 17
|
||||||
|
@ -140,7 +143,7 @@ typedef struct
|
||||||
/**
|
/**
|
||||||
* Structure for holding port instance data
|
* Structure for holding port instance data
|
||||||
*/
|
*/
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
ErlDrvPort port;
|
ErlDrvPort port;
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,9 @@
|
||||||
-define(CMD_CURSOR_NEXT, 12).
|
-define(CMD_CURSOR_NEXT, 12).
|
||||||
-define(CMD_CURSOR_PREV, 13).
|
-define(CMD_CURSOR_PREV, 13).
|
||||||
-define(CMD_CURSOR_CLOSE, 14).
|
-define(CMD_CURSOR_CLOSE, 14).
|
||||||
|
-define(CMD_CURSOR_GET, 35). %% TODO: renumber these 3 and match them to bdberl_drv.h
|
||||||
|
-define(CMD_CURSOR_PUT, 36).
|
||||||
|
-define(CMD_CURSOR_DEL, 37).
|
||||||
-define(CMD_PUT_COMMIT, 15).
|
-define(CMD_PUT_COMMIT, 15).
|
||||||
-define(CMD_REMOVE_DB, 16).
|
-define(CMD_REMOVE_DB, 16).
|
||||||
-define(CMD_TRUNCATE, 17).
|
-define(CMD_TRUNCATE, 17).
|
||||||
|
|
164
src/bdberl.erl
164
src/bdberl.erl
|
@ -63,10 +63,11 @@
|
||||||
get/2, get/3,
|
get/2, get/3,
|
||||||
get_r/2, get_r/3,
|
get_r/2, get_r/3,
|
||||||
update/3, update/4, update/5, update/6, update/7,
|
update/3, update/4, update/5, update/6, update/7,
|
||||||
del/2,
|
del/2,
|
||||||
truncate/0, truncate/1,
|
truncate/0, truncate/1,
|
||||||
delete_database/1,
|
delete_database/1,
|
||||||
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
|
cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0,
|
||||||
|
cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3,
|
||||||
driver_info/0,
|
driver_info/0,
|
||||||
register_logger/0,
|
register_logger/0,
|
||||||
stop/0]).
|
stop/0]).
|
||||||
|
@ -1248,9 +1249,7 @@ cursor_prev() ->
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Retrieves key/data pairs from the database.
|
%% Retrieves the key/data pair to which the cursor refers.
|
||||||
%%
|
|
||||||
%% Returns the key/data pair to which the cursor refers.
|
|
||||||
%%
|
%%
|
||||||
%% Modifications to the database during a sequential scan will be
|
%% Modifications to the database during a sequential scan will be
|
||||||
%% reflected in the scan; that is, records inserted behind a cursor will
|
%% reflected in the scan; that is, records inserted behind a cursor will
|
||||||
|
@ -1270,6 +1269,132 @@ cursor_current() ->
|
||||||
do_cursor_move(?CMD_CURSOR_CURR).
|
do_cursor_move(?CMD_CURSOR_CURR).
|
||||||
|
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Retrieves the key/data pair to which the cursor refers.
|
||||||
|
%%
|
||||||
|
%% Modifications to the database during a sequential scan will be
|
||||||
|
%% reflected in the scan; that is, records inserted behind a cursor will
|
||||||
|
%% not be returned while records inserted in front of a cursor will be
|
||||||
|
%% returned.
|
||||||
|
%%
|
||||||
|
%% If this function fails for any reason, the state of the cursor will
|
||||||
|
%% be unchanged.
|
||||||
|
%%
|
||||||
|
%% @spec cursor_get() -> not_found | {ok, Key, Value} | {error, Error}
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-spec cursor_get() -> {ok, db_key(), db_value()} | not_found | db_error().
|
||||||
|
|
||||||
|
cursor_get() ->
|
||||||
|
cursor_current().
|
||||||
|
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Positions the cursor at the key and retrieves that key/data pair.
|
||||||
|
%%
|
||||||
|
%% Modifications to the database during a sequential scan will be
|
||||||
|
%% reflected in the scan; that is, records inserted behind a cursor will
|
||||||
|
%% not be returned while records inserted in front of a cursor will be
|
||||||
|
%% returned.
|
||||||
|
%%
|
||||||
|
%% If this function fails for any reason, the state of the cursor will
|
||||||
|
%% be unchanged.
|
||||||
|
%%
|
||||||
|
%% @spec cursor_get(Key) -> not_found | {ok, Key, Value} | {error, Error}
|
||||||
|
%% where
|
||||||
|
%% Key = term()
|
||||||
|
%%
|
||||||
|
%% @equiv cursor_get(Key, [])
|
||||||
|
%% @see cursor_get/2
|
||||||
|
%% @end
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-spec cursor_get(Key :: db_key()) -> not_found | {ok, db_key(), db_value()} | db_error().
|
||||||
|
|
||||||
|
cursor_get(Key) ->
|
||||||
|
cursor_get(Key, [db_set]).
|
||||||
|
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% @doc
|
||||||
|
%% Positions the cursor at the key and retrieves that key/data pair.
|
||||||
|
%%
|
||||||
|
%% Modifications to the database during a sequential scan will be
|
||||||
|
%% reflected in the scan; that is, records inserted behind a cursor will
|
||||||
|
%% not be returned while records inserted in front of a cursor will be
|
||||||
|
%% returned.
|
||||||
|
%%
|
||||||
|
%% If this function fails for any reason, the state of the cursor will
|
||||||
|
%% be unchanged.
|
||||||
|
%%
|
||||||
|
%% === Options ===
|
||||||
|
%%
|
||||||
|
%% <dl>
|
||||||
|
%% <dt>db_current</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_first</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_get_both</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_get_both_range</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_last</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_next</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_next_dup</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_next_nodup</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_prev</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_prev_dup</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_prev_nodup</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_set</dt>
|
||||||
|
%% <dd></dd>
|
||||||
|
%% <dt>db_set_rance</dt>
|
||||||
|
%% <dd>TODO... finish the doc, add other DB_?? flags</dd>
|
||||||
|
%% </dl>
|
||||||
|
%%
|
||||||
|
%% @spec cursor_get(Key, Opts) -> not_found | {ok, Key, Value} | {error, Error}
|
||||||
|
%% where
|
||||||
|
%% Key = term()
|
||||||
|
%% Opts = [atom()]
|
||||||
|
%%
|
||||||
|
%% @end
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-spec cursor_get(Key :: db_key(), Opts :: db_flags()) ->
|
||||||
|
not_found | {ok, db_key(), db_value()} | db_error().
|
||||||
|
|
||||||
|
cursor_get(Key, Opts) ->
|
||||||
|
{KeyLen, KeyBin} = to_binary(Key),
|
||||||
|
Flags = process_flags(Opts),
|
||||||
|
Cmd = <<Flags:32/native, KeyLen:32/native, KeyBin/bytes>>,
|
||||||
|
<<Result:32/signed-native>> = erlang:port_control(get_port(), ?CMD_CURSOR_GET, Cmd),
|
||||||
|
case decode_rc(Result) of
|
||||||
|
ok ->
|
||||||
|
receive
|
||||||
|
{ok, _, Bin} ->
|
||||||
|
<<Crc:32/native, Payload/binary>> = Bin,
|
||||||
|
case erlang:crc32(Payload) of
|
||||||
|
Crc ->
|
||||||
|
{ok, binary_to_term(Payload)};
|
||||||
|
CrcOther ->
|
||||||
|
error_logger:warning_msg("Invalid CRC: ~p ~p\n", [Crc, CrcOther]),
|
||||||
|
{error, invalid_crc}
|
||||||
|
end;
|
||||||
|
not_found -> not_found;
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
Error ->
|
||||||
|
{error, Error}
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Closes the cursor.
|
%% Closes the cursor.
|
||||||
|
@ -2249,15 +2374,28 @@ flag_value(Flag) ->
|
||||||
stat_lock_objects -> ?DB_STAT_LOCK_OBJECTS;
|
stat_lock_objects -> ?DB_STAT_LOCK_OBJECTS;
|
||||||
stat_lock_params -> ?DB_STAT_LOCK_PARAMS;
|
stat_lock_params -> ?DB_STAT_LOCK_PARAMS;
|
||||||
stat_memp_hash -> ?DB_STAT_MEMP_HASH;
|
stat_memp_hash -> ?DB_STAT_MEMP_HASH;
|
||||||
stat_subsystem -> ?DB_STAT_SUBSYSTEM;
|
stat_subsystem -> ?DB_STAT_SUBSYSTEM;
|
||||||
threaded -> ?DB_THREAD;
|
threaded -> ?DB_THREAD;
|
||||||
truncate -> ?DB_TRUNCATE;
|
truncate -> ?DB_TRUNCATE;
|
||||||
txn_no_sync -> ?DB_TXN_NOSYNC;
|
txn_no_sync -> ?DB_TXN_NOSYNC;
|
||||||
txn_no_wait -> ?DB_TXN_NOWAIT;
|
txn_no_wait -> ?DB_TXN_NOWAIT;
|
||||||
txn_snapshot -> ?DB_TXN_SNAPSHOT;
|
txn_snapshot -> ?DB_TXN_SNAPSHOT;
|
||||||
txn_sync -> ?DB_TXN_SYNC;
|
txn_sync -> ?DB_TXN_SYNC;
|
||||||
txn_wait -> ?DB_TXN_WAIT;
|
txn_wait -> ?DB_TXN_WAIT;
|
||||||
txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC
|
txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC;
|
||||||
|
db_current -> ?DB_CURRENT;
|
||||||
|
db_first -> ?DB_FIRST;
|
||||||
|
db_get_both -> ?DB_GET_BOTH;
|
||||||
|
db_get_both_range -> ?DB_GET_BOTH_RANGE;
|
||||||
|
db_last -> ?DB_LAST;
|
||||||
|
db_next -> ?DB_NEXT;
|
||||||
|
db_next_dup -> ?DB_NEXT_DUP;
|
||||||
|
db_next_nodup -> ?DB_NEXT_NODUP;
|
||||||
|
db_prev -> ?DB_PREV;
|
||||||
|
db_prev_dup -> ?DB_PREV_DUP;
|
||||||
|
db_prev_nodup -> ?DB_PREV_NODUP;
|
||||||
|
db_set -> ?DB_SET;
|
||||||
|
db_set_range -> ?DB_SET_RANGE
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ all() ->
|
||||||
get_should_return_a_value_when_getting_a_valid_record,
|
get_should_return_a_value_when_getting_a_valid_record,
|
||||||
put_should_succeed_with_manual_transaction,
|
put_should_succeed_with_manual_transaction,
|
||||||
put_should_rollback_with_failed_manual_transaction,
|
put_should_rollback_with_failed_manual_transaction,
|
||||||
% del_should_remove_a_value,
|
% del_should_remove_a_value, %TODO: why is this disabled
|
||||||
transaction_should_commit_on_success,
|
transaction_should_commit_on_success,
|
||||||
transaction_should_abort_on_exception,
|
transaction_should_abort_on_exception,
|
||||||
transaction_should_abort_on_user_abort,
|
transaction_should_abort_on_user_abort,
|
||||||
|
@ -53,7 +53,7 @@ all() ->
|
||||||
update_should_save_value_if_successful,
|
update_should_save_value_if_successful,
|
||||||
update_should_accept_args_for_fun,
|
update_should_accept_args_for_fun,
|
||||||
port_should_return_transaction_timeouts,
|
port_should_return_transaction_timeouts,
|
||||||
cursor_should_iterate, cursor_should_fail_if_not_open,
|
cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open,
|
||||||
put_commit_should_end_txn,
|
put_commit_should_end_txn,
|
||||||
data_dir_should_be_priv_dir,
|
data_dir_should_be_priv_dir,
|
||||||
delete_should_remove_file,
|
delete_should_remove_file,
|
||||||
|
@ -223,7 +223,7 @@ cursor_should_iterate(Config) ->
|
||||||
{ok, key3, value3} = bdberl:cursor_next(),
|
{ok, key3, value3} = bdberl:cursor_next(),
|
||||||
not_found = bdberl:cursor_next(),
|
not_found = bdberl:cursor_next(),
|
||||||
|
|
||||||
%% Validate that the "current" key is key3
|
%% Validate that the current key is key3
|
||||||
{ok, key3, value3} = bdberl:cursor_current(),
|
{ok, key3, value3} = bdberl:cursor_current(),
|
||||||
|
|
||||||
%% Now move backwards (should jump to key2, since we are "on" key3)
|
%% Now move backwards (should jump to key2, since we are "on" key3)
|
||||||
|
@ -233,10 +233,35 @@ cursor_should_iterate(Config) ->
|
||||||
|
|
||||||
ok = bdberl:cursor_close().
|
ok = bdberl:cursor_close().
|
||||||
|
|
||||||
|
cursor_get_should_pos(Config) ->
|
||||||
|
Db = ?config(db, Config),
|
||||||
|
|
||||||
|
%% Store some sample values in the db
|
||||||
|
ok = bdberl:put(Db, key1, value1),
|
||||||
|
ok = bdberl:put(Db, key2, value2),
|
||||||
|
ok = bdberl:put(Db, key3, value3),
|
||||||
|
ok = bdberl:put(Db, key4, value4),
|
||||||
|
|
||||||
|
%% Validate that the cursor is positioned properly, then
|
||||||
|
%% returns the next value.
|
||||||
|
ok = bdberl:cursor_open(Db),
|
||||||
|
{ok, value2} = bdberl:cursor_get(key2),
|
||||||
|
{ok, key3, value3} = bdberl:cursor_next(),
|
||||||
|
{ok, value2} = bdberl:cursor_get(key2),
|
||||||
|
{ok, key3, value3} = bdberl:cursor_next(),
|
||||||
|
{ok, value1} = bdberl:cursor_get(key1),
|
||||||
|
{ok, key2, value2} = bdberl:cursor_next(),
|
||||||
|
{ok, key3, value3} = bdberl:cursor_next(),
|
||||||
|
{ok, key4, value4} = bdberl:cursor_next(),
|
||||||
|
not_found = bdberl:cursor_next(),
|
||||||
|
|
||||||
|
ok = bdberl:cursor_close().
|
||||||
|
|
||||||
cursor_should_fail_if_not_open(_Config) ->
|
cursor_should_fail_if_not_open(_Config) ->
|
||||||
{error, no_cursor} = bdberl:cursor_next(),
|
{error, no_cursor} = bdberl:cursor_next(),
|
||||||
{error, no_cursor} = bdberl:cursor_prev(),
|
{error, no_cursor} = bdberl:cursor_prev(),
|
||||||
{error, no_cursor} = bdberl:cursor_current(),
|
{error, no_cursor} = bdberl:cursor_current(),
|
||||||
|
{error, no_cursor} = bdberl:cursor_get(),
|
||||||
{error, no_cursor} = bdberl:cursor_close().
|
{error, no_cursor} = bdberl:cursor_close().
|
||||||
|
|
||||||
put_commit_should_end_txn(Config) ->
|
put_commit_should_end_txn(Config) ->
|
||||||
|
|
Loading…
Reference in a new issue