diff --git a/c_src/lmdb.c b/c_src/lmdb.c index e6056c3..9c5e738 100644 --- a/c_src/lmdb.c +++ b/c_src/lmdb.c @@ -140,6 +140,7 @@ * @{ */ #ifdef _WIN32 +#define MDB_PIDLOCK 0 #define pthread_t DWORD #define pthread_mutex_t HANDLE #define pthread_key_t DWORD @@ -162,7 +163,18 @@ #define GET_PAGESIZE(x) {SYSTEM_INFO si; GetSystemInfo(&si); (x) = si.dwPageSize;} #define close(fd) (CloseHandle(fd) ? 0 : -1) #define munmap(ptr,len) UnmapViewOfFile(ptr) +#ifdef PROCESS_QUERY_LIMITED_INFORMATION +#define MDB_PROCESS_QUERY_LIMITED_INFORMATION PROCESS_QUERY_LIMITED_INFORMATION #else +#define MDB_PROCESS_QUERY_LIMITED_INFORMATION 0x1000 +#endif +#define Z "I" +#else + +#define Z "z" + + /** For MDB_LOCK_FORMAT: True if readers take a pid lock in the lockfile */ +#define MDB_PIDLOCK 1 #ifdef MDB_USE_POSIX_SEM @@ -281,33 +293,30 @@ typedef MDB_ID txnid_t; * @{ */ #ifndef MDB_DEBUG - /** Enable debug output. + /** Enable debug output. Needs variable argument macros (a C99 feature). * Set this to 1 for copious tracing. Set to 2 to add dumps of all IDLs * read from and written to the database (used for free space management). */ #define MDB_DEBUG 0 #endif -#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__)) -# undef MDB_DEBUG -# define MDB_DEBUG 0 -# define DPRINTF (void) /* Vararg macros may be unsupported */ -#elif MDB_DEBUG +#if MDB_DEBUG static int mdb_debug; static txnid_t mdb_debug_start; - /** Print a debug message with printf formatting. */ -# define DPRINTF(fmt, ...) /**< Requires 2 or more args */ \ - ((void) ((mdb_debug) && \ - fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__))) + /** Print a debug message with printf formatting. + * Requires double parenthesis around 2 or more args. + */ +# define DPRINTF(args) ((void) ((mdb_debug) && DPRINTF0 args)) +# define DPRINTF0(fmt, ...) \ + fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__) #else -# define DPRINTF(fmt, ...) ((void) 0) -# define MDB_DEBUG_SKIP +# define DPRINTF(args) ((void) 0) #endif /** Print a debug string. * The string is printed literally, with no format processing. */ -#define DPUTS(arg) DPRINTF("%s", arg) +#define DPUTS(arg) DPRINTF(("%s", arg)) /** @} */ /** A default memory page size. @@ -383,7 +392,7 @@ static txnid_t mdb_debug_start; */ #define DKEY(x) mdb_dkey(x, kbuf) #else -#define DKBUF typedef int dummy_kbuf /* so we can put ';' after */ +#define DKBUF #define DKEY(x) 0 #endif @@ -515,8 +524,8 @@ typedef struct MDB_txbody { /** Stamp identifying this as an MDB file. It must be set * to #MDB_MAGIC. */ uint32_t mtb_magic; - /** Version number of this lock file. Must be set to #MDB_LOCK_VERSION. */ - uint32_t mtb_version; + /** Format of this lock file. Must be set to #MDB_LOCK_FORMAT. */ + uint32_t mtb_format; #if defined(_WIN32) || defined(MDB_USE_POSIX_SEM) char mtb_rmname[MNAME_LEN]; #else @@ -542,7 +551,7 @@ typedef struct MDB_txninfo { union { MDB_txbody mtb; #define mti_magic mt1.mtb.mtb_magic -#define mti_version mt1.mtb.mtb_version +#define mti_format mt1.mtb.mtb_format #define mti_mutex mt1.mtb.mtb_mutex #define mti_rmname mt1.mtb.mtb_rmname #define mti_txnid mt1.mtb.mtb_txnid @@ -561,6 +570,13 @@ typedef struct MDB_txninfo { } mt2; MDB_reader mti_readers[1]; } MDB_txninfo; + + /** Lockfile format signature: version, features and field layout */ +#define MDB_LOCK_FORMAT \ + ((uint32_t) \ + ((MDB_LOCK_VERSION) \ + /* Flags which describe functionality */ \ + + (((MDB_PIDLOCK) != 0) << 16))) /** @} */ /** Common header for all page types. @@ -740,9 +756,12 @@ typedef struct MDB_node { */ #define LEAF2KEY(p, i, ks) ((char *)(p) + PAGEHDRSZ + ((i)*(ks))) - /** Set the \b node's key into \b key, if requested. */ -#define MDB_GET_KEY(node, key) { if ((key) != NULL) { \ - (key)->mv_size = NODEKSZ(node); (key)->mv_data = NODEKEY(node); } } + /** Set the \b node's key into \b keyptr, if requested. */ +#define MDB_GET_KEY(node, keyptr) { if ((keyptr) != NULL) { \ + (keyptr)->mv_size = NODEKSZ(node); (keyptr)->mv_data = NODEKEY(node); } } + + /** Set the \b node's key into \b key. */ +#define MDB_GET_KEY2(node, key) { key.mv_size = NODEKSZ(node); key.mv_data = NODEKEY(node); } /** Information about a single database in the environment. */ typedef struct MDB_db { @@ -827,13 +846,16 @@ struct MDB_txn { /** The list of pages that became unused during this transaction. */ MDB_IDL mt_free_pgs; - /** The list of dirty pages we temporarily wrote to disk - * because the dirty list was full. + /** The sorted list of dirty pages we temporarily wrote to disk + * because the dirty list was full. page numbers in here are + * shifted left by 1, deleted slots have the LSB set. */ MDB_IDL mt_spill_pgs; union { - MDB_ID2L dirty_list; /**< for write txns: modified pages */ - MDB_reader *reader; /**< this thread's reader table slot or NULL */ + /** For write txns: Modified pages. Sorted when not MDB_WRITEMAP. */ + MDB_ID2L dirty_list; + /** For read txns: This thread/txn's reader table slot, or NULL. */ + MDB_reader *reader; } mt_u; /** Array of records for each DB known in the environment. */ MDB_dbx *mt_dbxs; @@ -911,6 +933,7 @@ struct MDB_cursor { #define C_INITIALIZED 0x01 /**< cursor has been initialized and is valid */ #define C_EOF 0x02 /**< No more data */ #define C_SUB 0x04 /**< Cursor is a sub-cursor */ +#define C_DEL 0x08 /**< last op was a cursor_del */ #define C_SPLITTING 0x20 /**< Cursor is in page_split */ #define C_UNTRACK 0x40 /**< Un-track cursor when closing */ /** @} */ @@ -1104,8 +1127,10 @@ static char *const mdb_errstr[] = { "MDB_CURSOR_FULL: Internal error - cursor stack limit reached", "MDB_PAGE_FULL: Internal error - page has no more space", "MDB_MAP_RESIZED: Database contents grew beyond environment mapsize", - "MDB_INCOMPATIBLE: Database flags changed or would change", + "MDB_INCOMPATIBLE: Operation and DB incompatible, or DB flags changed", "MDB_BAD_RSLOT: Invalid reuse of reader locktable slot", + "MDB_BAD_TXN: Transaction cannot recover - it must be aborted", + "MDB_BAD_VALSIZE: Too big key/data, key is empty, or wrong DUPFIXED size", }; char * @@ -1164,14 +1189,14 @@ mdb_page_list(MDB_page *mp) DKBUF; nkeys = NUMKEYS(mp); - fprintf(stderr, "Page %zu numkeys %d\n", mp->mp_pgno, nkeys); + fprintf(stderr, "Page %"Z"u numkeys %d\n", mp->mp_pgno, nkeys); for (i=0; imn_ksize; key.mv_data = node->mn_data; nsize = NODESIZE + NODEKSZ(node) + sizeof(indx_t); if (IS_BRANCH(mp)) { - fprintf(stderr, "key %d: page %zu, %s\n", i, NODEPGNO(node), + fprintf(stderr, "key %d: page %"Z"u, %s\n", i, NODEPGNO(node), DKEY(&key)); } else { if (F_ISSET(node->mn_flags, F_BIGDATA)) @@ -1202,7 +1227,7 @@ mdb_cursor_chk(MDB_cursor *mc) } #endif -#if MDB_DEBUG > 2 +#if (MDB_DEBUG) > 2 /** Count all the pages in each DB and in the freelist * and make sure it matches the actual number of pages * being used. @@ -1267,7 +1292,7 @@ mdb_dcmp(MDB_txn *txn, MDB_dbi dbi, const MDB_val *a, const MDB_val *b) return txn->mt_dbxs[dbi].md_dcmp(a, b); } -/** Allocate a page. +/** Allocate memory for a page. * Re-use old malloc'd pages first for singletons, otherwise just malloc. */ static MDB_page * @@ -1331,45 +1356,61 @@ mdb_dlist_free(MDB_txn *txn) dl[0].mid = 0; } -/* Set or clear P_KEEP in non-overflow, non-sub pages in known cursors. - * When clearing, only consider backup cursors (from parent txns) since - * other P_KEEP flags have already been cleared. +/* Set or clear P_KEEP in dirty, non-overflow, non-sub pages watched by txn. * @param[in] mc A cursor handle for the current operation. * @param[in] pflags Flags of the pages to update: * P_DIRTY to set P_KEEP, P_DIRTY|P_KEEP to clear it. + * @param[in] all No shortcuts. Needed except after a full #mdb_page_flush(). + * @return 0 on success, non-zero on failure. */ -static void -mdb_cursorpages_mark(MDB_cursor *mc, unsigned pflags) +static int +mdb_pages_xkeep(MDB_cursor *mc, unsigned pflags, int all) { MDB_txn *txn = mc->mc_txn; - MDB_cursor *m2, *m3; + MDB_cursor *m3; MDB_xcursor *mx; + MDB_page *dp; unsigned i, j; + int rc = MDB_SUCCESS, level; + /* Mark pages seen by cursors */ if (mc->mc_flags & C_UNTRACK) mc = NULL; /* will find mc in mt_cursors */ for (i = txn->mt_numdbs;; mc = txn->mt_cursors[--i]) { for (; mc; mc=mc->mc_next) { - m2 = pflags == P_DIRTY ? mc : mc->mc_backup; - for (; m2; m2 = m2->mc_backup) { - for (m3=m2; m3->mc_flags & C_INITIALIZED; m3=&mx->mx_cursor) { + for (m3 = mc; m3->mc_flags & C_INITIALIZED; m3 = &mx->mx_cursor) { for (j=0; jmc_snum; j++) if ((m3->mc_pg[j]->mp_flags & (P_SUBP|P_DIRTY|P_KEEP)) == pflags) m3->mc_pg[j]->mp_flags ^= P_KEEP; - if (!(m3->mc_db->md_flags & MDB_DUPSORT)) + mx = m3->mc_xcursor; + if (mx == NULL) break; - /* Cursor backups have mx malloced at the end of m2 */ - mx = (m3 == mc ? m3->mc_xcursor : (MDB_xcursor *)(m3+1)); - } } } if (i == 0) break; } + + if (all) { + /* Mark dirty root pages */ + for (i=0; imt_numdbs; i++) { + if (txn->mt_dbflags[i] & DB_DIRTY) { + pgno_t pgno = txn->mt_dbs[i].md_root; + if (pgno == P_INVALID) + continue; + if ((rc = mdb_page_get(txn, pgno, &dp, &level)) != MDB_SUCCESS) + break; + if ((dp->mp_flags & (P_DIRTY|P_KEEP)) == pflags && level <= 1) + dp->mp_flags ^= P_KEEP; + } + } + } + + return rc; } -static int mdb_page_flush(MDB_txn *txn); +static int mdb_page_flush(MDB_txn *txn, int keep); /** Spill pages from the dirty list back to disk. * This is intended to prevent running into #MDB_TXN_FULL situations, @@ -1412,7 +1453,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) MDB_txn *txn = m0->mc_txn; MDB_page *dp; MDB_ID2L dl = txn->mt_u.dirty_list; - unsigned int i, j; + unsigned int i, j, need; int rc; if (m0->mc_flags & C_SUB) @@ -1427,6 +1468,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) if (key) i += (LEAFSIZE(key, data) + txn->mt_env->me_psize) / txn->mt_env->me_psize; i += i; /* double it for good measure */ + need = i; if (txn->mt_dirty_room > i) return MDB_SUCCESS; @@ -1435,24 +1477,36 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) txn->mt_spill_pgs = mdb_midl_alloc(MDB_IDL_UM_MAX); if (!txn->mt_spill_pgs) return ENOMEM; - } - - /* Mark all the dirty root pages we want to preserve */ - for (i=0; imt_numdbs; i++) { - if (txn->mt_dbflags[i] & DB_DIRTY) { - j = mdb_mid2l_search(dl, txn->mt_dbs[i].md_root); - if (j <= dl[0].mid) { - dp = dl[j].mptr; - dp->mp_flags |= P_KEEP; - } + } else { + /* purge deleted slots */ + MDB_IDL sl = txn->mt_spill_pgs; + unsigned int num = sl[0]; + j=0; + for (i=1; i<=num; i++) { + if (!(sl[i] & 1)) + sl[++j] = sl[i]; } + sl[0] = j; } - /* Preserve pages used by cursors */ - mdb_cursorpages_mark(m0, P_DIRTY); + /* Preserve pages which may soon be dirtied again */ + if ((rc = mdb_pages_xkeep(m0, P_DIRTY, 1)) != MDB_SUCCESS) + goto done; + + /* Less aggressive spill - we originally spilled the entire dirty list, + * with a few exceptions for cursor pages and DB root pages. But this + * turns out to be a lot of wasted effort because in a large txn many + * of those pages will need to be used again. So now we spill only 1/8th + * of the dirty pages. Testing revealed this to be a good tradeoff, + * better than 1/2, 1/4, or 1/10. + */ + if (need < MDB_IDL_UM_MAX / 8) + need = MDB_IDL_UM_MAX / 8; /* Save the page IDs of all the pages we're flushing */ - for (i=1; i<=dl[0].mid; i++) { + /* flush from the tail forward, this saves a lot of shifting later on. */ + for (i=dl[0].mid; i && need; i--) { + MDB_ID pn = dl[i].mid << 1; dp = dl[i].mptr; if (dp->mp_flags & P_KEEP) continue; @@ -1463,8 +1517,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) MDB_txn *tx2; for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { if (tx2->mt_spill_pgs) { - j = mdb_midl_search(tx2->mt_spill_pgs, dl[i].mid); - if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == dl[i].mid) { + j = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (j <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[j] == pn) { dp->mp_flags |= P_KEEP; break; } @@ -1473,24 +1527,29 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) if (tx2) continue; } - if ((rc = mdb_midl_append(&txn->mt_spill_pgs, dl[i].mid))) - return rc; + if ((rc = mdb_midl_append(&txn->mt_spill_pgs, pn))) + goto done; + need--; } mdb_midl_sort(txn->mt_spill_pgs); - rc = mdb_page_flush(txn); + /* Flush the spilled part of dirty list */ + if ((rc = mdb_page_flush(txn, i)) != MDB_SUCCESS) + goto done; - mdb_cursorpages_mark(m0, P_DIRTY|P_KEEP); + /* Reset any dirty pages we kept that page_flush didn't see */ + rc = mdb_pages_xkeep(m0, P_DIRTY|P_KEEP, i); +done: if (rc == 0) { if (txn->mt_parent) { - MDB_txn *tx2; - pgno_t pgno = dl[i].mid; txn->mt_dirty_room = txn->mt_parent->mt_dirty_room - dl[0].mid; /* dirty pages that are dirty in an ancestor don't * count against this txn's dirty_room. */ for (i=1; i<=dl[0].mid; i++) { + pgno_t pgno = dl[i].mid; + MDB_txn *tx2; for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { j = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); if (j <= tx2->mt_u.dirty_list[0].mid && @@ -1504,6 +1563,8 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data) txn->mt_dirty_room = MDB_IDL_UM_MAX - dl[0].mid; } txn->mt_flags |= MDB_TXN_SPILLS; + } else { + txn->mt_flags |= MDB_TXN_ERROR; } return rc; } @@ -1543,9 +1604,14 @@ mdb_page_dirty(MDB_txn *txn, MDB_page *mp) txn->mt_dirty_room--; } -/** Allocate pages for writing. +/** Allocate page numbers and memory for writing. Maintain me_pglast, + * me_pghead and mt_next_pgno. + * * If there are free pages available from older transactions, they - * will be re-used first. Otherwise a new page will be allocated. + * are re-used first. Otherwise allocate a new page at mt_next_pgno. + * Do not modify the freedB, just merge freeDB records into me_pghead[] + * and move me_pglast to say which records were consumed. Only this + * function can create me_pghead and move me_pglast/mt_next_pgno. * @param[in] mc cursor A cursor handle identifying the transaction and * database for which we are allocating. * @param[in] num the number of pages to allocate. @@ -1609,7 +1675,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) mdb_cursor_init(&m2, txn, FREE_DBI, NULL); if (last) { op = MDB_SET_RANGE; - key.mv_data = &last; /* will loop up last+1 */ + key.mv_data = &last; /* will look up last+1 */ key.mv_size = sizeof(last); } if (Paranoid && mc->mc_dbi == FREE_DBI) @@ -1647,11 +1713,11 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp) mop = env->me_pghead; } env->me_pglast = last; -#if MDB_DEBUG > 1 - DPRINTF("IDL read txn %zu root %zu num %u", - last, txn->mt_dbs[FREE_DBI].md_root, i); +#if (MDB_DEBUG) > 1 + DPRINTF(("IDL read txn %"Z"u root %"Z"u num %u", + last, txn->mt_dbs[FREE_DBI].md_root, i)); for (k = i; k; k--) - DPRINTF("IDL %zu", idl[k]); + DPRINTF(("IDL %"Z"u", idl[k])); #endif /* Merge in descending sorted order */ j = mop_len; @@ -1735,13 +1801,13 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) MDB_env *env = tx0->mt_env; MDB_txn *txn; unsigned x; - pgno_t pgno = mp->mp_pgno; + pgno_t pgno = mp->mp_pgno, pn = pgno << 1; for (txn = tx0; txn; txn=txn->mt_parent) { if (!txn->mt_spill_pgs) continue; - x = mdb_midl_search(txn->mt_spill_pgs, pgno); - if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pgno) { + x = mdb_midl_search(txn->mt_spill_pgs, pn); + if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pn) { MDB_page *np; int num; if (IS_OVERFLOW(mp)) @@ -1760,10 +1826,14 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) mdb_page_copy(np, mp, env->me_psize); } if (txn == tx0) { - /* If in current txn, this page is no longer spilled */ - for (; x < txn->mt_spill_pgs[0]; x++) - txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1]; - txn->mt_spill_pgs[0]--; + /* If in current txn, this page is no longer spilled. + * If it happens to be the last page, truncate the spill list. + * Otherwise mark it as deleted by setting the LSB. + */ + if (x == txn->mt_spill_pgs[0]) + txn->mt_spill_pgs[0]--; + else + txn->mt_spill_pgs[x] |= 1; } /* otherwise, if belonging to a parent txn, the * page remains spilled until child commits */ @@ -1778,7 +1848,7 @@ mdb_page_unspill(MDB_txn *tx0, MDB_page *mp, MDB_page **ret) x = mdb_mid2l_search(tx2->mt_u.dirty_list, pgno); if (x <= tx2->mt_u.dirty_list[0].mid && tx2->mt_u.dirty_list[x].mid == pgno) { - txn->mt_dirty_room++; + tx0->mt_dirty_room++; break; } } @@ -1819,7 +1889,7 @@ mdb_page_touch(MDB_cursor *mc) (rc = mdb_page_alloc(mc, 1, &np))) return rc; pgno = np->mp_pgno; - DPRINTF("touched db %u page %zu -> %zu", mc->mc_dbi,mp->mp_pgno,pgno); + DPRINTF(("touched db %u page %"Z"u -> %"Z"u", mc->mc_dbi,mp->mp_pgno,pgno)); assert(mp->mp_pgno != pgno); mdb_midl_xappend(txn->mt_free_pgs, mp->mp_pgno); /* Update the parent page, if any, to point to the new page */ @@ -1985,19 +2055,20 @@ mdb_cursors_close(MDB_txn *txn, unsigned merge) } mc = bk; } + /* Only malloced cursors are permanently tracked. */ free(mc); } cursors[i] = NULL; } } -#ifdef MDB_DEBUG_SKIP +#if !(MDB_DEBUG) #define mdb_txn_reset0(txn, act) mdb_txn_reset0(txn) #endif static void mdb_txn_reset0(MDB_txn *txn, const char *act); -#ifdef _WIN32 +#if !(MDB_PIDLOCK) /* Currently the same as defined(_WIN32) */ enum Pidlock_op { Pidset, Pidcheck }; @@ -2008,7 +2079,8 @@ enum Pidlock_op { #endif /** Set or check a pid lock. Set returns 0 on success. - * Check returns 0 if lock exists (meaning the process is alive). + * Check returns 0 if the process is certainly dead, nonzero if it may + * be alive (the lock exists or an error happened so we do not know). * * On Windows Pidset is a no-op, we merely check for the existence * of the process with the given pid. On POSIX we use a single byte @@ -2017,33 +2089,36 @@ enum Pidlock_op { static int mdb_reader_pid(MDB_env *env, enum Pidlock_op op, pid_t pid) { -#ifdef _WIN32 +#if !(MDB_PIDLOCK) /* Currently the same as defined(_WIN32) */ + int ret = 0; HANDLE h; - int ver, query; - switch(op) { - case Pidset: - break; - case Pidcheck: + if (op == Pidcheck) { h = OpenProcess(env->me_pidquery, FALSE, pid); + /* No documented "no such process" code, but other program use this: */ if (!h) - return GetLastError(); + return ErrCode() != ERROR_INVALID_PARAMETER; + /* A process exists until all handles to it close. Has it exited? */ + ret = WaitForSingleObject(h, 0) != 0; CloseHandle(h); - break; } - return 0; + return ret; #else - int rc; - struct flock lock_info; - memset((void *)&lock_info, 0, sizeof(lock_info)); - lock_info.l_type = F_WRLCK; - lock_info.l_whence = SEEK_SET; - lock_info.l_start = pid; - lock_info.l_len = 1; - while ((rc = fcntl(env->me_lfd, op, &lock_info)) && - (rc = ErrCode()) == EINTR) ; - if (op == F_GETLK && rc == 0 && lock_info.l_type == F_UNLCK) - rc = -1; - return rc; + for (;;) { + int rc; + struct flock lock_info; + memset(&lock_info, 0, sizeof(lock_info)); + lock_info.l_type = F_WRLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = pid; + lock_info.l_len = 1; + if ((rc = fcntl(env->me_lfd, op, &lock_info)) == 0) { + if (op == F_GETLK && lock_info.l_type != F_UNLCK) + rc = -1; + } else if ((rc = ErrCode()) == EINTR) { + continue; + } + return rc; + } #endif } @@ -2172,9 +2247,9 @@ mdb_txn_renew(MDB_txn *txn) rc = mdb_txn_renew0(txn); if (rc == MDB_SUCCESS) { - DPRINTF("renew txn %zu%c %p on mdbenv %p, root page %zu", + DPRINTF(("renew txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *)txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root); + (void *)txn, (void *)txn->mt_env, txn->mt_dbs[MAIN_DBI].md_root)); } return rc; } @@ -2195,10 +2270,11 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) if (parent) { /* Nested transactions: Max 1 child, write txns only, no writemap */ if (parent->mt_child || - (flags & MDB_RDONLY) || (parent->mt_flags & MDB_TXN_RDONLY) || + (flags & MDB_RDONLY) || + (parent->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) || (env->me_flags & MDB_WRITEMAP)) { - return EINVAL; + return (parent->mt_flags & MDB_TXN_RDONLY) ? EINVAL : MDB_BAD_TXN; } tsize = sizeof(MDB_ntxn); } @@ -2207,7 +2283,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) size += env->me_maxdbs * sizeof(MDB_cursor *); if ((txn = calloc(1, size)) == NULL) { - DPRINTF("calloc: %s", strerror(ErrCode())); + DPRINTF(("calloc: %s", strerror(ErrCode()))); return ENOMEM; } txn->mt_dbs = (MDB_db *) ((char *)txn + tsize); @@ -2267,14 +2343,21 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **ret) free(txn); else { *ret = txn; - DPRINTF("begin txn %zu%c %p on mdbenv %p, root page %zu", + DPRINTF(("begin txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root); + (void *) txn, (void *) env, txn->mt_dbs[MAIN_DBI].md_root)); } return rc; } +MDB_env * +mdb_txn_env(MDB_txn *txn) +{ + if(!txn) return NULL; + return txn->mt_env; +} + /** Export or close DBI handles opened in this txn. */ static void mdb_dbis_update(MDB_txn *txn, int keep) @@ -2304,6 +2387,7 @@ mdb_dbis_update(MDB_txn *txn, int keep) /** Common code for #mdb_txn_reset() and #mdb_txn_abort(). * May be called twice for readonly txns: First reset it, then abort. * @param[in] txn the transaction handle to reset + * @param[in] act why the transaction is being reset */ static void mdb_txn_reset0(MDB_txn *txn, const char *act) @@ -2313,9 +2397,9 @@ mdb_txn_reset0(MDB_txn *txn, const char *act) /* Close any DBI handles opened in this txn */ mdb_dbis_update(txn, 0); - DPRINTF("%s txn %zu%c %p on mdbenv %p, root page %zu", + DPRINTF(("%s txn %"Z"u%c %p on mdbenv %p, root page %"Z"u", act, txn->mt_txnid, (txn->mt_flags & MDB_TXN_RDONLY) ? 'r' : 'w', - (void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root); + (void *) txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root)); if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { if (txn->mt_u.reader) { @@ -2453,13 +2537,13 @@ mdb_freelist_save(MDB_txn *txn) } while (freecnt < free_pgs[0]); mdb_midl_sort(free_pgs); memcpy(data.mv_data, free_pgs, data.mv_size); -#if MDB_DEBUG > 1 +#if (MDB_DEBUG) > 1 { unsigned int i = free_pgs[0]; - DPRINTF("IDL write txn %zu root %zu num %u", - txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, i); + DPRINTF(("IDL write txn %"Z"u root %"Z"u num %u", + txn->mt_txnid, txn->mt_dbs[FREE_DBI].md_root, i)); for (; i; i--) - DPRINTF("IDL %zu", free_pgs[i]); + DPRINTF(("IDL %"Z"u", free_pgs[i])); } #endif continue; @@ -2533,10 +2617,13 @@ mdb_freelist_save(MDB_txn *txn) return rc; } -/** Flush dirty pages to the map, after clearing their dirty flag. +/** Flush (some) dirty pages to the map, after clearing their dirty flag. + * @param[in] txn the transaction that's being committed + * @param[in] keep number of initial pages in dirty_list to keep dirty. + * @return 0 on success, non-zero on failure. */ static int -mdb_page_flush(MDB_txn *txn) +mdb_page_flush(MDB_txn *txn, int keep) { MDB_env *env = txn->mt_env; MDB_ID2L dl = txn->mt_u.dirty_list; @@ -2554,10 +2641,11 @@ mdb_page_flush(MDB_txn *txn) int n = 0; #endif - j = 0; + j = i = keep; + if (env->me_flags & MDB_WRITEMAP) { /* Clear dirty flags */ - for (i = pagecount; i; i--) { + while (++i <= pagecount) { dp = dl[i].mptr; /* Don't flush this page yet */ if (dp->mp_flags & P_KEEP) { @@ -2572,8 +2660,8 @@ mdb_page_flush(MDB_txn *txn) } /* Write the pages */ - for (i = 1;; i++) { - if (i <= pagecount) { + for (;;) { + if (++i <= pagecount) { dp = dl[i].mptr; /* Don't flush this page yet */ if (dp->mp_flags & P_KEEP) { @@ -2598,13 +2686,13 @@ mdb_page_flush(MDB_txn *txn) * the write offset, to at least save the overhead of a Seek * system call. */ - DPRINTF("committing page %zu", pgno); + DPRINTF(("committing page %"Z"u", pgno)); memset(&ov, 0, sizeof(ov)); ov.Offset = pos & 0xffffffff; ov.OffsetHigh = pos >> 16 >> 16; if (!WriteFile(env->me_fd, dp, size, NULL, &ov)) { rc = ErrCode(); - DPRINTF("WriteFile: %d", rc); + DPRINTF(("WriteFile: %d", rc)); return rc; } #else @@ -2620,7 +2708,7 @@ mdb_page_flush(MDB_txn *txn) } else { if (lseek(env->me_fd, wpos, SEEK_SET) == -1) { rc = ErrCode(); - DPRINTF("lseek: %s", strerror(rc)); + DPRINTF(("lseek: %s", strerror(rc))); return rc; } wres = writev(env->me_fd, iov, n); @@ -2629,7 +2717,7 @@ mdb_page_flush(MDB_txn *txn) if (wres != wsize) { if (wres < 0) { rc = ErrCode(); - DPRINTF("Write error: %s", strerror(rc)); + DPRINTF(("Write error: %s", strerror(rc))); } else { rc = EIO; /* TODO: Use which error code? */ DPUTS("short write, filesystem full?"); @@ -2643,7 +2731,7 @@ mdb_page_flush(MDB_txn *txn) wpos = pos; wsize = 0; } - DPRINTF("committing page %zu", pgno); + DPRINTF(("committing page %"Z"u", pgno)); next_pos = pos + size; iov[n].iov_len = size; iov[n].iov_base = (char *)dp; @@ -2652,8 +2740,7 @@ mdb_page_flush(MDB_txn *txn) #endif /* _WIN32 */ } - j = 0; - for (i=1; i<=pagecount; i++) { + for (i = keep; ++i <= pagecount; ) { dp = dl[i].mptr; /* This is a page we skipped above */ if (!dl[i].mid) { @@ -2698,7 +2785,7 @@ mdb_txn_commit(MDB_txn *txn) DPUTS("error flag is set, can't commit"); if (txn->mt_parent) txn->mt_parent->mt_flags |= MDB_TXN_ERROR; - rc = EINVAL; + rc = MDB_BAD_TXN; goto fail; } @@ -2738,9 +2825,10 @@ mdb_txn_commit(MDB_txn *txn) len = x; /* zero out our dirty pages in parent spill list */ for (i=1; i<=src[0].mid; i++) { - if (src[i].mid < parent->mt_spill_pgs[x]) + MDB_ID pn = src[i].mid << 1; + if (pn < parent->mt_spill_pgs[x]) continue; - if (src[i].mid > parent->mt_spill_pgs[x]) { + if (pn > parent->mt_spill_pgs[x]) { if (x <= 1) break; x--; @@ -2818,11 +2906,12 @@ mdb_txn_commit(MDB_txn *txn) mdb_cursors_close(txn, 0); - if (!txn->mt_u.dirty_list[0].mid && !(txn->mt_flags & MDB_TXN_DIRTY)) + if (!txn->mt_u.dirty_list[0].mid && + !(txn->mt_flags & (MDB_TXN_DIRTY|MDB_TXN_SPILLS))) goto done; - DPRINTF("committing txn %zu %p on mdbenv %p, root page %zu", - txn->mt_txnid, (void *)txn, (void *)env, txn->mt_dbs[MAIN_DBI].md_root); + DPRINTF(("committing txn %"Z"u %p on mdbenv %p, root page %"Z"u", + txn->mt_txnid, (void*)txn, (void*)env, txn->mt_dbs[MAIN_DBI].md_root)); /* Update DB root pointers */ if (txn->mt_numdbs > 2) { @@ -2851,11 +2940,11 @@ mdb_txn_commit(MDB_txn *txn) if (mdb_midl_shrink(&txn->mt_free_pgs)) env->me_free_pgs = txn->mt_free_pgs; -#if MDB_DEBUG > 2 +#if (MDB_DEBUG) > 2 mdb_audit(txn); #endif - if ((rc = mdb_page_flush(txn)) || + if ((rc = mdb_page_flush(txn, 0)) || (rc = mdb_env_sync(env, 0)) || (rc = mdb_env_write_meta(txn))) goto fail; @@ -2909,14 +2998,14 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta) if (rc == 0 && off == 0) return ENOENT; rc = rc < 0 ? (int) ErrCode() : MDB_INVALID; - DPRINTF("read: %s", mdb_strerror(rc)); + DPRINTF(("read: %s", mdb_strerror(rc))); return rc; } p = (MDB_page *)&pbuf; if (!F_ISSET(p->mp_flags, P_META)) { - DPRINTF("page %zu not a meta page", p->mp_pgno); + DPRINTF(("page %"Z"u not a meta page", p->mp_pgno)); return MDB_INVALID; } @@ -2927,8 +3016,8 @@ mdb_env_read_header(MDB_env *env, MDB_meta *meta) } if (m->mm_version != MDB_DATA_VERSION) { - DPRINTF("database is version %u, expected version %u", - m->mm_version, MDB_DATA_VERSION); + DPRINTF(("database is version %u, expected version %u", + m->mm_version, MDB_DATA_VERSION)); return MDB_VERSION_MISMATCH; } @@ -2957,7 +3046,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta) ov.Offset = pos; \ rc = WriteFile(fd, ptr, size, &len, &ov); } while(0) #else - unsigned int len; + int len; #define DO_PWRITE(rc, fd, ptr, size, len, pos) do { \ len = pwrite(fd, ptr, size, pos); \ rc = (len >= 0); } while(0) @@ -2965,7 +3054,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta) DPUTS("writing new meta page"); - GET_PAGESIZE(psize); + psize = env->me_psize; meta->mm_magic = MDB_MAGIC; meta->mm_version = MDB_DATA_VERSION; @@ -2990,7 +3079,7 @@ mdb_env_init_meta(MDB_env *env, MDB_meta *meta) DO_PWRITE(rc, env->me_fd, p, psize * 2, len, 0); if (!rc) rc = ErrCode(); - else if (len == psize * 2) + else if ((unsigned) len == psize * 2) rc = MDB_SUCCESS; else rc = ENOSPC; @@ -3021,8 +3110,8 @@ mdb_env_write_meta(MDB_txn *txn) assert(txn->mt_env != NULL); toggle = !txn->mt_toggle; - DPRINTF("writing meta page %d for root page %zu", - toggle, txn->mt_dbs[MAIN_DBI].md_root); + DPRINTF(("writing meta page %d for root page %"Z"u", + toggle, txn->mt_dbs[MAIN_DBI].md_root)); env = txn->mt_env; mp = env->me_metas[toggle]; @@ -3098,6 +3187,7 @@ mdb_env_write_meta(MDB_txn *txn) WriteFile(env->me_fd, ptr, len, NULL, &ov); #else r2 = pwrite(env->me_fd, ptr, len, off); + (void)r2; /* Silence warnings. We don't care about pwrite's return value */ #endif fail: env->me_flags |= MDB_FATAL_ERROR; @@ -3149,11 +3239,97 @@ mdb_env_create(MDB_env **env) return MDB_SUCCESS; } +static int +mdb_env_map(MDB_env *env, void *addr, int newsize) +{ + MDB_page *p; + unsigned int flags = env->me_flags; +#ifdef _WIN32 + int rc; + HANDLE mh; + LONG sizelo, sizehi; + sizelo = env->me_mapsize & 0xffffffff; + sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */ + + /* Windows won't create mappings for zero length files. + * Just allocate the maxsize right now. + */ + if (newsize) { + if (SetFilePointer(env->me_fd, sizelo, &sizehi, 0) != (DWORD)sizelo + || !SetEndOfFile(env->me_fd) + || SetFilePointer(env->me_fd, 0, NULL, 0) != 0) + return ErrCode(); + } + mh = CreateFileMapping(env->me_fd, NULL, flags & MDB_WRITEMAP ? + PAGE_READWRITE : PAGE_READONLY, + sizehi, sizelo, NULL); + if (!mh) + return ErrCode(); + env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ? + FILE_MAP_WRITE : FILE_MAP_READ, + 0, 0, env->me_mapsize, addr); + rc = env->me_map ? 0 : ErrCode(); + CloseHandle(mh); + if (rc) + return rc; +#else + int prot = PROT_READ; + if (flags & MDB_WRITEMAP) { + prot |= PROT_WRITE; + if (newsize && ftruncate(env->me_fd, env->me_mapsize) < 0) + return ErrCode(); + } + env->me_map = mmap(addr, env->me_mapsize, prot, MAP_SHARED, + env->me_fd, 0); + if (env->me_map == MAP_FAILED) { + env->me_map = NULL; + return ErrCode(); + } + /* Turn off readahead. It's harmful when the DB is larger than RAM. */ +#ifdef MADV_RANDOM + madvise(env->me_map, env->me_mapsize, MADV_RANDOM); +#else +#ifdef POSIX_MADV_RANDOM + posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM); +#endif /* POSIX_MADV_RANDOM */ +#endif /* MADV_RANDOM */ +#endif /* _WIN32 */ + + /* Can happen because the address argument to mmap() is just a + * hint. mmap() can pick another, e.g. if the range is in use. + * The MAP_FIXED flag would prevent that, but then mmap could + * instead unmap existing pages to make room for the new map. + */ + if (addr && env->me_map != addr) + return EBUSY; /* TODO: Make a new MDB_* error code? */ + + p = (MDB_page *)env->me_map; + env->me_metas[0] = METADATA(p); + env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + env->me_psize); + + return MDB_SUCCESS; +} + int mdb_env_set_mapsize(MDB_env *env, size_t size) { - if (env->me_map) - return EINVAL; + /* If env is already open, caller is responsible for making + * sure there are no active txns. + */ + if (env->me_map) { + int rc; + void *old; + if (env->me_txn) + return EINVAL; + if (!size) + size = env->me_metas[mdb_env_pick_meta(env)]->mm_mapsize; + munmap(env->me_map, env->me_mapsize); + env->me_mapsize = size; + old = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : NULL; + rc = mdb_env_map(env, old, 1); + if (rc) + return rc; + } env->me_mapsize = size; if (env->me_psize) env->me_maxpg = env->me_mapsize / env->me_psize; @@ -3193,12 +3369,17 @@ static int mdb_env_open2(MDB_env *env) { unsigned int flags = env->me_flags; - int i, newenv = 0; + int i, newenv = 0, rc; MDB_meta meta; - MDB_page *p; -#ifndef _WIN32 - int prot; -#endif + +#ifdef _WIN32 + /* See if we should use QueryLimited */ + rc = GetVersion(); + if ((rc & 0xff) > 5) + env->me_pidquery = MDB_PROCESS_QUERY_LIMITED_INFORMATION; + else + env->me_pidquery = PROCESS_QUERY_INFORMATION; +#endif /* _WIN32 */ memset(&meta, 0, sizeof(meta)); @@ -3207,6 +3388,9 @@ mdb_env_open2(MDB_env *env) return i; DPUTS("new mdbenv"); newenv = 1; + GET_PAGESIZE(env->me_psize); + } else { + env->me_psize = meta.mm_psize; } /* Was a mapsize configured? */ @@ -3224,66 +3408,9 @@ mdb_env_open2(MDB_env *env) env->me_mapsize = minsize; } -#ifdef _WIN32 - { - int rc; - HANDLE mh; - LONG sizelo, sizehi; - sizelo = env->me_mapsize & 0xffffffff; - sizehi = env->me_mapsize >> 16 >> 16; /* only needed on Win64 */ - - /* See if we should use QueryLimited */ - rc = GetVersion(); - if ((rc & 0xff) > 5) - env->me_pidquery = PROCESS_QUERY_LIMITED_INFORMATION; - else - env->me_pidquery = PROCESS_QUERY_INFORMATION; - - /* Windows won't create mappings for zero length files. - * Just allocate the maxsize right now. - */ - if (newenv) { - if (SetFilePointer(env->me_fd, sizelo, &sizehi, 0) != (DWORD)sizelo - || !SetEndOfFile(env->me_fd) - || SetFilePointer(env->me_fd, 0, NULL, 0) != 0) - return ErrCode(); - } - mh = CreateFileMapping(env->me_fd, NULL, flags & MDB_WRITEMAP ? - PAGE_READWRITE : PAGE_READONLY, - sizehi, sizelo, NULL); - if (!mh) - return ErrCode(); - env->me_map = MapViewOfFileEx(mh, flags & MDB_WRITEMAP ? - FILE_MAP_WRITE : FILE_MAP_READ, - 0, 0, env->me_mapsize, meta.mm_address); - rc = env->me_map ? 0 : ErrCode(); - CloseHandle(mh); - if (rc) - return rc; - } -#else - i = MAP_SHARED; - prot = PROT_READ; - if (flags & MDB_WRITEMAP) { - prot |= PROT_WRITE; - if (ftruncate(env->me_fd, env->me_mapsize) < 0) - return ErrCode(); - } - env->me_map = mmap(meta.mm_address, env->me_mapsize, prot, i, - env->me_fd, 0); - if (env->me_map == MAP_FAILED) { - env->me_map = NULL; - return ErrCode(); - } - /* Turn off readahead. It's harmful when the DB is larger than RAM. */ -#ifdef MADV_RANDOM - madvise(env->me_map, env->me_mapsize, MADV_RANDOM); -#else -#ifdef POSIX_MADV_RANDOM - posix_madvise(env->me_map, env->me_mapsize, POSIX_MADV_RANDOM); -#endif /* POSIX_MADV_RANDOM */ -#endif /* MADV_RANDOM */ -#endif /* _WIN32 */ + rc = mdb_env_map(env, meta.mm_address, newenv); + if (rc) + return rc; if (newenv) { if (flags & MDB_FIXEDMAP) @@ -3292,38 +3419,25 @@ mdb_env_open2(MDB_env *env) if (i != MDB_SUCCESS) { return i; } - } else if (meta.mm_address && env->me_map != meta.mm_address) { - /* Can happen because the address argument to mmap() is just a - * hint. mmap() can pick another, e.g. if the range is in use. - * The MAP_FIXED flag would prevent that, but then mmap could - * instead unmap existing pages to make room for the new map. - */ - return EBUSY; /* TODO: Make a new MDB_* error code? */ } - env->me_psize = meta.mm_psize; env->me_maxfree_1pg = (env->me_psize - PAGEHDRSZ) / sizeof(pgno_t) - 1; env->me_nodemax = (env->me_psize - PAGEHDRSZ) / MDB_MINKEYS; env->me_maxpg = env->me_mapsize / env->me_psize; - - p = (MDB_page *)env->me_map; - env->me_metas[0] = METADATA(p); - env->me_metas[1] = (MDB_meta *)((char *)env->me_metas[0] + meta.mm_psize); - #if MDB_DEBUG { int toggle = mdb_env_pick_meta(env); MDB_db *db = &env->me_metas[toggle]->mm_dbs[MAIN_DBI]; - DPRINTF("opened database version %u, pagesize %u", - env->me_metas[0]->mm_version, env->me_psize); - DPRINTF("using meta page %d", toggle); - DPRINTF("depth: %u", db->md_depth); - DPRINTF("entries: %zu", db->md_entries); - DPRINTF("branch pages: %zu", db->md_branch_pages); - DPRINTF("leaf pages: %zu", db->md_leaf_pages); - DPRINTF("overflow pages: %zu", db->md_overflow_pages); - DPRINTF("root: %zu", db->md_root); + DPRINTF(("opened database version %u, pagesize %u", + env->me_metas[0]->mm_version, env->me_psize)); + DPRINTF(("using meta page %d", toggle)); + DPRINTF(("depth: %u", db->md_depth)); + DPRINTF(("entries: %"Z"u", db->md_entries)); + DPRINTF(("branch pages: %"Z"u", db->md_branch_pages)); + DPRINTF(("leaf pages: %"Z"u", db->md_leaf_pages)); + DPRINTF(("overflow pages: %"Z"u", db->md_overflow_pages)); + DPRINTF(("root: %"Z"u", db->md_root)); } #endif @@ -3516,7 +3630,7 @@ typedef unsigned long long mdb_hash_t; #define MDB_HASH_INIT ((mdb_hash_t)0xcbf29ce484222325ULL) /** perform a 64 bit Fowler/Noll/Vo FNV-1a hash on a buffer - * @param[in] str string to hash + * @param[in] val value to hash * @param[in] hval initial value for hash * @return 64 bit hash * @@ -3551,11 +3665,13 @@ mdb_hash_val(MDB_val *val, mdb_hash_t hval) * @param[in] str string to hash * @param[out] encbuf an array of 11 chars to hold the hash */ -const static char mdb_a85[]= "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~"; +static const char mdb_a85[]= "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~"; + static void mdb_pack85(unsigned long l, char *out) { int i; + for (i=0; i<5; i++) { *out++ = mdb_a85[l % 85]; l /= 85; @@ -3566,10 +3682,9 @@ static void mdb_hash_enc(MDB_val *val, char *encbuf) { mdb_hash_t h = mdb_hash_val(val, MDB_HASH_INIT); - unsigned long *l = (unsigned long *)&h; - mdb_pack85(l[0], encbuf); - mdb_pack85(l[1], encbuf+5); + mdb_pack85(h, encbuf); + mdb_pack85(h>>32, encbuf+5); encbuf[10] = '\0'; } #endif @@ -3753,8 +3868,8 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) pthread_mutexattr_destroy(&mattr); #endif /* _WIN32 || MDB_USE_POSIX_SEM */ - env->me_txns->mti_version = MDB_LOCK_VERSION; env->me_txns->mti_magic = MDB_MAGIC; + env->me_txns->mti_format = MDB_LOCK_FORMAT; env->me_txns->mti_txnid = 0; env->me_txns->mti_numreaders = 0; @@ -3764,9 +3879,9 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl) rc = MDB_INVALID; goto fail; } - if (env->me_txns->mti_version != MDB_LOCK_VERSION) { - DPRINTF("lock region is version %u, expected version %u", - env->me_txns->mti_version, MDB_LOCK_VERSION); + if (env->me_txns->mti_format != MDB_LOCK_FORMAT) { + DPRINTF(("lock region has format+version 0x%x, expected 0x%x", + env->me_txns->mti_format, MDB_LOCK_FORMAT)); rc = MDB_VERSION_MISMATCH; goto fail; } @@ -3857,9 +3972,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } - rc = mdb_env_setup_locks(env, lpath, mode, &excl); - if (rc) - goto leave; + /* For RDONLY, get lockfile after we know datafile exists */ + if (!F_ISSET(flags, MDB_RDONLY)) { + rc = mdb_env_setup_locks(env, lpath, mode, &excl); + if (rc) + goto leave; + } #ifdef _WIN32 if (F_ISSET(flags, MDB_RDONLY)) { @@ -3885,6 +4003,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } + if (F_ISSET(flags, MDB_RDONLY)) { + rc = mdb_env_setup_locks(env, lpath, mode, &excl); + if (rc) + goto leave; + } + if ((rc = mdb_env_open2(env)) == MDB_SUCCESS) { if (flags & (MDB_RDONLY|MDB_WRITEMAP)) { env->me_mfd = env->me_fd; @@ -3893,10 +4017,12 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode * MDB_NOSYNC/MDB_NOMETASYNC, in case these get reset. */ #ifdef _WIN32 + len = OPEN_EXISTING; env->me_mfd = CreateFile(dpath, oflags, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, len, mode | FILE_FLAG_WRITE_THROUGH, NULL); #else + oflags &= ~O_CREAT; env->me_mfd = open(dpath, oflags | MDB_DSYNC, mode); #endif if (env->me_mfd == INVALID_HANDLE_VALUE) { @@ -3904,7 +4030,7 @@ mdb_env_open(MDB_env *env, const char *path, unsigned int flags, mdb_mode_t mode goto leave; } } - DPRINTF("opened dbenv %p", (void *) env); + DPRINTF(("opened dbenv %p", (void *) env)); if (excl > 0) { rc = mdb_env_share_locks(env, &excl); } @@ -4060,7 +4186,7 @@ mdb_env_copyfd(MDB_env *env, HANDLE fd) /* Non-blocking or async handles are not supported */ rc = EIO; break; - } + } } if (env->me_txns) UNLOCK_MUTEX_W(env); @@ -4086,7 +4212,7 @@ mdb_env_copyfd(MDB_env *env, HANDLE fd) } else { rc = EIO; break; - } + } } leave: @@ -4120,11 +4246,15 @@ mdb_env_copy(MDB_env *env, const char *path) newfd = CreateFile(lpath, GENERIC_WRITE, 0, NULL, CREATE_NEW, FILE_FLAG_NO_BUFFERING|FILE_FLAG_WRITE_THROUGH, NULL); #else - newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL #ifdef O_DIRECT - |O_DIRECT + /* The OS supports O_DIRECT, try with it */ + newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL|O_DIRECT, 0666); + /* But open can fail if O_DIRECT isn't supported by the file system + * so retry without the flag + */ + if (newfd == INVALID_HANDLE_VALUE && ErrCode() == EINVAL) #endif - , 0666); + newfd = open(lpath, O_WRONLY|O_CREAT|O_EXCL, 0666); #endif if (newfd == INVALID_HANDLE_VALUE) { rc = ErrCode(); @@ -4277,9 +4407,9 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) { pgno_t pgno; COPY_PGNO(pgno, mp->mp_pgno); - DPRINTF("searching %u keys in %s %spage %zu", + DPRINTF(("searching %u keys in %s %spage %"Z"u", nkeys, IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", - pgno); + pgno)); } #endif @@ -4306,8 +4436,8 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) i = (low + high) >> 1; nodekey.mv_data = LEAF2KEY(mp, i, nodekey.mv_size); rc = cmp(key, &nodekey); - DPRINTF("found leaf index %u [%s], rc = %i", - i, DKEY(&nodekey), rc); + DPRINTF(("found leaf index %u [%s], rc = %i", + i, DKEY(&nodekey), rc)); if (rc == 0) break; if (rc > 0) @@ -4326,11 +4456,11 @@ mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp) rc = cmp(key, &nodekey); #if MDB_DEBUG if (IS_LEAF(mp)) - DPRINTF("found leaf index %u [%s], rc = %i", - i, DKEY(&nodekey), rc); + DPRINTF(("found leaf index %u [%s], rc = %i", + i, DKEY(&nodekey), rc)); else - DPRINTF("found branch index %u [%s -> %zu], rc = %i", - i, DKEY(&nodekey), NODEPGNO(node), rc); + DPRINTF(("found branch index %u [%s -> %"Z"u], rc = %i", + i, DKEY(&nodekey), NODEPGNO(node), rc)); #endif if (rc == 0) break; @@ -4377,15 +4507,15 @@ static void mdb_cursor_pop(MDB_cursor *mc) { if (mc->mc_snum) { -#ifndef MDB_DEBUG_SKIP +#if MDB_DEBUG MDB_page *top = mc->mc_pg[mc->mc_top]; #endif mc->mc_snum--; if (mc->mc_snum) mc->mc_top--; - DPRINTF("popped page %zu off db %u cursor %p", top->mp_pgno, - mc->mc_dbi, (void *) mc); + DPRINTF(("popped page %"Z"u off db %u cursor %p", top->mp_pgno, + mc->mc_dbi, (void *) mc)); } } @@ -4393,8 +4523,8 @@ mdb_cursor_pop(MDB_cursor *mc) static int mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) { - DPRINTF("pushing page %zu on db %u cursor %p", mp->mp_pgno, - mc->mc_dbi, (void *) mc); + DPRINTF(("pushing page %"Z"u on db %u cursor %p", mp->mp_pgno, + mc->mc_dbi, (void *) mc)); if (mc->mc_snum >= CURSOR_STACK) { assert(mc->mc_snum < CURSOR_STACK); @@ -4418,12 +4548,11 @@ mdb_cursor_push(MDB_cursor *mc, MDB_page *mp) static int mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) { + MDB_env *env = txn->mt_env; MDB_page *p = NULL; int level; - if (!((txn->mt_flags & MDB_TXN_RDONLY) | - (txn->mt_env->me_flags & MDB_WRITEMAP))) - { + if (!((txn->mt_flags & MDB_TXN_RDONLY) | (env->me_flags & MDB_WRITEMAP))) { MDB_txn *tx2 = txn; level = 1; do { @@ -4435,9 +4564,10 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) * leave that unless page_touch happens again). */ if (tx2->mt_spill_pgs) { - x = mdb_midl_search(tx2->mt_spill_pgs, pgno); - if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pgno) { - p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); + MDB_ID pn = pgno << 1; + x = mdb_midl_search(tx2->mt_spill_pgs, pn); + if (x <= tx2->mt_spill_pgs[0] && tx2->mt_spill_pgs[x] == pn) { + p = (MDB_page *)(env->me_map + env->me_psize * pgno); goto done; } } @@ -4454,9 +4584,9 @@ mdb_page_get(MDB_txn *txn, pgno_t pgno, MDB_page **ret, int *lvl) if (pgno < txn->mt_next_pgno) { level = 0; - p = (MDB_page *)(txn->mt_env->me_map + txn->mt_env->me_psize * pgno); + p = (MDB_page *)(env->me_map + env->me_psize * pgno); } else { - DPRINTF("page %zu not found", pgno); + DPRINTF(("page %"Z"u not found", pgno)); assert(p != NULL); return MDB_PAGE_NOTFOUND; } @@ -4482,17 +4612,16 @@ static int mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) { MDB_page *mp = mc->mc_pg[mc->mc_top]; - DKBUF; int rc; - + DKBUF; while (IS_BRANCH(mp)) { MDB_node *node; indx_t i; - DPRINTF("branch page %zu has %u keys", mp->mp_pgno, NUMKEYS(mp)); + DPRINTF(("branch page %"Z"u has %u keys", mp->mp_pgno, NUMKEYS(mp))); assert(NUMKEYS(mp) > 1); - DPRINTF("found index 0 to page %zu", NODEPGNO(NODEPTR(mp, 0))); + DPRINTF(("found index 0 to page %"Z"u", NODEPGNO(NODEPTR(mp, 0)))); if (key == NULL) /* Initialize cursor to first page. */ i = 0; @@ -4514,8 +4643,7 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) } if (key) - DPRINTF("following index %u for key [%s]", - i, DKEY(key)); + DPRINTF(("following index %u for key [%s]", i, DKEY(key))); assert(i < NUMKEYS(mp)); node = NODEPTR(mp, i); @@ -4534,13 +4662,13 @@ mdb_page_search_root(MDB_cursor *mc, MDB_val *key, int modify) } if (!IS_LEAF(mp)) { - DPRINTF("internal error, index points to a %02X page!?", - mp->mp_flags); + DPRINTF(("internal error, index points to a %02X page!?", + mp->mp_flags)); return MDB_CORRUPTED; } - DPRINTF("found leaf page %zu for key [%s]", mp->mp_pgno, - key ? DKEY(key) : NULL); + DPRINTF(("found leaf page %"Z"u for key [%s]", mp->mp_pgno, + key ? DKEY(key) : NULL)); mc->mc_flags |= C_INITIALIZED; mc->mc_flags &= ~C_EOF; @@ -4592,7 +4720,7 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) */ if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_ERROR)) { DPUTS("transaction has failed, must abort"); - return EINVAL; + return MDB_BAD_TXN; } else { /* Make sure we're using an up-to-date root */ if (mc->mc_dbi > MAIN_DBI) { @@ -4646,8 +4774,8 @@ mdb_page_search(MDB_cursor *mc, MDB_val *key, int flags) mc->mc_snum = 1; mc->mc_top = 0; - DPRINTF("db %u root page %zu has flags 0x%X", - mc->mc_dbi, root, mc->mc_pg[0]->mp_flags); + DPRINTF(("db %u root page %"Z"u has flags 0x%X", + mc->mc_dbi, root, mc->mc_pg[0]->mp_flags)); if (flags & MDB_PS_MODIFY) { if ((rc = mdb_page_touch(mc))) @@ -4665,33 +4793,40 @@ mdb_ovpage_free(MDB_cursor *mc, MDB_page *mp) { MDB_txn *txn = mc->mc_txn; pgno_t pg = mp->mp_pgno; - unsigned i, ovpages = mp->mp_pages; + unsigned x = 0, ovpages = mp->mp_pages; MDB_env *env = txn->mt_env; + MDB_IDL sl = txn->mt_spill_pgs; + MDB_ID pn = pg << 1; int rc; - DPRINTF("free ov page %zu (%d)", pg, ovpages); + DPRINTF(("free ov page %"Z"u (%d)", pg, ovpages)); /* If the page is dirty or on the spill list we just acquired it, * so we should give it back to our current free list, if any. - * Not currently supported in nested txns. * Otherwise put it onto the list of pages we freed in this txn. + * + * Won't create me_pghead: me_pglast must be inited along with it. + * Unsupported in nested txns: They would need to hide the page + * range in ancestor txns' dirty and spilled lists. */ - if (!(mp->mp_flags & P_DIRTY) && txn->mt_spill_pgs) { - unsigned x = mdb_midl_search(txn->mt_spill_pgs, pg); - if (x <= txn->mt_spill_pgs[0] && txn->mt_spill_pgs[x] == pg) { - /* This page is no longer spilled */ - for (; x < txn->mt_spill_pgs[0]; x++) - txn->mt_spill_pgs[x] = txn->mt_spill_pgs[x+1]; - txn->mt_spill_pgs[0]--; - goto release; - } - } - if ((mp->mp_flags & P_DIRTY) && !txn->mt_parent && env->me_pghead) { - unsigned j, x; + if (env->me_pghead && + !txn->mt_parent && + ((mp->mp_flags & P_DIRTY) || + (sl && (x = mdb_midl_search(sl, pn)) <= sl[0] && sl[x] == pn))) + { + unsigned i, j; pgno_t *mop; MDB_ID2 *dl, ix, iy; rc = mdb_midl_need(&env->me_pghead, ovpages); if (rc) return rc; + if (!(mp->mp_flags & P_DIRTY)) { + /* This page is no longer spilled */ + if (x == sl[0]) + sl[0]--; + else + sl[x] |= 1; + goto release; + } /* Remove from dirty list */ dl = txn->mt_u.dirty_list; x = dl[0].mid--; @@ -4752,7 +4887,7 @@ mdb_node_read(MDB_txn *txn, MDB_node *leaf, MDB_val *data) data->mv_size = NODEDSZ(leaf); memcpy(&pgno, NODEDATA(leaf), sizeof(pgno)); if ((rc = mdb_page_get(txn, pgno, &omp, NULL)) != 0) { - DPRINTF("read overflow page %zu failed", pgno); + DPRINTF(("read overflow page %"Z"u failed", pgno)); return rc; } data->mv_data = METADATA(omp); @@ -4771,13 +4906,16 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi, assert(key); assert(data); - DPRINTF("===> get db %u key [%s]", dbi, DKEY(key)); + DPRINTF(("===> get db %u key [%s]", dbi, DKEY(key))); if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; + if (txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; + if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { - return EINVAL; + return MDB_BAD_VALSIZE; } mdb_cursor_init(&mc, txn, dbi, &mx); @@ -4804,13 +4942,13 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) } mdb_cursor_pop(mc); - DPRINTF("parent page is page %zu, index %u", - mc->mc_pg[mc->mc_top]->mp_pgno, mc->mc_ki[mc->mc_top]); + DPRINTF(("parent page is page %"Z"u, index %u", + mc->mc_pg[mc->mc_top]->mp_pgno, mc->mc_ki[mc->mc_top])); if (move_right ? (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mc->mc_pg[mc->mc_top])) : (mc->mc_ki[mc->mc_top] == 0)) { - DPRINTF("no more keys left, moving to %s sibling", - move_right ? "right" : "left"); + DPRINTF(("no more keys left, moving to %s sibling", + move_right ? "right" : "left")); if ((rc = mdb_cursor_sibling(mc, move_right)) != MDB_SUCCESS) { /* undo cursor_pop before returning */ mc->mc_top++; @@ -4822,8 +4960,8 @@ mdb_cursor_sibling(MDB_cursor *mc, int move_right) mc->mc_ki[mc->mc_top]++; else mc->mc_ki[mc->mc_top]--; - DPRINTF("just moving to %s index key %u", - move_right ? "right" : "left", mc->mc_ki[mc->mc_top]); + DPRINTF(("just moving to %s index key %u", + move_right ? "right" : "left", mc->mc_ki[mc->mc_top])); } assert(IS_BRANCH(mc->mc_pg[mc->mc_top])); @@ -4859,8 +4997,11 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (op == MDB_NEXT || op == MDB_NEXT_DUP) { rc = mdb_cursor_next(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_NEXT); - if (op != MDB_NEXT || rc != MDB_NOTFOUND) + if (op != MDB_NEXT || rc != MDB_NOTFOUND) { + if (rc == MDB_SUCCESS) + MDB_GET_KEY(leaf, key); return rc; + } } } else { mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); @@ -4869,7 +5010,9 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } } - DPRINTF("cursor_next: top page is %zu in cursor %p", mp->mp_pgno, (void *) mc); + DPRINTF(("cursor_next: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc)); + if (mc->mc_flags & C_DEL) + goto skip; if (mc->mc_ki[mc->mc_top] + 1u >= NUMKEYS(mp)) { DPUTS("=====> move to next sibling page"); @@ -4878,12 +5021,13 @@ mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) return rc; } mp = mc->mc_pg[mc->mc_top]; - DPRINTF("next page is %zu, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top]); + DPRINTF(("next page is %"Z"u, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top])); } else mc->mc_ki[mc->mc_top]++; - DPRINTF("==> cursor points to page %zu with %u keys, key index %u", - mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]); +skip: + DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u", + mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top])); if (IS_LEAF2(mp)) { key->mv_size = mc->mc_db->md_pad; @@ -4926,11 +5070,14 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) if (mc->mc_db->md_flags & MDB_DUPSORT) { leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - if (op == MDB_PREV || op == MDB_PREV_DUP) { - if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (op == MDB_PREV || op == MDB_PREV_DUP) { rc = mdb_cursor_prev(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_PREV); - if (op != MDB_PREV || rc != MDB_NOTFOUND) + if (op != MDB_PREV || rc != MDB_NOTFOUND) { + if (rc == MDB_SUCCESS) + MDB_GET_KEY(leaf, key); return rc; + } } else { mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); if (op == MDB_PREV_DUP) @@ -4939,7 +5086,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } } - DPRINTF("cursor_prev: top page is %zu in cursor %p", mp->mp_pgno, (void *) mc); + DPRINTF(("cursor_prev: top page is %"Z"u in cursor %p", mp->mp_pgno, (void *) mc)); if (mc->mc_ki[mc->mc_top] == 0) { DPUTS("=====> move to prev sibling page"); @@ -4948,14 +5095,14 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op) } mp = mc->mc_pg[mc->mc_top]; mc->mc_ki[mc->mc_top] = NUMKEYS(mp) - 1; - DPRINTF("prev page is %zu, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top]); + DPRINTF(("prev page is %"Z"u, key index %u", mp->mp_pgno, mc->mc_ki[mc->mc_top])); } else mc->mc_ki[mc->mc_top]--; mc->mc_flags &= ~C_EOF; - DPRINTF("==> cursor points to page %zu with %u keys, key index %u", - mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top]); + DPRINTF(("==> cursor points to page %"Z"u with %u keys, key index %u", + mp->mp_pgno, NUMKEYS(mp), mc->mc_ki[mc->mc_top])); if (IS_LEAF2(mp)) { key->mv_size = mc->mc_db->md_pad; @@ -5015,7 +5162,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, nodekey.mv_data = LEAF2KEY(mp, 0, nodekey.mv_size); } else { leaf = NODEPTR(mp, 0); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5036,7 +5183,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, nkeys-1, nodekey.mv_size); } else { leaf = NODEPTR(mp, nkeys-1); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5054,7 +5201,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, mc->mc_ki[mc->mc_top], nodekey.mv_size); } else { leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]); - MDB_GET_KEY(leaf, &nodekey); + MDB_GET_KEY2(leaf, nodekey); } rc = mc->mc_dbx->md_cmp(key, &nodekey); if (rc == 0) { @@ -5084,7 +5231,10 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (!mc->mc_top) { /* There are no other pages */ mc->mc_ki[mc->mc_top] = 0; - return MDB_NOTFOUND; + if (op == MDB_SET_RANGE) + goto set1; + else + return MDB_NOTFOUND; } } @@ -5161,7 +5311,7 @@ set1: /* The key already matches in all other cases */ if (op == MDB_SET_RANGE || op == MDB_SET_KEY) MDB_GET_KEY(leaf, key); - DPRINTF("==> cursor placed on key [%s]", DKEY(key)); + DPRINTF(("==> cursor placed on key [%s]", DKEY(key))); return rc; } @@ -5266,9 +5416,13 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, { int rc; int exact = 0; + int (*mfunc)(MDB_cursor *mc, MDB_val *key, MDB_val *data); assert(mc); + if (mc->mc_txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; + switch (op) { case MDB_GET_CURRENT: if (!(mc->mc_flags & C_INITIALIZED)) { @@ -5289,6 +5443,8 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_GET_KEY(leaf, key); if (data) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { + if (mc->mc_flags & C_DEL) + mdb_xcursor_init1(mc, leaf); rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT); } else { rc = mdb_node_read(mc->mc_txn, leaf, data); @@ -5299,39 +5455,50 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data, break; case MDB_GET_BOTH: case MDB_GET_BOTH_RANGE: - if (data == NULL || mc->mc_xcursor == NULL) { + if (data == NULL) { rc = EINVAL; break; } + if (mc->mc_xcursor == NULL) { + rc = MDB_INCOMPATIBLE; + break; + } /* FALLTHRU */ case MDB_SET: case MDB_SET_KEY: case MDB_SET_RANGE: - if (key == NULL || key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + if (key == NULL) { rc = EINVAL; + } else if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { + rc = MDB_BAD_VALSIZE; } else if (op == MDB_SET_RANGE) rc = mdb_cursor_set(mc, key, data, op, NULL); else rc = mdb_cursor_set(mc, key, data, op, &exact); break; case MDB_GET_MULTIPLE: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPFIXED) || - !(mc->mc_flags & C_INITIALIZED)) { + if (data == NULL || !(mc->mc_flags & C_INITIALIZED)) { rc = EINVAL; break; } + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + rc = MDB_INCOMPATIBLE; + break; + } rc = MDB_SUCCESS; if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED) || (mc->mc_xcursor->mx_cursor.mc_flags & C_EOF)) break; goto fetchm; case MDB_NEXT_MULTIPLE: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPFIXED)) { + if (data == NULL) { rc = EINVAL; break; } + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + rc = MDB_INCOMPATIBLE; + break; + } if (!(mc->mc_flags & C_INITIALIZED)) rc = mdb_cursor_first(mc, key, data); else @@ -5374,34 +5541,37 @@ fetchm: rc = mdb_cursor_first(mc, key, data); break; case MDB_FIRST_DUP: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPSORT) || - !(mc->mc_flags & C_INITIALIZED) || - !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) { + mfunc = mdb_cursor_first; + mmove: + if (data == NULL || !(mc->mc_flags & C_INITIALIZED)) { rc = EINVAL; break; } - rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL); + if (mc->mc_xcursor == NULL) { + rc = MDB_INCOMPATIBLE; + break; + } + if (!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) { + rc = EINVAL; + break; + } + rc = mfunc(&mc->mc_xcursor->mx_cursor, data, NULL); break; case MDB_LAST: rc = mdb_cursor_last(mc, key, data); break; case MDB_LAST_DUP: - if (data == NULL || - !(mc->mc_db->md_flags & MDB_DUPSORT) || - !(mc->mc_flags & C_INITIALIZED) || - !(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) { - rc = EINVAL; - break; - } - rc = mdb_cursor_last(&mc->mc_xcursor->mx_cursor, data, NULL); - break; + mfunc = mdb_cursor_last; + goto mmove; default: - DPRINTF("unhandled/unimplemented cursor operation %u", op); + DPRINTF(("unhandled/unimplemented cursor operation %u", op)); rc = EINVAL; break; } + if (mc->mc_flags & C_DEL) + mc->mc_flags ^= C_DEL; + return rc; } @@ -5460,28 +5630,28 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, dcount = data[1].mv_size; data[1].mv_size = 0; if (!F_ISSET(mc->mc_db->md_flags, MDB_DUPFIXED)) - return EINVAL; + return MDB_INCOMPATIBLE; } nospill = flags & MDB_NOSPILL; flags &= ~MDB_NOSPILL; - if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) - return EACCES; + if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; if (flags != MDB_CURRENT && (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE)) - return EINVAL; + return MDB_BAD_VALSIZE; if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT) && data->mv_size > MDB_MAXKEYSIZE) - return EINVAL; + return MDB_BAD_VALSIZE; #if SIZE_MAX > MAXDATASIZE if (data->mv_size > MAXDATASIZE) - return EINVAL; + return MDB_BAD_VALSIZE; #endif - DPRINTF("==> put db %u key [%s], size %zu, data size %zu", - mc->mc_dbi, DKEY(key), key ? key->mv_size:0, data->mv_size); + DPRINTF(("==> put db %u key [%s], size %"Z"u, data size %"Z"u", + mc->mc_dbi, DKEY(key), key ? key->mv_size:0, data->mv_size)); dkey.mv_size = 0; @@ -5514,7 +5684,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact); } if ((flags & MDB_NOOVERWRITE) && rc == 0) { - DPRINTF("duplicate key [%s]", DKEY(key)); + DPRINTF(("duplicate key [%s]", DKEY(key))); *data = d2; return MDB_KEYEXIST; } @@ -5522,6 +5692,9 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, return rc; } + if (mc->mc_flags & C_DEL) + mc->mc_flags ^= C_DEL; + /* Cursor is positioned, check for room in the dirty list */ if (!nospill) { if (flags & MDB_MULTIPLE) { @@ -5562,7 +5735,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data, if (IS_LEAF2(mc->mc_pg[mc->mc_top])) { unsigned int ksize = mc->mc_db->md_pad; if (key->mv_size != ksize) - return EINVAL; + return MDB_BAD_VALSIZE; if (flags == MDB_CURRENT) { char *ptr = LEAF2KEY(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], ksize); memcpy(ptr, key->mv_data, ksize); @@ -5591,9 +5764,16 @@ more: mc->mc_dbx->md_dcmp = mdb_cmp_cint; #endif #endif - /* if data matches, ignore it */ - if (!mc->mc_dbx->md_dcmp(data, &dkey)) - return (flags == MDB_NODUPDATA) ? MDB_KEYEXIST : MDB_SUCCESS; + /* if data matches, skip it */ + if (!mc->mc_dbx->md_dcmp(data, &dkey)) { + if (flags & MDB_NODUPDATA) + rc = MDB_KEYEXIST; + else if (flags & MDB_MULTIPLE) + goto next_mult; + else + rc = MDB_SUCCESS; + return rc; + } /* create a fake page for the dup items */ memcpy(dbuf, dkey.mv_data, dkey.mv_size); @@ -5775,7 +5955,7 @@ current: mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); mc->mc_db->md_entries--; } else { - DPRINTF("inserting key at index %i", mc->mc_ki[mc->mc_top]); + DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); insert = 1; } @@ -5874,15 +6054,16 @@ put_sub: mc->mc_db->md_entries++; if (flags & MDB_MULTIPLE) { if (!rc) { +next_mult: mcount++; + /* let caller know how many succeeded, if any */ + data[1].mv_size = mcount; if (mcount < dcount) { data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); goto more; } } - /* let caller know how many succeeded, if any */ - data[1].mv_size = mcount; } } done: @@ -5900,8 +6081,8 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) MDB_node *leaf; int rc; - if (F_ISSET(mc->mc_txn->mt_flags, MDB_TXN_RDONLY)) - return EACCES; + if (mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; if (!(mc->mc_flags & C_INITIALIZED)) return EINVAL; @@ -5917,7 +6098,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); if (!IS_LEAF2(mc->mc_pg[mc->mc_top]) && F_ISSET(leaf->mn_flags, F_DUPDATA)) { - if (flags != MDB_NODUPDATA) { + if (!(flags & MDB_NODUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_SUBDATA)) { mc->mc_xcursor->mx_cursor.mc_pg[0] = NODEDATA(leaf); } @@ -5943,6 +6124,7 @@ mdb_cursor_del(MDB_cursor *mc, unsigned int flags) } } mc->mc_db->md_entries--; + mc->mc_flags |= C_DEL; return rc; } /* otherwise fall thru and delete the sub-DB */ @@ -5977,8 +6159,8 @@ mdb_page_new(MDB_cursor *mc, uint32_t flags, int num, MDB_page **mp) if ((rc = mdb_page_alloc(mc, num, &np))) return rc; - DPRINTF("allocated new mpage %zu, page size %u", - np->mp_pgno, mc->mc_txn->mt_env->me_psize); + DPRINTF(("allocated new mpage %"Z"u, page size %u", + np->mp_pgno, mc->mc_txn->mt_env->me_psize)); np->mp_flags = flags | P_DIRTY; np->mp_lower = PAGEHDRSZ; np->mp_upper = mc->mc_txn->mt_env->me_psize; @@ -6076,11 +6258,11 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, assert(mp->mp_upper >= mp->mp_lower); - DPRINTF("add to %s %spage %zu index %i, data size %zu key size %zu [%s]", + DPRINTF(("add to %s %spage %"Z"u index %i, data size %"Z"u key size %"Z"u [%s]", IS_LEAF(mp) ? "leaf" : "branch", IS_SUBP(mp) ? "sub-" : "", mp->mp_pgno, indx, data ? data->mv_size : 0, - key ? key->mv_size : 0, key ? DKEY(key) : NULL); + key ? key->mv_size : 0, key ? DKEY(key) : NULL)); if (IS_LEAF2(mp)) { /* Move higher keys up one slot. */ @@ -6110,12 +6292,12 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, int ovpages = OVPAGES(data->mv_size, mc->mc_txn->mt_env->me_psize); int rc; /* Put data on overflow page. */ - DPRINTF("data size is %zu, node would be %zu, put data on overflow page", - data->mv_size, node_size+data->mv_size); + DPRINTF(("data size is %"Z"u, node would be %"Z"u, put data on overflow page", + data->mv_size, node_size+data->mv_size)); node_size += sizeof(pgno_t); if ((rc = mdb_page_new(mc, P_OVERFLOW, ovpages, &ofp))) return rc; - DPRINTF("allocated overflow page %zu", ofp->mp_pgno); + DPRINTF(("allocated overflow page %"Z"u", ofp->mp_pgno)); flags |= F_BIGDATA; } else { node_size += data->mv_size; @@ -6124,11 +6306,11 @@ mdb_node_add(MDB_cursor *mc, indx_t indx, node_size += node_size & 1; if (node_size + sizeof(indx_t) > SIZELEFT(mp)) { - DPRINTF("not enough room in page %zu, got %u ptrs", - mp->mp_pgno, NUMKEYS(mp)); - DPRINTF("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower, - mp->mp_upper - mp->mp_lower); - DPRINTF("node size = %zu", node_size); + DPRINTF(("not enough room in page %"Z"u, got %u ptrs", + mp->mp_pgno, NUMKEYS(mp))); + DPRINTF(("upper - lower = %u - %u = %u", mp->mp_upper, mp->mp_lower, + mp->mp_upper - mp->mp_lower)); + DPRINTF(("node size = %"Z"u", node_size)); return MDB_PAGE_FULL; } @@ -6197,8 +6379,8 @@ mdb_node_del(MDB_page *mp, indx_t indx, int ksize) { pgno_t pgno; COPY_PGNO(pgno, mp->mp_pgno); - DPRINTF("delete node %u on %s page %zu", indx, - IS_LEAF(mp) ? "leaf" : "branch", pgno); + DPRINTF(("delete node %u on %s page %"Z"u", indx, + IS_LEAF(mp) ? "leaf" : "branch", pgno)); } #endif assert(indx < NUMKEYS(mp)); @@ -6359,8 +6541,8 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node) mx->mx_db.md_flags |= MDB_INTEGERKEY; } } - DPRINTF("Sub-db %u for db %u root page %zu", mx->mx_cursor.mc_dbi, mc->mc_dbi, - mx->mx_db.md_root); + DPRINTF(("Sub-db %u for db %u root page %"Z"u", mx->mx_cursor.mc_dbi, mc->mc_dbi, + mx->mx_db.md_root)); mx->mx_dbflag = DB_VALID | (F_ISSET(mc->mc_pg[mc->mc_top]->mp_flags, P_DIRTY) ? DB_DIRTY : 0); mx->mx_dbx.md_name.mv_data = NODEKEY(node); @@ -6411,6 +6593,9 @@ mdb_cursor_open(MDB_txn *txn, MDB_dbi dbi, MDB_cursor **ret) if (txn == NULL || ret == NULL || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; + if (txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; + /* Allow read access to the freelist */ if (!dbi && !F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) return EINVAL; @@ -6456,8 +6641,8 @@ mdb_cursor_count(MDB_cursor *mc, size_t *countp) if (mc == NULL || countp == NULL) return EINVAL; - if (!(mc->mc_db->md_flags & MDB_DUPSORT)) - return EINVAL; + if (mc->mc_xcursor == NULL) + return MDB_INCOMPATIBLE; leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { @@ -6526,11 +6711,11 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) char kbuf2[(MDB_MAXKEYSIZE*2+1)]; k2.mv_data = NODEKEY(node); k2.mv_size = node->mn_ksize; - DPRINTF("update key %u (ofs %u) [%s] to [%s] on page %zu", + DPRINTF(("update key %u (ofs %u) [%s] to [%s] on page %"Z"u", indx, ptr, mdb_dkey(&k2, kbuf2), DKEY(key), - mp->mp_pgno); + mp->mp_pgno)); } #endif @@ -6544,7 +6729,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key) if (delta > 0 && SIZELEFT(mp) < delta) { pgno_t pgno; /* not enough space left, do a delete and split */ - DPRINTF("Not enough room, delta = %d, splitting...", delta); + DPRINTF(("Not enough room, delta = %d, splitting...", delta)); pgno = NODEPGNO(node); mdb_node_del(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top], 0); return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE); @@ -6654,12 +6839,12 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) return rc; } - DPRINTF("moving %s node %u [%s] on page %zu to node %u on page %zu", + DPRINTF(("moving %s node %u [%s] on page %"Z"u to node %u on page %"Z"u", IS_LEAF(csrc->mc_pg[csrc->mc_top]) ? "leaf" : "branch", csrc->mc_ki[csrc->mc_top], DKEY(&key), csrc->mc_pg[csrc->mc_top]->mp_pgno, - cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno); + cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno)); /* Add the node to the destination page. */ @@ -6705,8 +6890,8 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) key.mv_size = NODEKSZ(srcnode); key.mv_data = NODEKEY(srcnode); } - DPRINTF("update separator for source page %zu to [%s]", - csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key)); + DPRINTF(("update separator for source page %"Z"u to [%s]", + csrc->mc_pg[csrc->mc_top]->mp_pgno, DKEY(&key))); mdb_cursor_copy(csrc, &mn); mn.mc_snum--; mn.mc_top--; @@ -6733,8 +6918,8 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst) key.mv_size = NODEKSZ(srcnode); key.mv_data = NODEKEY(srcnode); } - DPRINTF("update separator for destination page %zu to [%s]", - cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key)); + DPRINTF(("update separator for destination page %"Z"u to [%s]", + cdst->mc_pg[cdst->mc_top]->mp_pgno, DKEY(&key))); mdb_cursor_copy(cdst, &mn); mn.mc_snum--; mn.mc_top--; @@ -6771,8 +6956,8 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) MDB_val key, data; unsigned nkeys; - DPRINTF("merging page %zu into %zu", csrc->mc_pg[csrc->mc_top]->mp_pgno, - cdst->mc_pg[cdst->mc_top]->mp_pgno); + DPRINTF(("merging page %"Z"u into %"Z"u", csrc->mc_pg[csrc->mc_top]->mp_pgno, + cdst->mc_pg[cdst->mc_top]->mp_pgno)); assert(csrc->mc_snum > 1); /* can't merge root page */ assert(cdst->mc_snum > 1); @@ -6824,8 +7009,9 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst) } } - DPRINTF("dst page %zu now has %u keys (%.1f%% filled)", - cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]), (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10); + DPRINTF(("dst page %"Z"u now has %u keys (%.1f%% filled)", + cdst->mc_pg[cdst->mc_top]->mp_pgno, NUMKEYS(cdst->mc_pg[cdst->mc_top]), + (float)PAGEFILL(cdst->mc_txn->mt_env, cdst->mc_pg[cdst->mc_top]) / 10)); /* Unlink the src page from parent and add to free list. */ @@ -6915,9 +7101,10 @@ mdb_rebalance(MDB_cursor *mc) { pgno_t pgno; COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno); - DPRINTF("rebalancing %s page %zu (has %u keys, %.1f%% full)", + DPRINTF(("rebalancing %s page %"Z"u (has %u keys, %.1f%% full)", IS_LEAF(mc->mc_pg[mc->mc_top]) ? "leaf" : "branch", - pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10); + pgno, NUMKEYS(mc->mc_pg[mc->mc_top]), + (float)PAGEFILL(mc->mc_txn->mt_env, mc->mc_pg[mc->mc_top]) / 10)); } #endif @@ -6926,8 +7113,8 @@ mdb_rebalance(MDB_cursor *mc) #if MDB_DEBUG pgno_t pgno; COPY_PGNO(pgno, mc->mc_pg[mc->mc_top]->mp_pgno); - DPRINTF("no need to rebalance page %zu, above fill threshold", - pgno); + DPRINTF(("no need to rebalance page %"Z"u, above fill threshold", + pgno)); #endif return MDB_SUCCESS; } @@ -7047,8 +7234,9 @@ mdb_rebalance(MDB_cursor *mc) mc->mc_ki[mc->mc_top] = 0; } - DPRINTF("found neighbor page %zu (%u keys, %.1f%% full)", - mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10); + DPRINTF(("found neighbor page %"Z"u (%u keys, %.1f%% full)", + mn.mc_pg[mn.mc_top]->mp_pgno, NUMKEYS(mn.mc_pg[mn.mc_top]), + (float)PAGEFILL(mc->mc_txn->mt_env, mn.mc_pg[mn.mc_top]) / 10)); /* If the neighbor page is above threshold and has enough keys, * move one key from it. Otherwise we should try to merge them. @@ -7074,6 +7262,7 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) int rc; MDB_page *mp; indx_t ki; + unsigned int nkeys; mp = mc->mc_pg[mc->mc_top]; ki = mc->mc_ki[mc->mc_top]; @@ -7093,30 +7282,34 @@ mdb_cursor_del0(MDB_cursor *mc, MDB_node *leaf) rc = mdb_rebalance(mc); if (rc != MDB_SUCCESS) mc->mc_txn->mt_flags |= MDB_TXN_ERROR; - /* if mc points past last node in page, invalidate */ - else if (mc->mc_ki[mc->mc_top] >= NUMKEYS(mc->mc_pg[mc->mc_top])) - mc->mc_flags &= ~(C_INITIALIZED|C_EOF); - - { - /* Adjust other cursors pointing to mp */ + else { MDB_cursor *m2; - unsigned int nkeys; MDB_dbi dbi = mc->mc_dbi; mp = mc->mc_pg[mc->mc_top]; nkeys = NUMKEYS(mp); + + /* if mc points past last node in page, find next sibling */ + if (mc->mc_ki[mc->mc_top] >= nkeys) + mdb_cursor_sibling(mc, 1); + + /* Adjust other cursors pointing to mp */ for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { if (m2 == mc) continue; if (!(m2->mc_flags & C_INITIALIZED)) continue; if (m2->mc_pg[mc->mc_top] == mp) { - if (m2->mc_ki[mc->mc_top] > ki) - m2->mc_ki[mc->mc_top]--; + if (m2->mc_ki[mc->mc_top] >= ki) { + m2->mc_flags |= C_DEL; + if (m2->mc_ki[mc->mc_top] > ki) + m2->mc_ki[mc->mc_top]--; + } if (m2->mc_ki[mc->mc_top] >= nkeys) - m2->mc_flags &= ~(C_INITIALIZED|C_EOF); + mdb_cursor_sibling(m2, 1); } } + mc->mc_flags |= C_DEL; } return rc; @@ -7135,22 +7328,25 @@ mdb_del(MDB_txn *txn, MDB_dbi dbi, assert(key != NULL); - DPRINTF("====> delete db %u key [%s]", dbi, DKEY(key)); + DPRINTF(("====> delete db %u key [%s]", dbi, DKEY(key))); if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - return EACCES; - } + if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { - return EINVAL; + return MDB_BAD_VALSIZE; } mdb_cursor_init(&mc, txn, dbi, &mx); exact = 0; + if (!F_ISSET(txn->mt_dbs[dbi].md_flags, MDB_DUPSORT)) { + /* must ignore any data */ + data = NULL; + } if (data) { op = MDB_GET_BOTH; rdata = *data; @@ -7208,14 +7404,14 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno mp = mc->mc_pg[mc->mc_top]; newindx = mc->mc_ki[mc->mc_top]; - DPRINTF("-----> splitting %s page %zu and adding [%s] at index %i", + DPRINTF(("-----> splitting %s page %"Z"u and adding [%s] at index %i", IS_LEAF(mp) ? "leaf" : "branch", mp->mp_pgno, - DKEY(newkey), mc->mc_ki[mc->mc_top]); + DKEY(newkey), mc->mc_ki[mc->mc_top])); /* Create a right sibling. */ if ((rc = mdb_page_new(mc, mp->mp_flags, 1, &rp))) return rc; - DPRINTF("new right sibling: page %zu", rp->mp_pgno); + DPRINTF(("new right sibling: page %"Z"u", rp->mp_pgno)); if (mc->mc_snum < 2) { if ((rc = mdb_page_new(mc, P_BRANCH, 1, &pp))) @@ -7226,7 +7422,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno mc->mc_pg[0] = pp; mc->mc_ki[0] = 0; mc->mc_db->md_root = pp->mp_pgno; - DPRINTF("root split! new root = %zu", pp->mp_pgno); + DPRINTF(("root split! new root = %"Z"u", pp->mp_pgno)); mc->mc_db->md_depth++; new_root = 1; @@ -7244,7 +7440,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno ptop = 0; } else { ptop = mc->mc_top-1; - DPRINTF("parent branch page is %zu", mc->mc_pg[ptop]->mp_pgno); + DPRINTF(("parent branch page is %"Z"u", mc->mc_pg[ptop]->mp_pgno)); } mc->mc_flags |= C_SPLITTING; @@ -7387,7 +7583,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno } newsep: - DPRINTF("separator is [%s]", DKEY(&sepkey)); + DPRINTF(("separator is [%s]", DKEY(&sepkey))); /* Copy separator key to the parent. */ @@ -7574,7 +7770,7 @@ done: m3->mc_snum++; m3->mc_top++; } - if (m3->mc_pg[mc->mc_top] == mp) { + if (m3->mc_top >= mc->mc_top && m3->mc_pg[mc->mc_top] == mp) { if (m3->mc_ki[mc->mc_top] >= newindx && !(nflags & MDB_SPLIT_REPLACE)) m3->mc_ki[mc->mc_top]++; if (m3->mc_ki[mc->mc_top] >= fixup) { @@ -7582,7 +7778,7 @@ done: m3->mc_ki[mc->mc_top] -= fixup; m3->mc_ki[ptop] = mn.mc_ki[ptop]; } - } else if (!did_split && m3->mc_pg[ptop] == mc->mc_pg[ptop] && + } else if (!did_split && m3->mc_top >= ptop && m3->mc_pg[ptop] == mc->mc_pg[ptop] && m3->mc_ki[ptop] >= mc->mc_ki[ptop]) { m3->mc_ki[ptop]++; } @@ -7604,12 +7800,11 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi, if (txn == NULL || !dbi || dbi >= txn->mt_numdbs || !(txn->mt_dbflags[dbi] & DB_VALID)) return EINVAL; - if (F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)) { - return EACCES; - } + if (txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_ERROR)) + return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; if (key->mv_size == 0 || key->mv_size > MDB_MAXKEYSIZE) { - return EINVAL; + return MDB_BAD_VALSIZE; } if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags) @@ -7694,10 +7889,12 @@ mdb_env_info(MDB_env *env, MDB_envinfo *arg) arg->me_mapaddr = (env->me_flags & MDB_FIXEDMAP) ? env->me_map : 0; arg->me_mapsize = env->me_mapsize; arg->me_maxreaders = env->me_maxreaders; + /* me_numreaders may be zero if this process never used any readers. Use * the shared numreader count if it exists. */ arg->me_numreaders = env->me_txns ? env->me_txns->mti_numreaders : env->me_numreaders; + arg->me_last_pgno = env->me_metas[toggle]->mm_last_pg; arg->me_last_txnid = env->me_metas[toggle]->mm_txnid; return MDB_SUCCESS; @@ -7741,6 +7938,8 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db if ((flags & VALID_FLAGS) != flags) return EINVAL; + if (txn->mt_flags & MDB_TXN_ERROR) + return MDB_BAD_TXN; /* main DB? */ if (!name) { @@ -7795,7 +7994,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db /* make sure this is actually a DB */ MDB_node *node = NODEPTR(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]); if (!(node->mn_flags & F_SUBDATA)) - return EINVAL; + return MDB_INCOMPATIBLE; } else if (rc == MDB_NOTFOUND && (flags & MDB_CREATE)) { /* Create if requested */ MDB_db dummy; @@ -7817,7 +8016,6 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned int flags, MDB_dbi *db txn->mt_dbflags[slot] = dbflag; memcpy(&txn->mt_dbs[slot], data.mv_data, sizeof(MDB_db)); *dbi = slot; - txn->mt_env->me_dbflags[slot] = txn->mt_dbs[slot].md_flags; mdb_default_cmp(txn, slot); if (!unused) { txn->mt_numdbs++; @@ -7853,12 +8051,12 @@ void mdb_dbi_close(MDB_env *env, MDB_dbi dbi) free(ptr); } -int mdb_dbi_flags(MDB_env *env, MDB_dbi dbi, unsigned int *flags) +int mdb_dbi_flags(MDB_txn *txn, MDB_dbi dbi, unsigned int *flags) { /* We could return the flags for the FREE_DBI too but what's the point? */ - if (dbi <= MAIN_DBI || dbi >= env->me_numdbs) + if (txn == NULL || dbi < MAIN_DBI || dbi >= txn->mt_numdbs) return EINVAL; - *flags = env->me_dbflags[dbi]; + *flags = txn->mt_dbs[dbi].md_flags & PERSISTENT_FLAGS; return MDB_SUCCESS; } @@ -8026,6 +8224,11 @@ int mdb_set_relctx(MDB_txn *txn, MDB_dbi dbi, void *ctx) return MDB_SUCCESS; } +int mdb_env_get_maxkeysize(MDB_env *env) +{ + return MDB_MAXKEYSIZE; +} + int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) { unsigned int i, rdrs; @@ -8046,9 +8249,9 @@ int mdb_reader_list(MDB_env *env, MDB_msg_func *func, void *ctx) int rc; tid = mr[i].mr_tid; if (mr[i].mr_txnid == (txnid_t)-1) { - sprintf(buf, "%10d %zx -\n", mr[i].mr_pid, tid); + sprintf(buf, "%10d %"Z"x -\n", mr[i].mr_pid, tid); } else { - sprintf(buf, "%10d %zx %zu\n", mr[i].mr_pid, tid, mr[i].mr_txnid); + sprintf(buf, "%10d %"Z"x %"Z"u\n", mr[i].mr_pid, tid, mr[i].mr_txnid); } if (first) { first = 0; @@ -8093,7 +8296,7 @@ static int mdb_pid_insert(pid_t *ids, pid_t pid) return -1; } } - + if( val > 0 ) { ++cursor; } @@ -8128,9 +8331,10 @@ int mdb_reader_check(MDB_env *env, int *dead) if (mr[i].mr_pid && mr[i].mr_pid != env->me_pid) { pid = mr[i].mr_pid; if (mdb_pid_insert(pids, pid) == 0) { - if (mdb_reader_pid(env, Pidcheck, pid)) { + if (!mdb_reader_pid(env, Pidcheck, pid)) { LOCK_MUTEX_R(env); - if (mdb_reader_pid(env, Pidcheck, pid)) { + /* Recheck, a new process may have reused pid */ + if (!mdb_reader_pid(env, Pidcheck, pid)) { for (j=i; j - *
  • EINVAL - an invalid parameter was specified, or the environment is already open. + *
  • EINVAL - an invalid parameter was specified, or the environment has + * an active write transaction. * */ int mdb_env_set_mapsize(MDB_env *env, size_t size); @@ -707,6 +731,13 @@ int mdb_env_get_maxreaders(MDB_env *env, unsigned int *readers); */ int mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs); + /** @brief Get the maximum size of a key for the environment. + * + * @param[in] env An environment handle returned by #mdb_env_create() + * @return The maximum size of a key. (#MDB_MAXKEYSIZE) + */ +int mdb_env_get_maxkeysize(MDB_env *env); + /** @brief Create a transaction for use with the environment. * * The transaction handle may be discarded using #mdb_txn_abort() or #mdb_txn_commit(). @@ -718,8 +749,8 @@ int mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs); * @param[in] parent If this parameter is non-NULL, the new transaction * will be a nested transaction, with the transaction indicated by \b parent * as its parent. Transactions may be nested to any level. A parent - * transaction may not issue any other operations besides mdb_txn_begin, - * mdb_txn_abort, or mdb_txn_commit while it has active child transactions. + * transaction and its cursors may not issue any other operations than + * mdb_txn_commit and mdb_txn_abort while it has active child transactions. * @param[in] flags Special options for this transaction. This parameter * must be set to 0 or by bitwise OR'ing together one or more of the * values described here. @@ -734,7 +765,8 @@ int mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs); *
  • #MDB_PANIC - a fatal error occurred earlier and the environment * must be shut down. *
  • #MDB_MAP_RESIZED - another process wrote data beyond this MDB_env's - * mapsize and the environment must be shut down. + * mapsize and this environment's map must be resized as well. + * See #mdb_env_set_mapsize(). *
  • #MDB_READERS_FULL - a read-only transaction was requested and * the reader lock table is full. See #mdb_env_set_maxreaders(). *
  • ENOMEM - out of memory. @@ -742,6 +774,12 @@ int mdb_env_set_maxdbs(MDB_env *env, MDB_dbi dbs); */ int mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned int flags, MDB_txn **txn); + /** @brief Returns the transaction's #MDB_env + * + * @param[in] txn A transaction handle returned by #mdb_txn_begin() + */ +MDB_env *mdb_txn_env(MDB_txn *txn); + /** @brief Commit all the operations of a transaction into the database. * * The transaction handle is freed. It and its cursors must not be used @@ -891,12 +929,12 @@ int mdb_stat(MDB_txn *txn, MDB_dbi dbi, MDB_stat *stat); /** @brief Retrieve the DB flags for a database handle. * - * @param[in] env An environment handle returned by #mdb_env_create() + * @param[in] txn A transaction handle returned by #mdb_txn_begin() * @param[in] dbi A database handle returned by #mdb_dbi_open() * @param[out] flags Address where the flags will be returned. * @return A non-zero error value on failure and 0 on success. */ -int mdb_dbi_flags(MDB_env *env, MDB_dbi dbi, unsigned int *flags); +int mdb_dbi_flags(MDB_txn *txn, MDB_dbi dbi, unsigned int *flags); /** @brief Close a database handle. * @@ -909,14 +947,12 @@ int mdb_dbi_flags(MDB_env *env, MDB_dbi dbi, unsigned int *flags); */ void mdb_dbi_close(MDB_env *env, MDB_dbi dbi); - /** @brief Delete a database and/or free all its pages. + /** @brief Empty or delete+close a database. * - * If the \b del parameter is 1, the DB handle will be closed - * and the DB will be deleted. * @param[in] txn A transaction handle returned by #mdb_txn_begin() * @param[in] dbi A database handle returned by #mdb_dbi_open() - * @param[in] del 1 to delete the DB from the environment, - * 0 to just free its pages. + * @param[in] del 0 to empty the DB, 1 to delete it from the + * environment and close the DB handle. * @return A non-zero error value on failure and 0 on success. */ int mdb_drop(MDB_txn *txn, MDB_dbi dbi, int del);