Add page ops for tree implementations.

This commit is contained in:
Sears Russell 2009-07-26 06:18:54 +00:00
parent 5ca3a008fa
commit 260310e151
6 changed files with 133 additions and 0 deletions

View file

@ -102,8 +102,10 @@ page_impl stasis_page_blob_impl() {
0, //pageGenericBlockDone, 0, //pageGenericBlockDone,
0, //freespace, 0, //freespace,
0, //compact, 0, //compact,
0, //compact slotids
0, //preRalloc, 0, //preRalloc,
0, //postRalloc, 0, //postRalloc,
0, //splice,
0, //Free, 0, //Free,
0, //XXX page_impl_dereference_identity, 0, //XXX page_impl_dereference_identity,
stasis_page_blob_loaded, stasis_page_blob_loaded,

View file

@ -245,6 +245,10 @@ void stasis_record_alloc_done(int xid, Page * p, recordid rid){
page_impls[p->pageType] page_impls[p->pageType]
.recordPostAlloc(xid, p, rid); .recordPostAlloc(xid, p, rid);
} }
void stasis_record_splice(int xid, Page * p, slotid_t first, slotid_t second) {
page_impls[p->pageType]
.recordSplice(xid, p, first, second);
}
void stasis_record_free(int xid, Page * p, recordid rid){ void stasis_record_free(int xid, Page * p, recordid rid){
page_impls[p->pageType] page_impls[p->pageType]
.recordFree(xid, p, rid); .recordFree(xid, p, rid);
@ -274,6 +278,10 @@ void stasis_record_compact(Page * p){
page_impls[p->pageType] page_impls[p->pageType]
.pageCompact(p); .pageCompact(p);
} }
void stasis_record_compact_slotids(int xid, Page * p) {
page_impls[p->pageType]
.pageCompactSlotIDs(xid, p);
}
/** @todo How should the LSN of pages without a page_type be handled? /** @todo How should the LSN of pages without a page_type be handled?
This only works because we don't have LSN-free pages yet. With This only works because we don't have LSN-free pages yet. With

View file

@ -85,6 +85,9 @@ static int fixedFreespace(int xid, Page * p) {
static void fixedCompact(Page * p) { static void fixedCompact(Page * p) {
// no-op // no-op
} }
static void fixedCompactSlotIds(int xid, Page * p) {
abort();
}
static recordid fixedPreAlloc(int xid, Page *p, int size) { static recordid fixedPreAlloc(int xid, Page *p, int size) {
assertlocked(p->rwlatch); assertlocked(p->rwlatch);
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
@ -103,6 +106,9 @@ static void fixedPostAlloc(int xid, Page *p, recordid rid) {
assert(*recordsize_ptr(p) == rid.size); assert(*recordsize_ptr(p) == rid.size);
(*recordcount_ptr(p))++; (*recordcount_ptr(p))++;
} }
static void fixedSplice(int xid, Page *p, slotid_t first, slotid_t second) {
abort();
}
static void fixedFree(int xid, Page *p, recordid rid) { static void fixedFree(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch); assertlocked(p->rwlatch);
if(*recordsize_ptr(p) == rid.slot+1) { if(*recordsize_ptr(p) == rid.slot+1) {
@ -140,8 +146,10 @@ page_impl fixedImpl() {
stasis_block_done_default_impl, stasis_block_done_default_impl,
fixedFreespace, fixedFreespace,
fixedCompact, fixedCompact,
fixedCompactSlotIds,
fixedPreAlloc, fixedPreAlloc,
fixedPostAlloc, fixedPostAlloc,
fixedSplice,
fixedFree, fixedFree,
0, // XXX dereference 0, // XXX dereference
fixedLoaded, // loaded fixedLoaded, // loaded

View file

@ -207,6 +207,22 @@ static void slottedCompact(Page * page) {
#endif // SLOTTED_PAGE_OLD_CHECKS #endif // SLOTTED_PAGE_OLD_CHECKS
} }
static void slottedCompactSlotIDs(int xid, Page * p) {
int16_t numSlots = *stasis_page_slotted_numslots_ptr(p);
int16_t out = 0;
for(int16_t in = 0; in < numSlots; in++) {
if(*stasis_page_slotted_slot_ptr(p, in) == INVALID_SLOT) {
// nop
} else {
*stasis_page_slotted_slot_ptr(p, out) = *stasis_page_slotted_slot_cptr(p, in);
*stasis_page_slotted_slot_length_ptr(p, out) = *stasis_page_slotted_slot_length_cptr(p, in);
out++;
}
}
*stasis_page_slotted_numslots_ptr(p) = out;
*stasis_page_slotted_freelist_ptr(p) = INVALID_SLOT;
}
/** /**
Check to see how many bytes can fit in a given slot. This Check to see how many bytes can fit in a given slot. This
makes it possible for callers to guarantee the safety makes it possible for callers to guarantee the safety
@ -487,6 +503,18 @@ static void slottedPostRalloc(int xid, Page * page, recordid rid) {
} }
static void slottedSpliceSlot(int xid, Page *p, slotid_t a, slotid_t b) {
assert(a < b);
int16_t b_slot = *stasis_page_slotted_slot_cptr(p, b);
int16_t b_slot_len = *stasis_page_slotted_slot_length_cptr(p, b);
for(int16_t i = b-1; i >= a; i--) {
*stasis_page_slotted_slot_ptr(p, i+1) = *stasis_page_slotted_slot_cptr(p, i);
*stasis_page_slotted_slot_length_ptr(p, i+1) = *stasis_page_slotted_slot_length_cptr(p, i);
}
*stasis_page_slotted_slot_ptr(p, a) = b_slot;
*stasis_page_slotted_slot_length_ptr(p, b) = b_slot_len;
}
static void slottedFree(int xid, Page * p, recordid rid) { static void slottedFree(int xid, Page * p, recordid rid) {
slottedSanityCheck(p, rid); slottedSanityCheck(p, rid);
@ -560,8 +588,10 @@ static page_impl pi = {
stasis_block_done_default_impl, stasis_block_done_default_impl,
slottedFreespace, slottedFreespace,
slottedCompact, slottedCompact,
slottedCompactSlotIDs,
slottedPreRalloc, slottedPreRalloc,
slottedPostRalloc, slottedPostRalloc,
slottedSpliceSlot,
slottedFree, slottedFree,
0, //XXX page_impl_dereference_identity, 0, //XXX page_impl_dereference_identity,
slottedLoaded, slottedLoaded,

View file

@ -546,10 +546,12 @@ recordid stasis_record_first(int xid, Page * p);
recordid stasis_record_next(int xid, Page * p, recordid prev); recordid stasis_record_next(int xid, Page * p, recordid prev);
recordid stasis_record_alloc_begin(int xid, Page * p, int size); recordid stasis_record_alloc_begin(int xid, Page * p, int size);
void stasis_record_alloc_done(int xid, Page * p, recordid rid); void stasis_record_alloc_done(int xid, Page * p, recordid rid);
void stasis_record_splice(int xid, Page * p, slotid_t first, slotid_t second);
void stasis_record_free(int xid, Page * p, recordid rid); void stasis_record_free(int xid, Page * p, recordid rid);
int stasis_block_supported(int xid, Page * p); int stasis_block_supported(int xid, Page * p);
int stasis_record_freespace(int xid, Page * p); int stasis_record_freespace(int xid, Page * p);
void stasis_record_compact(Page * p); void stasis_record_compact(Page * p);
void stasis_record_compact_slotids(int xid, Page * p);
void stasis_page_loaded(Page * p, pagetype_t type); void stasis_page_loaded(Page * p, pagetype_t type);
void stasis_page_flushed(Page * p); void stasis_page_flushed(Page * p);
void stasis_page_cleanup(Page * p); void stasis_page_cleanup(Page * p);
@ -782,6 +784,11 @@ typedef struct page_impl {
other side effects. other side effects.
*/ */
void(*pageCompact)(Page *p); void(*pageCompact)(Page *p);
/**
* Compact the slotids on the page; the order of valid records will
* be preserved, but holes due to invalid slots will be closed.
*/
void(*pageCompactSlotIDs)(int xid, Page *p);
/** /**
Generate a new, appropriately sized recordid. This is the first Generate a new, appropriately sized recordid. This is the first
of two allocation phases, and does not actually modify the page. of two allocation phases, and does not actually modify the page.
@ -808,6 +815,19 @@ typedef struct page_impl {
@see Talloc(), page_impl.recordPreAlloc() @see Talloc(), page_impl.recordPreAlloc()
*/ */
void (*recordPostAlloc)(int xid, Page *p, recordid rid); void (*recordPostAlloc)(int xid, Page *p, recordid rid);
/** Reorder records within a page. The second slot will be moved before the
* first slot; all intermediate slots will be shifted up one slotid. The
* intermediate records must always be valid; this can be ensured by calling
* pageCompact(). As long as no records are freed after the page is allocated
* or compacted, all intermediate records will be valid.
*
* @param xid The active transaction
* @param p The page that will be modified. The page must be latched against writes.
* @param first The first slot to be shifted to higher slots in the page.
* @param second The slot which will be moved to first's original location. Second
* must be greater than first.
*/
void (*recordSplice)(int xid, Page *p, slotid_t first, slotid_t second);
/** Free a record. The page implementation doesn't need to worry /** Free a record. The page implementation doesn't need to worry
about uncommitted deallocations; that is handled by a higher about uncommitted deallocations; that is handled by a higher
level. level.

View file

@ -587,6 +587,70 @@ START_TEST(pageTrecordTypeTest) {
Tdeinit(); Tdeinit();
} END_TEST } END_TEST
START_TEST(pageTreeOpTest) {
Tinit();
int xid = Tbegin();
pageid_t page = TpageAlloc(xid);
// run a sanity check on a page pinned in ram; don't bother logging (since we don't care about recovery for this test)
Page *p = loadPage(xid, page);
writelock(p->rwlatch, 0);
stasis_page_slotted_initialize_page(p);
recordid rids[5];
// Alloc + set five records
for(int i = 0; i < 5; i++) {
rids[i] = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rids[i]);
assert(rids[i].slot == i);
stasis_record_write(xid, p, rids[i], (byte*)(&i));
}
//Read them back, free the odd ones.
for(int i = 0; i < 5; i++) {
int j;
stasis_record_read(xid, p, rids[i], (byte*)(&j));
assert(i == j);
if(i % 2) {
stasis_record_free(xid, p, rids[i]);
}
}
// Close holes due to odd ones.
stasis_record_compact_slotids(xid, p);
for(int i = 0; i < 3; i++) {
int k = i * 2;
int j;
assert(stasis_record_type_read(xid, p, rids[i]) == NORMAL_SLOT);
stasis_record_read(xid, p, rids[i], (byte*)(&j));
assert(j == k);
}
// Reinsert odd ones at the end, then splice them back to their original position
for(int i = 1; i < 5; i+=2) {
recordid rid = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid);
stasis_record_write(xid, p, rid, (byte*)(&i));
stasis_record_splice(xid, p, i, rid.slot);
int j;
stasis_record_read(xid, p, rids[i], (byte*)(&j));
assert(i == j);
}
// Does it still look right?
for(int i = 0; i < 5; i++) {
int j;
stasis_record_read(xid, p, rids[i], (byte*)(&j));
assert(i == j);
}
// (tdeinit till fsck it at shutdown)
unlock(p->rwlatch);
releasePage(p);
Tcommit(xid);
Tdeinit();
} END_TEST
Suite * check_suite(void) { Suite * check_suite(void) {
Suite *s = suite_create("page"); Suite *s = suite_create("page");
@ -599,6 +663,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, pageCheckMacros); tcase_add_test(tc, pageCheckMacros);
tcase_add_test(tc, pageCheckSlotTypeTest); tcase_add_test(tc, pageCheckSlotTypeTest);
tcase_add_test(tc, pageTrecordTypeTest); tcase_add_test(tc, pageTrecordTypeTest);
tcase_add_test(tc, pageTreeOpTest);
tcase_add_test(tc, pageNoThreadMultPageTest); tcase_add_test(tc, pageNoThreadMultPageTest);
tcase_add_test(tc, pageNoThreadTest); tcase_add_test(tc, pageNoThreadTest);
tcase_add_test(tc, pageThreadTest); tcase_add_test(tc, pageThreadTest);