diff --git a/src/stasis/blobManager.c b/src/stasis/blobManager.c index a3f58d5..d477a8c 100644 --- a/src/stasis/blobManager.c +++ b/src/stasis/blobManager.c @@ -102,8 +102,10 @@ page_impl stasis_page_blob_impl() { 0, //pageGenericBlockDone, 0, //freespace, 0, //compact, + 0, //compact slotids 0, //preRalloc, 0, //postRalloc, + 0, //splice, 0, //Free, 0, //XXX page_impl_dereference_identity, stasis_page_blob_loaded, diff --git a/src/stasis/page.c b/src/stasis/page.c index 034a499..5214c59 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -245,6 +245,10 @@ void stasis_record_alloc_done(int xid, Page * p, recordid rid){ page_impls[p->pageType] .recordPostAlloc(xid, p, rid); } +void stasis_record_splice(int xid, Page * p, slotid_t first, slotid_t second) { + page_impls[p->pageType] + .recordSplice(xid, p, first, second); +} void stasis_record_free(int xid, Page * p, recordid rid){ page_impls[p->pageType] .recordFree(xid, p, rid); @@ -274,6 +278,10 @@ void stasis_record_compact(Page * p){ page_impls[p->pageType] .pageCompact(p); } +void stasis_record_compact_slotids(int xid, Page * p) { + page_impls[p->pageType] + .pageCompactSlotIDs(xid, p); +} /** @todo How should the LSN of pages without a page_type be handled? This only works because we don't have LSN-free pages yet. With diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c index 94a4f7b..d2f5519 100644 --- a/src/stasis/page/fixed.c +++ b/src/stasis/page/fixed.c @@ -85,6 +85,9 @@ static int fixedFreespace(int xid, Page * p) { static void fixedCompact(Page * p) { // no-op } +static void fixedCompactSlotIds(int xid, Page * p) { + abort(); +} static recordid fixedPreAlloc(int xid, Page *p, int size) { assertlocked(p->rwlatch); if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { @@ -103,6 +106,9 @@ static void fixedPostAlloc(int xid, Page *p, recordid rid) { assert(*recordsize_ptr(p) == rid.size); (*recordcount_ptr(p))++; } +static void fixedSplice(int xid, Page *p, slotid_t first, slotid_t second) { + abort(); +} static void fixedFree(int xid, Page *p, recordid rid) { assertlocked(p->rwlatch); if(*recordsize_ptr(p) == rid.slot+1) { @@ -140,8 +146,10 @@ page_impl fixedImpl() { stasis_block_done_default_impl, fixedFreespace, fixedCompact, + fixedCompactSlotIds, fixedPreAlloc, fixedPostAlloc, + fixedSplice, fixedFree, 0, // XXX dereference fixedLoaded, // loaded diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index 67bdbbd..646c793 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -207,6 +207,22 @@ static void slottedCompact(Page * page) { #endif // SLOTTED_PAGE_OLD_CHECKS } +static void slottedCompactSlotIDs(int xid, Page * p) { + int16_t numSlots = *stasis_page_slotted_numslots_ptr(p); + int16_t out = 0; + for(int16_t in = 0; in < numSlots; in++) { + if(*stasis_page_slotted_slot_ptr(p, in) == INVALID_SLOT) { + // nop + } else { + *stasis_page_slotted_slot_ptr(p, out) = *stasis_page_slotted_slot_cptr(p, in); + *stasis_page_slotted_slot_length_ptr(p, out) = *stasis_page_slotted_slot_length_cptr(p, in); + out++; + } + } + *stasis_page_slotted_numslots_ptr(p) = out; + *stasis_page_slotted_freelist_ptr(p) = INVALID_SLOT; +} + /** Check to see how many bytes can fit in a given slot. This makes it possible for callers to guarantee the safety @@ -487,6 +503,18 @@ static void slottedPostRalloc(int xid, Page * page, recordid rid) { } +static void slottedSpliceSlot(int xid, Page *p, slotid_t a, slotid_t b) { + assert(a < b); + int16_t b_slot = *stasis_page_slotted_slot_cptr(p, b); + int16_t b_slot_len = *stasis_page_slotted_slot_length_cptr(p, b); + for(int16_t i = b-1; i >= a; i--) { + *stasis_page_slotted_slot_ptr(p, i+1) = *stasis_page_slotted_slot_cptr(p, i); + *stasis_page_slotted_slot_length_ptr(p, i+1) = *stasis_page_slotted_slot_length_cptr(p, i); + } + *stasis_page_slotted_slot_ptr(p, a) = b_slot; + *stasis_page_slotted_slot_length_ptr(p, b) = b_slot_len; +} + static void slottedFree(int xid, Page * p, recordid rid) { slottedSanityCheck(p, rid); @@ -560,8 +588,10 @@ static page_impl pi = { stasis_block_done_default_impl, slottedFreespace, slottedCompact, + slottedCompactSlotIDs, slottedPreRalloc, slottedPostRalloc, + slottedSpliceSlot, slottedFree, 0, //XXX page_impl_dereference_identity, slottedLoaded, diff --git a/stasis/page.h b/stasis/page.h index 7aada11..3044be8 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -546,10 +546,12 @@ recordid stasis_record_first(int xid, Page * p); recordid stasis_record_next(int xid, Page * p, recordid prev); recordid stasis_record_alloc_begin(int xid, Page * p, int size); void stasis_record_alloc_done(int xid, Page * p, recordid rid); +void stasis_record_splice(int xid, Page * p, slotid_t first, slotid_t second); void stasis_record_free(int xid, Page * p, recordid rid); int stasis_block_supported(int xid, Page * p); int stasis_record_freespace(int xid, Page * p); void stasis_record_compact(Page * p); +void stasis_record_compact_slotids(int xid, Page * p); void stasis_page_loaded(Page * p, pagetype_t type); void stasis_page_flushed(Page * p); void stasis_page_cleanup(Page * p); @@ -782,6 +784,11 @@ typedef struct page_impl { other side effects. */ void(*pageCompact)(Page *p); + /** + * Compact the slotids on the page; the order of valid records will + * be preserved, but holes due to invalid slots will be closed. + */ + void(*pageCompactSlotIDs)(int xid, Page *p); /** Generate a new, appropriately sized recordid. This is the first of two allocation phases, and does not actually modify the page. @@ -808,6 +815,19 @@ typedef struct page_impl { @see Talloc(), page_impl.recordPreAlloc() */ void (*recordPostAlloc)(int xid, Page *p, recordid rid); + /** Reorder records within a page. The second slot will be moved before the + * first slot; all intermediate slots will be shifted up one slotid. The + * intermediate records must always be valid; this can be ensured by calling + * pageCompact(). As long as no records are freed after the page is allocated + * or compacted, all intermediate records will be valid. + * + * @param xid The active transaction + * @param p The page that will be modified. The page must be latched against writes. + * @param first The first slot to be shifted to higher slots in the page. + * @param second The slot which will be moved to first's original location. Second + * must be greater than first. + */ + void (*recordSplice)(int xid, Page *p, slotid_t first, slotid_t second); /** Free a record. The page implementation doesn't need to worry about uncommitted deallocations; that is handled by a higher level. diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c index 3c536cb..1698f1e 100644 --- a/test/stasis/check_page.c +++ b/test/stasis/check_page.c @@ -587,6 +587,70 @@ START_TEST(pageTrecordTypeTest) { Tdeinit(); } END_TEST +START_TEST(pageTreeOpTest) { + Tinit(); + int xid = Tbegin(); + + pageid_t page = TpageAlloc(xid); + + // run a sanity check on a page pinned in ram; don't bother logging (since we don't care about recovery for this test) + Page *p = loadPage(xid, page); + + writelock(p->rwlatch, 0); + + stasis_page_slotted_initialize_page(p); + + recordid rids[5]; + // Alloc + set five records + for(int i = 0; i < 5; i++) { + rids[i] = stasis_record_alloc_begin(xid, p, sizeof(int)); + stasis_record_alloc_done(xid, p, rids[i]); + assert(rids[i].slot == i); + stasis_record_write(xid, p, rids[i], (byte*)(&i)); + } + //Read them back, free the odd ones. + for(int i = 0; i < 5; i++) { + int j; + stasis_record_read(xid, p, rids[i], (byte*)(&j)); + assert(i == j); + if(i % 2) { + stasis_record_free(xid, p, rids[i]); + } + } + // Close holes due to odd ones. + stasis_record_compact_slotids(xid, p); + for(int i = 0; i < 3; i++) { + int k = i * 2; + int j; + assert(stasis_record_type_read(xid, p, rids[i]) == NORMAL_SLOT); + stasis_record_read(xid, p, rids[i], (byte*)(&j)); + assert(j == k); + } + // Reinsert odd ones at the end, then splice them back to their original position + for(int i = 1; i < 5; i+=2) { + recordid rid = stasis_record_alloc_begin(xid, p, sizeof(int)); + stasis_record_alloc_done(xid, p, rid); + stasis_record_write(xid, p, rid, (byte*)(&i)); + stasis_record_splice(xid, p, i, rid.slot); + int j; + stasis_record_read(xid, p, rids[i], (byte*)(&j)); + assert(i == j); + } + // Does it still look right? + for(int i = 0; i < 5; i++) { + int j; + stasis_record_read(xid, p, rids[i], (byte*)(&j)); + assert(i == j); + } + // (tdeinit till fsck it at shutdown) + unlock(p->rwlatch); + + releasePage(p); + + Tcommit(xid); + Tdeinit(); +} END_TEST + Suite * check_suite(void) { Suite *s = suite_create("page"); @@ -599,6 +663,7 @@ Suite * check_suite(void) { tcase_add_test(tc, pageCheckMacros); tcase_add_test(tc, pageCheckSlotTypeTest); tcase_add_test(tc, pageTrecordTypeTest); + tcase_add_test(tc, pageTreeOpTest); tcase_add_test(tc, pageNoThreadMultPageTest); tcase_add_test(tc, pageNoThreadTest); tcase_add_test(tc, pageThreadTest);