diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 6358492..ae2d003 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -59,33 +59,33 @@ static void bdberl_drv_stop(ErlDrvData handle); static void bdberl_drv_finish(); -static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, - char* inbuf, int inbuf_sz, +static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, + char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz); -/** +/** * Driver Entry */ -ErlDrvEntry bdberl_drv_entry = +ErlDrvEntry bdberl_drv_entry = { - NULL, /* F_PTR init, N/A */ - bdberl_drv_start, /* L_PTR start, called when port is opened */ - bdberl_drv_stop, /* F_PTR stop, called when port is closed */ - NULL, /* F_PTR output, called when erlang has sent */ + NULL, /* F_PTR init, N/A */ + bdberl_drv_start, /* L_PTR start, called when port is opened */ + bdberl_drv_stop, /* F_PTR stop, called when port is closed */ + NULL, /* F_PTR output, called when erlang has sent */ NULL, /* F_PTR ready_input, called when input descriptor ready */ - NULL, /* F_PTR ready_output, called when output descriptor ready */ + NULL, /* F_PTR ready_output, called when output descriptor ready */ "bdberl_drv", /* driver_name */ bdberl_drv_finish, /* F_PTR finish, called when unloaded */ - NULL, /* handle */ - bdberl_drv_control, /* F_PTR control, port_command callback */ - NULL, /* F_PTR timeout, reserved */ + NULL, /* handle */ + bdberl_drv_control, /* F_PTR control, port_command callback */ + NULL, /* F_PTR timeout, reserved */ NULL, /* F_PTR outputv, reserved */ NULL, /* F_PTR ready_async */ NULL, /* F_PTR flush */ NULL, /* F_PTR call */ NULL, /* F_PTR event */ - ERL_DRV_EXTENDED_MARKER, - ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, NULL, /* Reserved */ @@ -110,7 +110,10 @@ static void do_async_put(void* arg); static void do_async_get(void* arg); static void do_async_del(void* arg); static void do_async_txnop(void* arg); +static void do_async_cursor_put(void* arg); static void do_async_cursor_get(void* arg); +static void do_async_cursor_del(void* arg); +static void do_async_cursor_cnp(void* arg); static void do_async_truncate(void* arg); static void do_sync_data_dirs_info(PortData *p); static void do_sync_driver_info(PortData *d); @@ -153,10 +156,10 @@ static int G_DB_ENV_ERROR = 0; * G_DATABASES is a global array of Database structs. Used to track currently opened DB* * handles and ensure that they get cleaned up when all ports which were using them exit or * explicitly close them. - * + * * This array is allocated when the driver is first initialized and does not grow/shrink * dynamically. G_DATABASES_SIZE contains the size of the array. G_DATABASES_NAMES is a hash of - * filenames to array index for an opened Database. + * filenames to array index for an opened Database. * * All access to G_DATABASES and G_DATABASES_NAMES must be protected by the read/write lock * G_DATABASES_MUTEX. @@ -196,7 +199,7 @@ static unsigned int G_CHECKPOINT_INTERVAL = 60; /* Seconds between check /** * Pipe to used to wake up the various monitors. Instead of just sleeping - * they wait for an exceptional condition on the read fd of the pipe. When it is time to + * they wait for an exceptional condition on the read fd of the pipe. When it is time to * shutdown, the driver closes the write fd and waits for the threads to be joined. */ static int G_BDBERL_PIPE[2] = {-1, -1}; @@ -204,7 +207,7 @@ static int G_BDBERL_PIPE[2] = {-1, -1}; /** * Lock, port and pid reference for relaying BDB output into the SASL logger. READ lock * is required to log data. WRITE lock is used when replacing the pid/port reference. If - * no pid/port is available, no callback is registered with BDB. + * no pid/port is available, no callback is registered with BDB. */ static ErlDrvRWLock* G_LOG_RWLOCK = 0; static ErlDrvTermData G_LOG_PID; @@ -254,7 +257,7 @@ static void bdberl_dbgcmdrc(PortData *d, int rc); DBG("threadid %p port %p: unlocking G_DATABASES\r\n", erl_drv_thread_self(), P); \ erl_drv_mutex_unlock(G_DATABASES_MUTEX); \ DBG("threadid %p port %p: unlocked G_DATABASES\r\n", erl_drv_thread_self(), P); \ - } while (0) + } while (0) #define READ_LOCK(L) erl_drv_rwlock_rlock(L) @@ -263,11 +266,11 @@ static void bdberl_dbgcmdrc(PortData *d, int rc); #define WRITE_UNLOCK(L) erl_drv_rwlock_rwunlock(L) -DRIVER_INIT(bdberl_drv) +DRIVER_INIT(bdberl_drv) { DBG("DRIVER INIT\r\n"); // Setup flags we'll use to init the environment - int flags = + int flags = DB_INIT_LOCK | /* Enable support for locking */ DB_INIT_TXN | /* Enable support for transactions */ DB_INIT_MPOOL | /* Enable support for memory pools */ @@ -285,10 +288,10 @@ DRIVER_INIT(bdberl_drv) flags |= DB_SYSTEM_MEM; } - // Initialize global environment -- use environment variable DB_HOME to + // Initialize global environment -- use environment variable DB_HOME to // specify where the working directory is DBG("db_env_create(%p, 0)", &G_DB_ENV); - G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0); + G_DB_ENV_ERROR = db_env_create(&G_DB_ENV, 0); DBG(" = %d\r\n", G_DB_ENV_ERROR); if (G_DB_ENV_ERROR != 0) { @@ -301,7 +304,7 @@ DRIVER_INIT(bdberl_drv) DBG(" = %d\r\n", G_DB_ENV_ERROR); if (G_DB_ENV_ERROR != 0) { - // Something bad happened while initializing BDB; in this situation we + // Something bad happened while initializing BDB; in this situation we // cleanup and set the environment to zero. Attempts to open ports will // fail and the user will have to sort out how to resolve the issue. DBG("G_DB_ENV->close(%p, 0);\r\n", &G_DB_ENV); @@ -354,10 +357,10 @@ DRIVER_INIT(bdberl_drv) G_DATABASES = (Database*) driver_alloc(sizeof(Database) * G_DATABASES_SIZE); memset(G_DATABASES, '\0', sizeof(Database) * G_DATABASES_SIZE); G_DATABASES_MUTEX = erl_drv_mutex_create("bdberl_drv: G_DATABASES_MUTEX"); - G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); + G_DATABASES_NAMES = hive_hash_new(G_DATABASES_SIZE); // Startup deadlock check thread - erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD, + erl_drv_thread_create("bdberl_drv_deadlock_checker", &G_DEADLOCK_THREAD, &deadlock_check, 0, 0); // Use the BDBERL_CHECKPOINT_TIME environment value to determine the @@ -390,7 +393,7 @@ DRIVER_INIT(bdberl_drv) static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) { - DBG("threadid %p port %p: BDB DRIVER STARTING\r\n", + DBG("threadid %p port %p: BDB DRIVER STARTING\r\n", erl_drv_thread_self(), port); // Make sure we have a functional environment -- if we don't, @@ -399,7 +402,7 @@ static ErlDrvData bdberl_drv_start(ErlDrvPort port, char* buffer) { return ERL_DRV_ERROR_BADARG; } - + PortData* d = (PortData*)driver_alloc(sizeof(PortData)); memset(d, '\0', sizeof(PortData)); @@ -450,7 +453,7 @@ static void bdberl_drv_stop(ErlDrvData handle) // If there was no async job, drop the lock -- not needed erl_drv_mutex_unlock(d->port_lock); } - + // Cleanup the port lock erl_drv_mutex_destroy(d->port_lock); @@ -463,7 +466,7 @@ static void bdberl_drv_stop(ErlDrvData handle) d->cursor->close(d->cursor); } - // If a txn is currently active, terminate it. + // If a txn is currently active, terminate it. abort_txn(d); // Close all the databases we previously opened @@ -477,7 +480,7 @@ static void bdberl_drv_stop(ErlDrvData handle) } } - // If this port was registered as the endpoint for logging, go ahead and + // If this port was registered as the endpoint for logging, go ahead and // remove it. Note that we don't need to lock to check this since we only // unregister if it's already initialized to this port. if (G_LOG_PORT == d->port) @@ -489,7 +492,7 @@ static void bdberl_drv_stop(ErlDrvData handle) // Remove the references G_LOG_PORT = 0; G_LOG_PID = 0; - + // Unregister with BDB -- MUST DO THIS WITH WRITE LOCK HELD! G_DB_ENV->set_msgcall(G_DB_ENV, 0); G_DB_ENV->set_errcall(G_DB_ENV, 0); @@ -498,7 +501,7 @@ static void bdberl_drv_stop(ErlDrvData handle) } DBG("Stopped port: %p\r\n", d->port); - + // Release the port instance data driver_free(d->work_buffer); driver_free(handle); @@ -577,7 +580,7 @@ static void bdberl_drv_finish() hive_hash_destroy(G_DATABASES_NAMES); G_DATABASES_NAMES = NULL; } - + // Release the logging rwlock if (G_LOG_RWLOCK != NULL) { @@ -588,12 +591,12 @@ static void bdberl_drv_finish() DBG("BDB DRIVER FINISHED\r\n"); } -static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, - char* inbuf, int inbuf_sz, +static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, + char* inbuf, int inbuf_sz, char** outbuf, int outbuf_sz) { PortData* d = (PortData*)handle; - switch(cmd) + switch(cmd) { case CMD_OPEN_DB: { @@ -707,27 +710,27 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, d->async_op = cmd; d->async_dbref = dbref; TPoolJobFunc fn; - switch(cmd) { - case CMD_PUT: case CMD_PUT_COMMIT: - { + switch(cmd) { + case CMD_PUT: case CMD_PUT_COMMIT: + { fn = &do_async_put; - } - break; - case CMD_DEL: - { + } + break; + case CMD_DEL: + { fn = &do_async_del; - } - break; - case CMD_GET: - { + } + break; + case CMD_GET: + { fn = &do_async_get; - } - break; - default: - assert(cmd); - } + } + break; + default: + assert(cmd); + } bdberl_general_tpool_run(fn, d, 0, &d->async_job); - + // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); @@ -738,7 +741,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, bdberl_send_rc(d->port, d->port_owner, ERROR_INVALID_DBREF); RETURN_INT(0, outbuf); } - } + } case CMD_GETINFO: { // Inbuf is: << Target:32, Values/binary >> @@ -753,7 +756,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, } case CMD_CURSOR_OPEN: { - FAIL_IF_ASYNC_PENDING(d, outbuf); + FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_CURSOR_OPEN(d, outbuf); // Inbuf is << DbRef:32, Flags:32 >> @@ -775,16 +778,66 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, RETURN_INT(0, outbuf); } } + case CMD_CURSOR_GET: + case CMD_CURSOR_PUT: + case CMD_CURSOR_DEL: + { + FAIL_IF_ASYNC_PENDING(d, outbuf); + FAIL_IF_NO_CURSOR(d, outbuf); + + // Inbuf is <>, + + // If the working buffer is large enough, copy the data to put/get into it. Otherwise, realloc + // until it is large enough + if (d->work_buffer_sz < inbuf_sz) + { + d->work_buffer = driver_realloc(d->work_buffer, inbuf_sz); + d->work_buffer_sz = inbuf_sz; + } + + // Copy the payload into place + memcpy(d->work_buffer, inbuf, inbuf_sz); + d->work_buffer_offset = inbuf_sz; + + // Mark the port as busy and then choose the appropriate async operation + d->async_op = cmd; + TPoolJobFunc fn; + switch(cmd) { + case CMD_CURSOR_PUT: + { + fn = &do_async_cursor_put; + } + break; + case CMD_CURSOR_DEL: + { + fn = &do_async_cursor_del; + } + break; + case CMD_CURSOR_GET: + { + fn = &do_async_cursor_get; + } + break; + default: + assert(cmd); + } + // Now schedule the operation to run + bdberl_general_tpool_run(fn, d, 0, &d->async_job); + + // Let caller know operation is in progress + // Outbuf is: <<0:32>> + RETURN_INT(0, outbuf); + } case CMD_CURSOR_CURR: case CMD_CURSOR_NEXT: case CMD_CURSOR_PREV: { FAIL_IF_ASYNC_PENDING(d, outbuf); FAIL_IF_NO_CURSOR(d, outbuf); - + // Schedule the operation d->async_op = cmd; - bdberl_general_tpool_run(&do_async_cursor_get, d, 0, &d->async_job); + bdberl_general_tpool_run(&do_async_cursor_cnp, d, 0, &d->async_job); // Let caller know operation is in progress RETURN_INT(0, outbuf); @@ -804,7 +857,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, // Regardless of what happens, clear out the cursor pointer d->cursor = 0; - + // Send result code bdberl_send_rc(d->port, d->port_owner, rc); RETURN_INT(0, outbuf); @@ -851,13 +904,13 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, case CMD_REGISTER_LOGGER: { // If this port is not the current logger, make it so. Only one logger can be registered - // at a time. + // at a time. if (G_LOG_PORT != d->port) { // Grab the write lock and update the global vars; also make sure to update BDB callbacks // within the write lock to avoid race conditions. WRITE_LOCK(G_LOG_RWLOCK); - + G_LOG_PORT = d->port; G_LOG_PID = driver_connected(d->port); @@ -891,7 +944,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, FAIL_IF_ASYNC_PENDING(d, outbuf); do_sync_data_dirs_info(d); - + // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); @@ -927,7 +980,7 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, FAIL_IF_ASYNC_PENDING(d, outbuf); do_sync_driver_info(d); - + // Let caller know that the operation is in progress // Outbuf is: <<0:32>> RETURN_INT(0, outbuf); @@ -1062,7 +1115,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port } } else - { + { // Database hasn't been created while we were waiting on write lock, so // create/open it @@ -1071,7 +1124,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port int dbref = alloc_dbref(); if (dbref < 0) { - // No more slots available + // No more slots available UNLOCK_DATABASES(data->port); return ERROR_MAX_DBS; } @@ -1083,7 +1136,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port DBGCMD(data, "rc = %s (%d) db = %p", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, db); if (rc != 0) { - // Failure while creating the database handle -- drop our lock and return + // Failure while creating the database handle -- drop our lock and return // the code UNLOCK_DATABASES(data->port); return rc; @@ -1124,7 +1177,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port // Drop the write lock UNLOCK_DATABASES(data->port); - // Add the dbref to the port list + // Add the dbref to the port list add_dbref(data, dbref); *dbref_res = dbref; return 0; @@ -1133,7 +1186,7 @@ static int open_database(const char* name, DBTYPE type, unsigned int flags, Port static int close_database(int dbref, unsigned flags, PortData* data) { - // Remove this database from our list + // Remove this database from our list if (del_dbref(data, dbref)) { // Something was actually deleted from our list -- now we need to disassociate the @@ -1243,7 +1296,7 @@ static int delete_database(const char* name, PortData *data) */ static void get_info(int target, void* values, BinHelper* bh) { - switch(target) + switch(target) { case SYSP_CACHESIZE_GET: { @@ -1397,7 +1450,7 @@ static int send_dir_info(ErlDrvPort port, ErlDrvTermData pid, const char *path) // send fsid as a binary as will only be used // to compare which physical filesystem is on // and the definintion varies between platforms. - ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid, + ERL_DRV_BUF2BINARY, (ErlDrvTermData) &svfs.f_fsid, sizeof(svfs.f_fsid), ERL_DRV_UINT, mbyte_avail, ERL_DRV_TUPLE, 4}; @@ -1514,7 +1567,7 @@ static void do_async_put(void* arg) DB* db = bdberl_lookup_dbref(dbref); unsigned int flags = UNPACK_INT(d->work_buffer, 4); - // Setup DBTs + // Setup DBTs DBT key; DBT value; memset(&key, '\0', sizeof(DBT)); @@ -1530,7 +1583,7 @@ static void do_async_put(void* arg) assert(value.size >= 4); uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); uint32_t buf_crc32 = *(uint32_t*) value.data; - + int rc; if (calc_crc32 != buf_crc32) { @@ -1569,36 +1622,36 @@ static void do_async_get(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> PortData* d = (PortData*)arg; - + // Get the database object, using the provided ref int dbref = UNPACK_INT(d->work_buffer, 0); DB* db = bdberl_lookup_dbref(dbref); - + // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 4); - - // Setup DBTs + + // Setup DBTs DBT key; DBT value; memset(&key, '\0', sizeof(DBT)); memset(&value, '\0', sizeof(DBT)); - + // Parse payload into DBT key.size = UNPACK_INT(d->work_buffer, 8); key.data = UNPACK_BLOB(d->work_buffer, 12); // Allocate a buffer for the output value value.flags = DB_DBT_MALLOC; - + int rc = db->get(db, d->txn, &key, &value, flags); - + // Check CRC - first 4 bytes are CRC of rest of bytes if (rc == 0) { assert(value.size >= 4); uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); uint32_t buf_crc32 = *(uint32_t*) value.data; - + if (calc_crc32 != buf_crc32) { DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.", @@ -1606,16 +1659,16 @@ static void do_async_get(void* arg) rc = ERROR_INVALID_VALUE; } } - + // Cleanup transaction as necessary if (rc && rc != DB_NOTFOUND && d->txn) { d->txn->abort(d->txn); d->txn = 0; } - + async_cleanup_and_send_kv(d, rc, &key, &value); - + // Finally, clean up value buffer (driver_send_term made a copy) free(value.data); } @@ -1624,31 +1677,31 @@ static void do_async_del(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> PortData* d = (PortData*)arg; - + // Get the database object, using the provided ref int dbref = UNPACK_INT(d->work_buffer, 0); DB* db = bdberl_lookup_dbref(dbref); - + // Extract operation flags unsigned flags = UNPACK_INT(d->work_buffer, 4); - - // Setup DBTs + + // Setup DBTs DBT key; memset(&key, '\0', sizeof(DBT)); - + // Parse payload into DBT key.size = UNPACK_INT(d->work_buffer, 8); key.data = UNPACK_BLOB(d->work_buffer, 12); int rc = db->del(db, d->txn, &key, flags); - + // Cleanup transaction as necessary if (rc && rc != DB_NOTFOUND && d->txn) { d->txn->abort(d->txn); d->txn = 0; } - + bdberl_async_cleanup_and_send_rc(d, rc); } @@ -1673,7 +1726,7 @@ static void do_async_txnop(void* arg) DBGCMDRC(d, rc); d->txn = 0; } - else + else { assert(d->async_op == CMD_TXN_ABORT); abort_txn(d); @@ -1683,13 +1736,90 @@ static void do_async_txnop(void* arg) } +static void do_async_cursor_put(void* arg) +{ + PortData* d = (PortData*)arg; + assert(d->cursor != NULL); + DBGCMD(d, "cursor_put/2 not yet implemented..."); /* TODO: implement this. */ + bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE); +} + + static void do_async_cursor_get(void* arg) +{ + // Payload is: << Flags:32, KeyLen:32, Key:KeyLen >> + PortData* d = (PortData*)arg; + assert(d->cursor != NULL); + + // Extract operation flags + unsigned flags = UNPACK_INT(d->work_buffer, 0); + + // Setup DBTs + DBT key; + DBT value; + memset(&key, '\0', sizeof(DBT)); + memset(&value, '\0', sizeof(DBT)); + + // Parse payload into DBT + key.size = UNPACK_INT(d->work_buffer, 4); + key.data = UNPACK_BLOB(d->work_buffer, 8); + + // Allocate a buffer for the output value + value.flags = DB_DBT_MALLOC; + + // Execute the operation + DBGCMD(d, "d->cursor->get(%p, %p, %p, %08X);", d->cursor, &key, &value, flags); + int rc = d->cursor->get(d->cursor, &key, &value, flags); + DBGCMDRC(d, rc); + + // Check CRC - first 4 bytes are CRC of rest of bytes + if (rc == 0) + { + assert(value.size >= 4); + uint32_t calc_crc32 = bdberl_crc32(value.data+4, value.size-4); + uint32_t buf_crc32 = *(uint32_t*) value.data; + + if (calc_crc32 != buf_crc32) + { + DBGCMD(d, "CRC-32 error on get data - buffer %08X calculated %08X.", + buf_crc32, calc_crc32); + rc = ERROR_INVALID_VALUE; + } + } + + // Cleanup cursor as necessary + if (rc && rc != DB_NOTFOUND && d->txn) + { + DBG("cursor flags=%d rc=%d\n", flags, rc); + + d->cursor->close(d->cursor); + d->cursor = 0; + abort_txn(d); + } + + async_cleanup_and_send_kv(d, rc, &key, &value); + + // Finally, clean up value buffer (driver_send_term made a copy) + free(value.data); +} + + +static void do_async_cursor_del(void* arg) +{ + PortData* d = (PortData*)arg; + assert(d->cursor != NULL); + DBGCMD(d, "cursor_del/2 not yet implemented..."); /* TODO: implement this. */ + bdberl_async_cleanup_and_send_rc(d, ERROR_DB_ACTIVE); +} + + +static void do_async_cursor_cnp(void* arg) { // Payload is: << DbRef:32, Flags:32, KeyLen:32, Key:KeyLen >> PortData* d = (PortData*)arg; assert(d->cursor != NULL); - - // Setup DBTs + + // Setup DBTs DBT key; DBT value; memset(&key, '\0', sizeof(DBT)); @@ -1699,12 +1829,12 @@ static void do_async_cursor_get(void* arg) int flags = 0; switch (d->async_op) { - case CMD_CURSOR_NEXT: + case CMD_CURSOR_NEXT: flags = DB_NEXT; break; - case CMD_CURSOR_PREV: + case CMD_CURSOR_PREV: flags = DB_PREV; break; default: - flags = DB_CURRENT; + flags = DB_CURRENT; } // Execute the operation @@ -1763,7 +1893,7 @@ static void do_async_truncate(void* arg) DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, i); rc = db->truncate(db, d->txn, &count, 0); - DBGCMD(d, "rc = %s (%d) count=%d", + DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count); if (rc != 0) @@ -1779,7 +1909,7 @@ static void do_async_truncate(void* arg) u_int32_t count = 0; DBGCMD(d, "db->truncate(%p, %p, %p, 0) dbref=%d", db, d->txn, &count, d->async_dbref); rc = db->truncate(db, d->txn, &count, 0); - DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), + DBGCMD(d, "rc = %s (%d) count=%d", rc == 0 ? "ok" : bdberl_rc_to_atom_str(rc), rc, count); } @@ -1838,7 +1968,7 @@ static void do_sync_data_dirs_info(PortData *d) } } } - + if (rc == 0) { rc = send_dir_info(d->port, d->port_owner, data_dir); @@ -1905,7 +2035,7 @@ static void do_sync_driver_info(PortData *d) ERL_DRV_UINT, txn_active, ERL_DRV_TUPLE, 2, // End of list - ERL_DRV_NIL, + ERL_DRV_NIL, ERL_DRV_LIST, 11+1, ERL_DRV_TUPLE, 2 }; @@ -1938,7 +2068,7 @@ static int add_portref(int dbref, ErlDrvPort port) } last = current; - current = current->next; + current = current->next; } while (current != 0); // At the end of the list -- allocate a new entry for this port @@ -2011,7 +2141,7 @@ static int add_dbref(PortData* data, int dbref) current = current->next; } while (current != 0); - // At the end of the list -- allocate a new entry + // At the end of the list -- allocate a new entry current = zalloc(sizeof(DbRefList)); current->dbref = dbref; last->next = current; @@ -2028,7 +2158,7 @@ static int add_dbref(PortData* data, int dbref) } /** - * Delete a db reference from a port's DbRefList. Returns 1 if deleted; 0 if not + * Delete a db reference from a port's DbRefList. Returns 1 if deleted; 0 if not */ static int del_dbref(PortData* data, int dbref) { @@ -2068,7 +2198,7 @@ static int del_dbref(PortData* data, int dbref) /** * Validate that a provided dbref is currently opened by a port. Return 1 if true; 0 if false. */ -int bdberl_has_dbref(PortData* data, int dbref) +int bdberl_has_dbref(PortData* data, int dbref) { DbRefList* current = data->dbrefs; while (current) @@ -2085,7 +2215,7 @@ int bdberl_has_dbref(PortData* data, int dbref) /** * Allocate a Database structure; find first available slot in G_DATABASES and return the - * index of it. If no free slots are available, return -1 + * index of it. If no free slots are available, return -1 */ static int alloc_dbref() { diff --git a/c_src/bdberl_drv.h b/c_src/bdberl_drv.h index 9718c70..21a6915 100644 --- a/c_src/bdberl_drv.h +++ b/c_src/bdberl_drv.h @@ -53,6 +53,9 @@ #define CMD_CURSOR_NEXT 12 #define CMD_CURSOR_PREV 13 #define CMD_CURSOR_CLOSE 14 +#define CMD_CURSOR_GET 35 /* TODO: renumber these next 3 and match them with bdberl.hrl */ +#define CMD_CURSOR_PUT 36 +#define CMD_CURSOR_DEL 37 #define CMD_PUT_COMMIT 15 #define CMD_REMOVE_DB 16 #define CMD_TRUNCATE 17 @@ -140,7 +143,7 @@ typedef struct /** * Structure for holding port instance data */ -typedef struct +typedef struct { ErlDrvPort port; diff --git a/include/bdberl.hrl b/include/bdberl.hrl index 684fb52..169acfa 100644 --- a/include/bdberl.hrl +++ b/include/bdberl.hrl @@ -46,6 +46,9 @@ -define(CMD_CURSOR_NEXT, 12). -define(CMD_CURSOR_PREV, 13). -define(CMD_CURSOR_CLOSE, 14). +-define(CMD_CURSOR_GET, 35). %% TODO: renumber these 3 and match them to bdberl_drv.h +-define(CMD_CURSOR_PUT, 36). +-define(CMD_CURSOR_DEL, 37). -define(CMD_PUT_COMMIT, 15). -define(CMD_REMOVE_DB, 16). -define(CMD_TRUNCATE, 17). diff --git a/src/bdberl.erl b/src/bdberl.erl index 957e068..28a7696 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -63,10 +63,11 @@ get/2, get/3, get_r/2, get_r/3, update/3, update/4, update/5, update/6, update/7, - del/2, + del/2, truncate/0, truncate/1, delete_database/1, cursor_open/1, cursor_next/0, cursor_prev/0, cursor_current/0, cursor_close/0, + cursor_get/0, cursor_get/1, cursor_get/2, %TODO: cursor_del/2, cursor_del/3, cursor_put/2, cursor_put/3, driver_info/0, register_logger/0, stop/0]). @@ -1248,9 +1249,7 @@ cursor_prev() -> %%-------------------------------------------------------------------- %% @doc -%% Retrieves key/data pairs from the database. -%% -%% Returns the key/data pair to which the cursor refers. +%% Retrieves the key/data pair to which the cursor refers. %% %% Modifications to the database during a sequential scan will be %% reflected in the scan; that is, records inserted behind a cursor will @@ -1270,6 +1269,132 @@ cursor_current() -> do_cursor_move(?CMD_CURSOR_CURR). +%%-------------------------------------------------------------------- +%% @doc +%% Retrieves the key/data pair to which the cursor refers. +%% +%% Modifications to the database during a sequential scan will be +%% reflected in the scan; that is, records inserted behind a cursor will +%% not be returned while records inserted in front of a cursor will be +%% returned. +%% +%% If this function fails for any reason, the state of the cursor will +%% be unchanged. +%% +%% @spec cursor_get() -> not_found | {ok, Key, Value} | {error, Error} +%% +%% @end +%%-------------------------------------------------------------------- +-spec cursor_get() -> {ok, db_key(), db_value()} | not_found | db_error(). + +cursor_get() -> + cursor_current(). + + +%%-------------------------------------------------------------------- +%% @doc +%% Positions the cursor at the key and retrieves that key/data pair. +%% +%% Modifications to the database during a sequential scan will be +%% reflected in the scan; that is, records inserted behind a cursor will +%% not be returned while records inserted in front of a cursor will be +%% returned. +%% +%% If this function fails for any reason, the state of the cursor will +%% be unchanged. +%% +%% @spec cursor_get(Key) -> not_found | {ok, Key, Value} | {error, Error} +%% where +%% Key = term() +%% +%% @equiv cursor_get(Key, []) +%% @see cursor_get/2 +%% @end +%%-------------------------------------------------------------------- +-spec cursor_get(Key :: db_key()) -> not_found | {ok, db_key(), db_value()} | db_error(). + +cursor_get(Key) -> + cursor_get(Key, [db_set]). + + +%%-------------------------------------------------------------------- +%% @doc +%% Positions the cursor at the key and retrieves that key/data pair. +%% +%% Modifications to the database during a sequential scan will be +%% reflected in the scan; that is, records inserted behind a cursor will +%% not be returned while records inserted in front of a cursor will be +%% returned. +%% +%% If this function fails for any reason, the state of the cursor will +%% be unchanged. +%% +%% === Options === +%% +%%
+%%
db_current
+%%
+%%
db_first
+%%
+%%
db_get_both
+%%
+%%
db_get_both_range
+%%
+%%
db_last
+%%
+%%
db_next
+%%
+%%
db_next_dup
+%%
+%%
db_next_nodup
+%%
+%%
db_prev
+%%
+%%
db_prev_dup
+%%
+%%
db_prev_nodup
+%%
+%%
db_set
+%%
+%%
db_set_rance
+%%
TODO... finish the doc, add other DB_?? flags
+%%
+%% +%% @spec cursor_get(Key, Opts) -> not_found | {ok, Key, Value} | {error, Error} +%% where +%% Key = term() +%% Opts = [atom()] +%% +%% @end +%%-------------------------------------------------------------------- +-spec cursor_get(Key :: db_key(), Opts :: db_flags()) -> + not_found | {ok, db_key(), db_value()} | db_error(). + +cursor_get(Key, Opts) -> + {KeyLen, KeyBin} = to_binary(Key), + Flags = process_flags(Opts), + Cmd = <>, + <> = erlang:port_control(get_port(), ?CMD_CURSOR_GET, Cmd), + case decode_rc(Result) of + ok -> + receive + {ok, _, Bin} -> + <> = Bin, + case erlang:crc32(Payload) of + Crc -> + {ok, binary_to_term(Payload)}; + CrcOther -> + error_logger:warning_msg("Invalid CRC: ~p ~p\n", [Crc, CrcOther]), + {error, invalid_crc} + end; + not_found -> not_found; + {error, Reason} -> {error, Reason} + end; + Error -> + {error, Error} + end. + + %%-------------------------------------------------------------------- %% @doc %% Closes the cursor. @@ -2249,15 +2374,28 @@ flag_value(Flag) -> stat_lock_objects -> ?DB_STAT_LOCK_OBJECTS; stat_lock_params -> ?DB_STAT_LOCK_PARAMS; stat_memp_hash -> ?DB_STAT_MEMP_HASH; - stat_subsystem -> ?DB_STAT_SUBSYSTEM; - threaded -> ?DB_THREAD; - truncate -> ?DB_TRUNCATE; - txn_no_sync -> ?DB_TXN_NOSYNC; - txn_no_wait -> ?DB_TXN_NOWAIT; - txn_snapshot -> ?DB_TXN_SNAPSHOT; - txn_sync -> ?DB_TXN_SYNC; - txn_wait -> ?DB_TXN_WAIT; - txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC + stat_subsystem -> ?DB_STAT_SUBSYSTEM; + threaded -> ?DB_THREAD; + truncate -> ?DB_TRUNCATE; + txn_no_sync -> ?DB_TXN_NOSYNC; + txn_no_wait -> ?DB_TXN_NOWAIT; + txn_snapshot -> ?DB_TXN_SNAPSHOT; + txn_sync -> ?DB_TXN_SYNC; + txn_wait -> ?DB_TXN_WAIT; + txn_write_nosync -> ?DB_TXN_WRITE_NOSYNC; + db_current -> ?DB_CURRENT; + db_first -> ?DB_FIRST; + db_get_both -> ?DB_GET_BOTH; + db_get_both_range -> ?DB_GET_BOTH_RANGE; + db_last -> ?DB_LAST; + db_next -> ?DB_NEXT; + db_next_dup -> ?DB_NEXT_DUP; + db_next_nodup -> ?DB_NEXT_NODUP; + db_prev -> ?DB_PREV; + db_prev_dup -> ?DB_PREV_DUP; + db_prev_nodup -> ?DB_PREV_NODUP; + db_set -> ?DB_SET; + db_set_range -> ?DB_SET_RANGE end. diff --git a/test/bdberl_SUITE.erl b/test/bdberl_SUITE.erl index 185b7dc..06312ec 100644 --- a/test/bdberl_SUITE.erl +++ b/test/bdberl_SUITE.erl @@ -45,7 +45,7 @@ all() -> get_should_return_a_value_when_getting_a_valid_record, put_should_succeed_with_manual_transaction, put_should_rollback_with_failed_manual_transaction, -% del_should_remove_a_value, +% del_should_remove_a_value, %TODO: why is this disabled transaction_should_commit_on_success, transaction_should_abort_on_exception, transaction_should_abort_on_user_abort, @@ -53,7 +53,7 @@ all() -> update_should_save_value_if_successful, update_should_accept_args_for_fun, port_should_return_transaction_timeouts, - cursor_should_iterate, cursor_should_fail_if_not_open, + cursor_should_iterate, cursor_get_should_pos, cursor_should_fail_if_not_open, put_commit_should_end_txn, data_dir_should_be_priv_dir, delete_should_remove_file, @@ -223,7 +223,7 @@ cursor_should_iterate(Config) -> {ok, key3, value3} = bdberl:cursor_next(), not_found = bdberl:cursor_next(), - %% Validate that the "current" key is key3 + %% Validate that the current key is key3 {ok, key3, value3} = bdberl:cursor_current(), %% Now move backwards (should jump to key2, since we are "on" key3) @@ -233,10 +233,35 @@ cursor_should_iterate(Config) -> ok = bdberl:cursor_close(). +cursor_get_should_pos(Config) -> + Db = ?config(db, Config), + + %% Store some sample values in the db + ok = bdberl:put(Db, key1, value1), + ok = bdberl:put(Db, key2, value2), + ok = bdberl:put(Db, key3, value3), + ok = bdberl:put(Db, key4, value4), + + %% Validate that the cursor is positioned properly, then + %% returns the next value. + ok = bdberl:cursor_open(Db), + {ok, value2} = bdberl:cursor_get(key2), + {ok, key3, value3} = bdberl:cursor_next(), + {ok, value2} = bdberl:cursor_get(key2), + {ok, key3, value3} = bdberl:cursor_next(), + {ok, value1} = bdberl:cursor_get(key1), + {ok, key2, value2} = bdberl:cursor_next(), + {ok, key3, value3} = bdberl:cursor_next(), + {ok, key4, value4} = bdberl:cursor_next(), + not_found = bdberl:cursor_next(), + + ok = bdberl:cursor_close(). + cursor_should_fail_if_not_open(_Config) -> {error, no_cursor} = bdberl:cursor_next(), {error, no_cursor} = bdberl:cursor_prev(), {error, no_cursor} = bdberl:cursor_current(), + {error, no_cursor} = bdberl:cursor_get(), {error, no_cursor} = bdberl:cursor_close(). put_commit_should_end_txn(Config) ->