diff --git a/doc/dict.txt b/doc/dict.txt index 1b06d23..a20c32e 100644 --- a/doc/dict.txt +++ b/doc/dict.txt @@ -7,3 +7,4 @@ param callbacks mutex todo +writeback diff --git a/src/stasis/blobManager.c b/src/stasis/blobManager.c index f27d6ee..a3f58d5 100644 --- a/src/stasis/blobManager.c +++ b/src/stasis/blobManager.c @@ -51,7 +51,7 @@ void stasis_blob_write(int xid, Page * p, recordid rid, const void* dat) { for(; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) { Page * cnk = loadPage(xid, rec.offset+chunk); writelock(cnk->rwlatch,0); - if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) { + if(cnk->pageType != BLOB_PAGE) { stasis_page_blob_initialize(cnk); } unlock(cnk->rwlatch); @@ -61,7 +61,7 @@ void stasis_blob_write(int xid, Page * p, recordid rid, const void* dat) { } Page * cnk = loadPage(xid, rec.offset+chunk); writelock(cnk->rwlatch,0); - if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) { + if(p->pageType != BLOB_PAGE) { stasis_page_blob_initialize(cnk); } unlock(cnk->rwlatch); @@ -86,6 +86,7 @@ static void stasis_page_blob_cleanup(Page *p) { } page_impl stasis_page_blob_impl() { page_impl pi = { BLOB_PAGE, + 1, 0, //read, 0, //write, 0, //readDone @@ -115,5 +116,5 @@ void stasis_page_blob_initialize(Page * p) { assertlocked(p->rwlatch); DEBUG("lsn: %lld\n",(long long)p->LSN); stasis_page_cleanup(p); - *stasis_page_type_ptr(p) = BLOB_PAGE; + p->pageType = BLOB_PAGE; } diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index 71ce841..4421459 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -283,7 +283,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia memset(ret->memAddr,0,PAGE_SIZE); *stasis_page_lsn_ptr(ret) = ret->LSN; ret->dirty = 0; - stasis_page_loaded(ret); + stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE); } *pagePendingPtr(ret) = 0; // Would remove from lru, but getFreePage() guarantees that it isn't diff --git a/src/stasis/bufferManager/legacy/legacyBufferManager.c b/src/stasis/bufferManager/legacy/legacyBufferManager.c index 91dc795..70e0e2a 100644 --- a/src/stasis/bufferManager/legacy/legacyBufferManager.c +++ b/src/stasis/bufferManager/legacy/legacyBufferManager.c @@ -284,7 +284,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) { *stasis_page_lsn_ptr(ret) = ret->LSN; // XXX need mutex for this call? - stasis_page_loaded(ret); + stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE); } writeunlock(ret->loadlatch); diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index 4856f63..65deccb 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -71,7 +71,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret) { } ret->dirty = 0; - stasis_page_loaded(ret); + stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE); pthread_mutex_unlock(&stable_mutex); diff --git a/src/stasis/operations/alloc.c b/src/stasis/operations/alloc.c index f1a4c3d..504970e 100644 --- a/src/stasis/operations/alloc.c +++ b/src/stasis/operations/alloc.c @@ -223,7 +223,7 @@ static void stasis_alloc_register_old_regions() { for(pageid_t i = 0; i < t.size; i++) { Page * p = loadPage(-1, boundary + i); readlock(p->rwlatch,0); - if(*stasis_page_type_ptr(p) == SLOTTED_PAGE) { + if(p->pageType == SLOTTED_PAGE) { availablePage * next = malloc(sizeof(availablePage)); next->pageid = boundary+i; next->freespace = stasis_record_freespace(-1, p); diff --git a/src/stasis/operations/arrayList.c b/src/stasis/operations/arrayList.c index e7ebeb4..9394e78 100644 --- a/src/stasis/operations/arrayList.c +++ b/src/stasis/operations/arrayList.c @@ -91,7 +91,7 @@ static int array_list_op_init_header(const LogEntry* e, Page* p) { stasis_record_write(e->xid, p, firstDataPageRid, (const byte*)&firstDataPage); - *stasis_page_type_ptr(p) = ARRAY_LIST_PAGE; + p->pageType = ARRAY_LIST_PAGE; return 0; } diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index dad3c5e..423d661 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -278,7 +278,7 @@ recordid TlsmCreate(int xid, int comparator, Page *p = loadPage(xid, ret.page); writelock(p->rwlatch,0); stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0); - *stasis_page_type_ptr(p) = LSM_ROOT_PAGE; + p->pageType = LSM_ROOT_PAGE; lsmTreeState *state = malloc(sizeof(lsmTreeState)); state->lastLeaf = -1; /// XXX define something in constants.h? @@ -403,8 +403,7 @@ static recordid appendInternalNode(int xid, Page *p, pageid_t val_page, pageid_t lastLeaf, lsm_page_allocator_t allocator, void *allocator_state) { - assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE || - *stasis_page_type_ptr(p) == FIXED_PAGE); + assert(p->pageType == LSM_ROOT_PAGE || p->pageType == FIXED_PAGE); if(!depth) { // leaf node. recordid ret = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+key_len); @@ -720,7 +719,7 @@ pageid_t TlsmLastPage(int xid, recordid tree) { } Page * root = loadPage(xid, tree.page); readlock(root->rwlatch,0); - assert(*stasis_page_type_ptr(root) == LSM_ROOT_PAGE); + assert(root->pageType == LSM_ROOT_PAGE); lsmTreeState *state = root->impl; int keySize = getKeySize(xid,root); if(state->lastLeaf == -1) { diff --git a/src/stasis/operations/lsnFreeSet.c b/src/stasis/operations/lsnFreeSet.c index 371914a..b2063c1 100644 --- a/src/stasis/operations/lsnFreeSet.c +++ b/src/stasis/operations/lsnFreeSet.c @@ -3,7 +3,7 @@ #include #include static int op_lsn_free_set(const LogEntry *e, Page *p) { - if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { abort() ; } + if(p->pageType != SLOTTED_LSN_FREE_PAGE) { abort() ; } assert(e->update.arg_size >= (sizeof(pageoff_t) * 2)); int size = e->update.arg_size; size -= (2*sizeof(pageoff_t)); @@ -14,7 +14,7 @@ static int op_lsn_free_set(const LogEntry *e, Page *p) { return 0; } static int op_lsn_free_unset(const LogEntry *e, Page *p) { - if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { return 0; } + if(p->pageType != SLOTTED_LSN_FREE_PAGE) { return 0; } assert(e->update.arg_size >= (sizeof(pageoff_t) * 2)); int size = e->update.arg_size; size -= (2*sizeof(pageoff_t)); diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c index aca87a4..f2c6e92 100644 --- a/src/stasis/operations/pageOperations.c +++ b/src/stasis/operations/pageOperations.c @@ -120,7 +120,7 @@ compensated_function pageid_t TpageAllocMany(int xid, int count) { int TpageGetType(int xid, pageid_t page) { Page * p = loadPage(xid, page); - int ret = *stasis_page_type_ptr(p); + int ret = p->pageType; releasePage(p); return ret; } diff --git a/src/stasis/operations/regions.c b/src/stasis/operations/regions.c index 774d205..497f9d3 100644 --- a/src/stasis/operations/regions.c +++ b/src/stasis/operations/regions.c @@ -14,19 +14,19 @@ static pthread_mutex_t region_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_t holding_mutex; static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int allocationManager); static void TallocBoundaryTag(int xid, pageid_t page, boundary_tag* tag); -static int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag); -static void TsetBoundaryTag(int xid, pageid_t page, boundary_tag* tag); +static int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag); +static void TsetBoundaryTag(int xid, pageid_t page, boundary_tag* tag); static void TdeallocBoundaryTag(int xid, pageid_t page); /** This doesn't need a latch since it is only initiated within nested - top actions (and is local to this file. During abort(), the nested + top actions (and is local to this file. During abort(), the nested top action's logical undo grabs the necessary latches. */ static int op_alloc_boundary_tag(const LogEntry* e, Page* p) { stasis_slotted_initialize_page(p); recordid rid = {p->id, 0, sizeof(boundary_tag)}; assert(e->update.arg_size == sizeof(boundary_tag)); - *stasis_page_type_ptr(p) = BOUNDARY_TAG_PAGE; + p->pageType = BOUNDARY_TAG_PAGE; stasis_record_alloc_done(e->xid, p, rid); byte * buf = stasis_record_write_begin(e->xid, p, rid); memcpy(buf, getUpdateArgs(e), stasis_record_length_read(e->xid, p, rid)); @@ -49,7 +49,7 @@ static int operate_dealloc_region_unlocked(int xid, regionAllocArg *dat) { pageid_t firstPage = dat->startPage + 1; - boundary_tag t; + boundary_tag t; int ret = readBoundaryTag(xid, firstPage - 1, &t); assert(ret); @@ -83,7 +83,7 @@ static void TallocBoundaryTag(int xid, pageid_t page, boundary_tag* tag) { Tupdate(xid, page, tag, sizeof(boundary_tag), OPERATION_ALLOC_BOUNDARY_TAG); } -int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag) { +int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag) { assert(holding_mutex == pthread_self()); recordid rid = { page, 0, sizeof(boundary_tag) }; if(TpageGetType(xid, rid.page) != BOUNDARY_TAG_PAGE) { @@ -104,7 +104,7 @@ int TregionReadBoundaryTag(int xid, pageid_t page, boundary_tag* tag) { return ret; } -static void TsetBoundaryTag(int xid, pageid_t page, boundary_tag* tag) { +static void TsetBoundaryTag(int xid, pageid_t page, boundary_tag* tag) { //printf("Write boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status); // Sanity checking: @@ -132,13 +132,12 @@ static void TdeallocBoundaryTag(int xid, pageid_t page) { TsetBoundaryTag(xid, page, &t); } - -void regionsInit() { + +void regionsInit() { Page * p = loadPage(-1, 0); - int pageType = *stasis_page_type_ptr(p); holding_mutex = pthread_self(); - if(pageType != BOUNDARY_TAG_PAGE) { + if(p->pageType != BOUNDARY_TAG_PAGE) { boundary_tag t; t.size = PAGEID_T_MAX; t.prev_size = PAGEID_T_MAX; @@ -173,7 +172,7 @@ int TregionNextBoundaryTag(int xid, pageid_t* pid, boundary_tag * tag, int type) int ret = readBoundaryTag(xid, *pid-1, tag); - if(ret) { + if(ret) { while(1) { if(tag->size == PAGEID_T_MAX) { ret = 0; @@ -195,7 +194,7 @@ int TregionNextBoundaryTag(int xid, pageid_t* pid, boundary_tag * tag, int type) return ret; } -void fsckRegions(int xid) { +void fsckRegions(int xid) { // Ignore region_xid, allocation_manager for now. pthread_mutex_lock(®ion_mutex); @@ -212,23 +211,23 @@ void fsckRegions(int xid) { assert(ret); assert(tag.prev_size == PAGEID_T_MAX); - while(tag.size != PAGEID_T_MAX) { + while(tag.size != PAGEID_T_MAX) { // Ignore region_xid, allocation_manager for now. assert(tag.status == REGION_VACANT || tag.status == REGION_ZONED); assert(prev_tag.size == tag.prev_size); - for(pageid_t i = 0; i < tag.size; i++) { + for(pageid_t i = 0; i < tag.size; i++) { pageid_t thisPage = tagPage + 1 + i; pageType = TpageGetType(xid, thisPage); - if(pageType == BOUNDARY_TAG_PAGE) { + if(pageType == BOUNDARY_TAG_PAGE) { boundary_tag orphan; int ret = readBoundaryTag(xid, thisPage, &orphan); assert(ret); assert(orphan.status == REGION_CONDEMNED); Page * p = loadPage(xid, thisPage); releasePage(p); - } else if (pageType == SLOTTED_PAGE) { + } else if (pageType == SLOTTED_PAGE) { Page * p = loadPage(xid, thisPage); releasePage(p); } @@ -251,7 +250,7 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a int ret = readBoundaryTag(xid, page, &t); assert(ret); - if(t.size != pageCount) { + if(t.size != pageCount) { // need to split region // allocate new boundary tag. @@ -259,7 +258,7 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a pageid_t newPageid = page + pageCount + 1; boundary_tag new_tag; - + if(t.size != PAGEID_T_MAX) { new_tag.size = t.size - pageCount - 1; // pageCount must be strictly less than t->size, so this is non-negative. @@ -270,17 +269,17 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a succ_tag.prev_size = new_tag.size; TsetBoundaryTag(xid, page + t.size + 1, &succ_tag); - } else { + } else { new_tag.size = PAGEID_T_MAX; } new_tag.prev_size = pageCount; - // Create the new region, and disassociate it from this transaction immediately. - // This has two implications: + // Create the new region, and disassociate it from this transaction immediately. + // This has two implications: // - It could cause some fragmentation if interleaved transactions are allocating, and some abort. // - Multiple transactions can allocate space at the end of the page file without blocking each other. - new_tag.status = REGION_VACANT; + new_tag.status = REGION_VACANT; new_tag.region_xid = INVALID_XID; new_tag.allocation_manager = 0; @@ -297,7 +296,7 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a } -static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) { +static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) { if(t->status != REGION_VACANT || TisActiveTransaction(t->region_xid)) { return; } @@ -314,7 +313,7 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) // TODO: Check stasis_page_type_ptr()... - if(succ_tag.size == PAGEID_T_MAX) { + if(succ_tag.size == PAGEID_T_MAX) { t->size = PAGEID_T_MAX; assert(succ_tag.status == REGION_VACANT); // TODO: Truncate page file. @@ -331,10 +330,10 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) assert(ret); succ_succ_tag.prev_size = t->size; TsetBoundaryTag(xid, succ_succ_page, &succ_succ_tag); - + TdeallocBoundaryTag(xid, succ_page); mustWriteOriginalTag = 1; - } else { + } else { mustWriteOriginalTag = 0; } @@ -345,30 +344,30 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) // creates a situation where the current page is not a boundary // tag...) - if(t->prev_size != PAGEID_T_MAX) { - + if(t->prev_size != PAGEID_T_MAX) { + pageid_t pred_page = ((*firstPage) - 1) - t->prev_size; // If the predecessor is length zero, then it's boundary tag is two pages before this region's tag. - + boundary_tag pred_tag; int ret = readBoundaryTag(xid, pred_page, &pred_tag); assert(ret); - - if(pred_tag.status == REGION_VACANT && (!TisActiveTransaction(pred_tag.region_xid))) { - + + if(pred_tag.status == REGION_VACANT && (!TisActiveTransaction(pred_tag.region_xid))) { + TdeallocBoundaryTag(xid, *firstPage); - - if(t->size == PAGEID_T_MAX) { + + if(t->size == PAGEID_T_MAX) { pred_tag.size = PAGEID_T_MAX; - + // TODO: truncate region - - } else { - + + } else { + pred_tag.size += (t->size + 1); - + pageid_t succ_page = (*firstPage) + 1+ t->size; assert(pred_page + pred_tag.size + 1 == succ_page); - + boundary_tag succ_tag; ret = readBoundaryTag(xid, succ_page, &succ_tag); assert(ret); @@ -377,19 +376,19 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) assert(succ_page - pred_page - 1 == pred_tag.size); } - + TsetBoundaryTag(xid, pred_page, &pred_tag); - + assert(pred_page < *firstPage); (*firstPage) = pred_page; (*t) = pred_tag; - } else { - if(mustWriteOriginalTag) { + } else { + if(mustWriteOriginalTag) { TsetBoundaryTag(xid, (*firstPage), t); - } + } } - } else { - if(mustWriteOriginalTag) { + } else { + if(mustWriteOriginalTag) { TsetBoundaryTag(xid, (*firstPage), t); } } @@ -414,7 +413,7 @@ void TregionDealloc(int xid, pageid_t firstPage) { regionAllocArg arg = { firstPage-1, t.size, t.allocation_manager }; - assert(t.status != REGION_VACANT); + assert(t.status != REGION_VACANT); void * handle = TbeginNestedTopAction(xid, OPERATION_DEALLOC_REGION, (const byte*)&arg, sizeof(regionAllocArg)); @@ -423,12 +422,12 @@ void TregionDealloc(int xid, pageid_t firstPage) { firstPage --; TendNestedTopAction(xid, handle); - + holding_mutex = 0; pthread_mutex_unlock(®ion_mutex); } -pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) { +pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) { // Initial implementation. Naive first fit. pthread_mutex_lock(®ion_mutex); @@ -445,10 +444,10 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) { // printf("consolidateRegions pageid, t: %d, {%d, %d, %d} -> ", pageid, t.size, t.prev_size, t.status); consolidateRegions(xid, &pageid, &t); - + // printf(" %d, {%d, %d, %d}\tpageCount=%d\n", pageid, t.size, t.prev_size, t.status, pageCount); - while(t.status != REGION_VACANT || t.size < pageCount || TisActiveTransaction(t.region_xid)) { + while(t.status != REGION_VACANT || t.size < pageCount || TisActiveTransaction(t.region_xid)) { // TODO: This while loop and the boundary tag manipulation below should be factored into two submodules. // printf("t.status = %d, REGION_VACANT = %d, t.size = %d, pageCount = %d\n", t.status, REGION_VACANT, t.size, pageCount); @@ -457,14 +456,14 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) { assert(ret); // printf("\tconsolidateRegions pageid, t: %d, {%d, %d, %d} -> ", pageid, t.size, t.prev_size, t.status); - + consolidateRegions(xid, &pageid, &t); - + // printf(" %d, {%d, %d, %d}\tpageCount=%d\n", pageid, t.size, t.prev_size, t.status, pageCount); } // printf("page = %d, t.status = %d, REGION_VACANT = %d, t.size = %d, pageCount = %d (alloced)\n", pageid, t.status, REGION_VACANT, t.size, pageCount); - + TendNestedTopAction(xid, ntaHandle); regionAllocArg arg = { pageid, pageCount, allocationManager }; @@ -476,23 +475,23 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) { holding_mutex = 0; pthread_mutex_unlock(®ion_mutex); - + return pageid+1; } stasis_operation_impl stasis_op_impl_boundary_tag_alloc() { - stasis_operation_impl o = { - OPERATION_ALLOC_BOUNDARY_TAG, - OPERATION_ALLOC_BOUNDARY_TAG, + stasis_operation_impl o = { + OPERATION_ALLOC_BOUNDARY_TAG, + OPERATION_ALLOC_BOUNDARY_TAG, OPERATION_NOOP, op_alloc_boundary_tag }; return o; } -stasis_operation_impl stasis_op_impl_region_alloc() { - stasis_operation_impl o = { +stasis_operation_impl stasis_op_impl_region_alloc() { + stasis_operation_impl o = { OPERATION_ALLOC_REGION, OPERATION_NOOP, OPERATION_ALLOC_REGION_INVERSE, @@ -500,8 +499,8 @@ stasis_operation_impl stasis_op_impl_region_alloc() { }; return o; } -stasis_operation_impl stasis_op_impl_region_alloc_inverse() { - stasis_operation_impl o = { +stasis_operation_impl stasis_op_impl_region_alloc_inverse() { + stasis_operation_impl o = { OPERATION_ALLOC_REGION_INVERSE, OPERATION_ALLOC_REGION_INVERSE, OPERATION_INVALID, @@ -509,8 +508,8 @@ stasis_operation_impl stasis_op_impl_region_alloc_inverse() { }; return o; } -stasis_operation_impl stasis_op_impl_region_dealloc() { - stasis_operation_impl o = { +stasis_operation_impl stasis_op_impl_region_dealloc() { + stasis_operation_impl o = { OPERATION_DEALLOC_REGION, OPERATION_NOOP, OPERATION_DEALLOC_REGION_INVERSE, @@ -519,8 +518,8 @@ stasis_operation_impl stasis_op_impl_region_dealloc() { return o; } -stasis_operation_impl stasis_op_impl_region_dealloc_inverse() { - stasis_operation_impl o = { +stasis_operation_impl stasis_op_impl_region_dealloc_inverse() { + stasis_operation_impl o = { OPERATION_DEALLOC_REGION_INVERSE, OPERATION_DEALLOC_REGION_INVERSE, OPERATION_INVALID, @@ -529,22 +528,22 @@ stasis_operation_impl stasis_op_impl_region_dealloc_inverse() { return o; } -void TregionFindNthActive(int xid, pageid_t regionNumber, pageid_t * firstPage, pageid_t * size) { +void TregionFindNthActive(int xid, pageid_t regionNumber, pageid_t * firstPage, pageid_t * size) { boundary_tag t; recordid rid = {0, 0, sizeof(boundary_tag)}; pthread_mutex_lock(®ion_mutex); holding_mutex = pthread_self(); Tread(xid, rid, &t); pageid_t prevSize = 0; - while(t.status == REGION_VACANT) { + while(t.status == REGION_VACANT) { rid.page += (t.size + 1); Tread(xid, rid, &t); assert(t.size != PAGEID_T_MAX); - assert(t.prev_size != PAGEID_T_MAX); + assert(t.prev_size != PAGEID_T_MAX); assert(prevSize == t.prev_size || !prevSize); prevSize = t.size; } - for(pageid_t i = 0; i < regionNumber; i++) { + for(pageid_t i = 0; i < regionNumber; i++) { rid.page += (t.size + 1); Tread(xid, rid, &t); if(t.status == REGION_VACANT) { i--; } diff --git a/src/stasis/page.c b/src/stasis/page.c index cc1d0da..d608c73 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -180,7 +180,7 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) { recordid stasis_record_dereference(int xid, Page * p, recordid rid) { assertlocked(p->rwlatch); - int page_type = *stasis_page_type_ptr(p); + int page_type = p->pageType; if(page_type == INDIRECT_PAGE) { rid = dereferenceIndirectRID(xid, rid); } else if(page_type == ARRAY_LIST_PAGE) { @@ -194,90 +194,90 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) { const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) { assertlocked(p->rwlatch); - int page_type = *stasis_page_type_ptr(p); + int page_type = p->pageType; assert(page_type); return page_impls[page_type].recordRead(xid, p, rid); } byte * stasis_record_write_begin(int xid, Page * p, recordid rid) { assertlocked(p->rwlatch); - int page_type = *stasis_page_type_ptr(p); + int page_type = p->pageType; assert(page_type); assert(stasis_record_length_read(xid, p, rid) == stasis_record_type_to_size(rid.size)); return page_impls[page_type].recordWrite(xid, p, rid); } void stasis_record_read_done(int xid, Page *p, recordid rid, const byte *b) { - int page_type = *stasis_page_type_ptr(p); + int page_type = p->pageType; if(page_impls[page_type].recordReadDone) { page_impls[page_type].recordReadDone(xid,p,rid,b); } } void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) { - int page_type = *stasis_page_type_ptr(p); + int page_type = p->pageType; if(page_impls[page_type].recordWriteDone) { page_impls[page_type].recordWriteDone(xid,p,rid,b); } } int stasis_record_type_read(int xid, Page *p, recordid rid) { assertlocked(p->rwlatch); - if(page_impls[*stasis_page_type_ptr(p)].recordGetType) - return page_impls[*stasis_page_type_ptr(p)].recordGetType(xid, p, rid); + if(page_impls[p->pageType].recordGetType) + return page_impls[p->pageType].recordGetType(xid, p, rid); else return INVALID_SLOT; } void stasis_record_type_write(int xid, Page *p, recordid rid, int type) { assertlocked(p->rwlatch); - page_impls[*stasis_page_type_ptr(p)] + page_impls[p->pageType] .recordSetType(xid, p, rid, type); } int stasis_record_length_read(int xid, Page *p, recordid rid) { assertlocked(p->rwlatch); - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .recordGetLength(xid,p,rid); } recordid stasis_record_first(int xid, Page * p){ - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .recordFirst(xid,p); } recordid stasis_record_next(int xid, Page * p, recordid prev){ - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .recordNext(xid,p,prev); } recordid stasis_record_alloc_begin(int xid, Page * p, int size){ - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .recordPreAlloc(xid,p,size); } void stasis_record_alloc_done(int xid, Page * p, recordid rid){ - page_impls[*stasis_page_type_ptr(p)] + page_impls[p->pageType] .recordPostAlloc(xid, p, rid); } void stasis_record_free(int xid, Page * p, recordid rid){ - page_impls[*stasis_page_type_ptr(p)] + page_impls[p->pageType] .recordFree(xid, p, rid); } int stasis_block_supported(int xid, Page * p){ - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .isBlockSupported(xid, p); } block_t * stasis_block_first(int xid, Page * p){ - int t = *stasis_page_type_ptr(p); + int t = p->pageType; return page_impls[t] .blockFirst(xid, p); } block_t * stasis_block_next(int xid, Page * p, block_t * prev){ - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .blockNext(xid, p,prev); } void stasis_block_done(int xid, Page * p, block_t * done){ - page_impls[*stasis_page_type_ptr(p)] + page_impls[p->pageType] .blockDone(xid, p,done); } int stasis_record_freespace(int xid, Page * p){ - return page_impls[*stasis_page_type_ptr(p)] + return page_impls[p->pageType] .pageFreespace(xid, p); } void stasis_record_compact(Page * p){ - page_impls[*stasis_page_type_ptr(p)] + page_impls[p->pageType] .pageCompact(p); } /** @todo How should the LSN of pages without a page_type be handled? @@ -286,26 +286,32 @@ void stasis_record_compact(Page * p){ LSN-free pages, we'll need special "loadPageForAlloc(), and loadPageOfType() methods (or something...) */ -void stasis_page_loaded(Page * p){ - short type = *stasis_page_type_ptr(p); - if(type) { - assert(page_impls[type].page_type == type); - page_impls[type].pageLoaded(p); +void stasis_page_loaded(Page * p, pagetype_t type){ + p->pageType = (type == UNKNOWN_TYPE_PAGE) ? *stasis_page_type_ptr(p) : type; + if(p->pageType) { + assert(page_impls[p->pageType].page_type == p->pageType); + page_impls[p->pageType].pageLoaded(p); } else { p->LSN = *stasis_page_lsn_ptr(p); // XXX kludge - shouldn't special-case UNINITIALIZED_PAGE } } void stasis_page_flushed(Page * p){ - short type = *stasis_page_type_ptr(p); + + pagetype_t type = p->pageType; if(type) { assert(page_impls[type].page_type == type); + if(page_impls[type].has_header) { + *stasis_page_type_ptr(p)= type; + *stasis_page_lsn_ptr(p) = p->LSN; + } page_impls[type].pageFlushed(p); } else { + *stasis_page_type_ptr(p)= type; *stasis_page_lsn_ptr(p) = p->LSN; } } void stasis_page_cleanup(Page * p) { - short type = *stasis_page_type_ptr(p); + short type = p->pageType; if(type) { assert(page_impls[type].page_type == type); page_impls[type].pageCleanup(p); diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c index 9f3be96..94a4f7b 100644 --- a/src/stasis/page/fixed.c +++ b/src/stasis/page/fixed.c @@ -13,7 +13,7 @@ int stasis_fixed_records_per_page(size_t size) { void stasis_fixed_initialize_page(Page * page, size_t size, int count) { assertlocked(page->rwlatch); stasis_page_cleanup(page); - *stasis_page_type_ptr(page) = FIXED_PAGE; + page->pageType = FIXED_PAGE; *recordsize_ptr(page) = size; assert(count <= stasis_fixed_records_per_page(size)); *recordcount_ptr(page)= count; @@ -21,7 +21,7 @@ void stasis_fixed_initialize_page(Page * page, size_t size, int count) { static void checkRid(Page * page, recordid rid) { assertlocked(page->rwlatch); - assert(*stasis_page_type_ptr(page)); // any more specific breaks pages based on this one + assert(page->pageType); // any more specific breaks pages based on this one assert(page->id == rid.page); assert(*recordsize_ptr(page) == rid.size); assert(stasis_fixed_records_per_page(rid.size) > rid.slot); @@ -48,7 +48,7 @@ static int fixedGetType(int xid, Page *p, recordid rid) { // checkRid(p, rid); if(rid.slot < *recordcount_ptr(p)) { int type = *recordsize_ptr(p); - if(type > 0) { + if(type > 0) { type = NORMAL_SLOT; } return type; @@ -65,7 +65,7 @@ static void fixedSetType(int xid, Page *p, recordid rid, int type) { } static int fixedGetLength(int xid, Page *p, recordid rid) { assertlocked(p->rwlatch); - assert(*stasis_page_type_ptr(p)); + assert(p->pageType); return rid.slot > *recordcount_ptr(p) ? INVALID_SLOT : stasis_record_type_to_size(*recordsize_ptr(p)); } @@ -124,6 +124,7 @@ void fixedCleanup(Page *p) { } page_impl fixedImpl() { static page_impl pi = { FIXED_PAGE, + 1, fixedRead, fixedWrite, 0,// readDone @@ -159,5 +160,5 @@ page_impl arrayListImpl() { return pi; } -void fixedPageInit() { } +void fixedPageInit() { } void fixedPageDeinit() { } diff --git a/src/stasis/page/indirect.c b/src/stasis/page/indirect.c index f65995c..ed13606 100644 --- a/src/stasis/page/indirect.c +++ b/src/stasis/page/indirect.c @@ -11,7 +11,7 @@ void indirectInitialize(Page * p, int height) { stasis_page_cleanup(p); *level_ptr(p) = height; - *stasis_page_type_ptr(p) = INDIRECT_PAGE; + p->pageType = INDIRECT_PAGE; memset(p->memAddr, INVALID_SLOT, ((size_t)level_ptr(p)) - ((size_t)p->memAddr)); } /** @todo Is locking for dereferenceRID really necessary? */ @@ -24,7 +24,7 @@ compensated_function recordid dereferenceIndirectRID(int xid, recordid rid) { // printf("a"); fflush(stdout); int offset = 0; int max_slot; - while(*stasis_page_type_ptr(page) == INDIRECT_PAGE) { + while(page->pageType == INDIRECT_PAGE) { int i = 0; for(max_slot = *maxslot_ptr(page, i); ( max_slot + offset ) <= rid.slot; max_slot = *maxslot_ptr(page, i)) { i++; @@ -78,7 +78,7 @@ compensated_function recordid rallocMany(int xid, int recordSize, int recordCoun recordid ret; int page; try_ret(NULLRID) { - page = TpageAlloc(xid/*, SLOTTED_PAGE*/); + page = TpageAlloc(xid); }end_ret(NULLRID); try_ret(NULLRID) { ret = __rallocMany(xid, page, recordSize, recordCount); @@ -111,7 +111,7 @@ compensated_function recordid __rallocMany(int xid, pageid_t parentPage, int rec p.memAddr = buffer; p.rwlatch = initlock(); p.loadlatch = initlock(); - *stasis_page_type_ptr(&p) = UNINITIALIZED_PAGE; + p.pageType = UNINITIALIZED_PAGE; if(number_of_pages > 1) { @@ -158,9 +158,9 @@ compensated_function recordid __rallocMany(int xid, pageid_t parentPage, int rec *page_ptr(&p, i) = firstChildPage + i; if(i) { - *maxslot_ptr(&p, i) = *maxslot_ptr(&p, i-1) + min(tmpRecordCount+1, next_level_records_per_page); + *maxslot_ptr(&p, i) = *maxslot_ptr(&p, i-1) + min(tmpRecordCount+1, next_level_records_per_page); } else { - *maxslot_ptr(&p, i) = min(tmpRecordCount+1, next_level_records_per_page); + *maxslot_ptr(&p, i) = min(tmpRecordCount+1, next_level_records_per_page); } i++; } @@ -206,7 +206,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) { readlock(p->rwlatch, 0); int i = 0; unsigned int ret; - if(*stasis_page_type_ptr(p) == INDIRECT_PAGE) { + if(p->pageType == INDIRECT_PAGE) { while(*maxslot_ptr(p, i) > 0) { i++; @@ -216,7 +216,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) { } else { ret = (*maxslot_ptr(p, i-1)) - 1; } - } else if (*stasis_page_type_ptr(p) == SLOTTED_PAGE) { + } else if (p->pageType == SLOTTED_PAGE) { int numslots = *numslots_ptr(p); ret = 0; @@ -247,6 +247,7 @@ void indirectFlushed(Page *p) { void indirectCleanup(Page *p) { } static page_impl pi = { INDIRECT_PAGE, + 1, 0, //read, 0, //write, 0, //readDone diff --git a/src/stasis/page/lsnFree.c b/src/stasis/page/lsnFree.c index 0163191..155f33f 100644 --- a/src/stasis/page/lsnFree.c +++ b/src/stasis/page/lsnFree.c @@ -1,12 +1,10 @@ #include -#include #include #include void stasis_slotted_lsn_free_initialize_page(Page * p) { stasis_slotted_initialize_page(p); - *stasis_page_type_ptr(p) = SLOTTED_LSN_FREE_PAGE; - *stasis_page_lsn_ptr(p) = -1; + p->pageType = SLOTTED_LSN_FREE_PAGE; } // XXX still not correct; need to have an "LSN_FREE" constant. static void lsnFreeLoaded(Page * p) { diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index 0130007..1f6a31f 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -21,7 +21,7 @@ static inline void slottedFsck(const Page const * page) { dummy.id = -1; dummy.memAddr = 0; - const short page_type = *stasis_page_type_cptr(page); + const short page_type = page->pageType; const short numslots = *numslots_cptr(page); const short freespace = *freespace_cptr(page); const short freelist = *freelist_cptr(page); @@ -45,7 +45,7 @@ static inline void slottedFsck(const Page const * page) { short * slot_offsets = alloca(numslots * sizeof(short)); short * slot_lengths = alloca(numslots * sizeof(short)); - for(int i = 0; i < numslots; i++) { + for(int i = 0; i < numslots; i++) { slot_offsets[i] = *slot_ptr(page, i); slot_lengths[i] = *slot_length_ptr(page, i); } @@ -54,21 +54,21 @@ static inline void slottedFsck(const Page const * page) { if(freelist != INVALID_SLOT) { assert(slot_offsets[freelist] == INVALID_SLOT); - } else { + } else { foundEndOfList = 1; } - for(short i = 0; i < numslots; i++) { + for(short i = 0; i < numslots; i++) { const short slot_length = slot_lengths[i]; const short slot_offset = slot_offsets[i]; if(slot_offset == INVALID_SLOT) { - if(slot_length == INVALID_SLOT) { + if(slot_length == INVALID_SLOT) { assert(!foundEndOfList); - foundEndOfList = 1; + foundEndOfList = 1; } else { assert (slot_offsets[slot_length] == INVALID_SLOT); } - } else { + } else { assert(slot_offset + slot_length <= freespace); } } @@ -87,32 +87,32 @@ static inline void slottedFsck(const Page const * page) { const unsigned short S_SLOT_LIST = 0xFCFC; byte image[PAGE_SIZE]; - for(short i = 0; i < PAGE_SIZE; i++) { + for(short i = 0; i < PAGE_SIZE; i++) { image[i] = UNUSED; } for(short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) { image[i] = PAGE_HEADER; } - for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { + for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { image[i] = SLOTTED_HEADER; } - for(short i = *freespace_ptr(page); i < slotListStart; i++) { + for(short i = *freespace_ptr(page); i < slotListStart; i++) { image[i] = FREE_SPACE; } dummy.memAddr = image; - for(short i = 0; i < *numslots_ptr(page); i++) { + for(short i = 0; i < *numslots_ptr(page); i++) { *slot_ptr(&dummy, i) = S_SLOT_LIST; *slot_length_ptr(&dummy, i) = S_SLOT_LIST; } - for(short i = 0; i < *numslots_ptr(page); i++) { + for(short i = 0; i < *numslots_ptr(page); i++) { short slot_offset = *slot_ptr(page, i); - if(slot_offset != INVALID_SLOT) { + if(slot_offset != INVALID_SLOT) { const unsigned char ci = i % 0xFF; short slot_len = stasis_record_type_to_size(*slot_length_ptr(page, i)); - - for(short j = 0; j < slot_len; j++) { + + for(short j = 0; j < slot_len; j++) { assert(image[slot_offset + j] == 0xFF); image[slot_offset + j] = ci; } @@ -131,12 +131,12 @@ increase the available free space. The caller of this function must have a writelock on the page. */ -static void slottedCompact(Page * page) { +static void slottedCompact(Page * page) { assertlocked(page->rwlatch); - Page bufPage; + Page bufPage; byte buffer[PAGE_SIZE]; bufPage.memAddr = buffer; - + // Copy external headers into bufPage. memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(page->memAddr[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE); @@ -154,17 +154,17 @@ static void slottedCompact(Page * page) { // Rebuild free list. - for(short i = 0; i < numSlots; i++) { - if(*slot_ptr(page, i) == INVALID_SLOT) { - if(lastFreeSlot == INVALID_SLOT) { + for(short i = 0; i < numSlots; i++) { + if(*slot_ptr(page, i) == INVALID_SLOT) { + if(lastFreeSlot == INVALID_SLOT) { *freelist_ptr(&bufPage) = i; - } else { + } else { *slot_length_ptr(&bufPage, lastFreeSlot) = i; } *slot_ptr(&bufPage, i) = INVALID_SLOT; lastFreeSlot = i; } else { - lastUsedSlot = i; + lastUsedSlot = i; lastFreeSlotBeforeUsedSlot = lastFreeSlot; short logicalSize = *slot_length_ptr(page, i); @@ -179,11 +179,11 @@ static void slottedCompact(Page * page) { } } - + // Truncate linked list, and update numslots_ptr. *slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; *numslots_ptr(&bufPage) = lastUsedSlot+1; - + memcpy(page->memAddr, buffer, PAGE_SIZE); slottedFsck(page); @@ -193,7 +193,7 @@ static void slottedCompact(Page * page) { void slottedPageInit() { #ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP printf("slotted.c: Using expensive page sanity checking.\n"); -#endif +#endif } void slottedPageDeinit() { @@ -203,7 +203,7 @@ void slottedPageDeinit() { void stasis_slotted_initialize_page(Page * page) { assertlocked(page->rwlatch); stasis_page_cleanup(page); - *stasis_page_type_ptr(page) = SLOTTED_PAGE; + page->pageType = SLOTTED_PAGE; *freespace_ptr(page) = 0; *numslots_ptr(page) = 0; *freelist_ptr(page) = INVALID_SLOT; @@ -214,20 +214,20 @@ void stasis_slotted_initialize_page(Page * page) { makes it possible for callers to guarantee the safety of a subsequent call to really_do_ralloc(). */ -static size_t slottedFreespaceForSlot(Page * page, int slot) { +static size_t slottedFreespaceForSlot(Page * page, int slot) { assertlocked(page->rwlatch); size_t slotOverhead; - if(slot == INVALID_SLOT) { + if(slot == INVALID_SLOT) { slotOverhead = (*freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0; - } else if(slot < *numslots_ptr(page)) { + } else if(slot < *numslots_ptr(page)) { slotOverhead = 0; - } else { + } else { // slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot); slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *numslots_ptr(page)); } // end_of_free_space points to the beginning of the slot header at the bottom of the page header. - byte* end_of_free_space = (byte*)slot_length_ptr(page, (*numslots_ptr(page))-1); + byte* end_of_free_space = (byte*)slot_length_ptr(page, (*numslots_ptr(page))-1); // start_of_free_space points to the first unallocated byte in the page // (ignoring space that could be reclaimed by compaction) @@ -235,10 +235,10 @@ static size_t slottedFreespaceForSlot(Page * page, int slot) { assert(end_of_free_space >= start_of_free_space); - if(end_of_free_space < start_of_free_space + slotOverhead) { + if(end_of_free_space < start_of_free_space + slotOverhead) { // The regions would overlap after allocation. There is no free space. return 0; - } else { + } else { // The regions would not overlap. There might be free space. return (size_t) (end_of_free_space - start_of_free_space - slotOverhead); } @@ -267,11 +267,11 @@ static void really_do_ralloc(Page * page, recordid rid) { assertlocked(page->rwlatch); short freeSpace; - + // Compact the page if we don't have enough room. if(slottedFreespaceForSlot(page, rid.slot) < stasis_record_type_to_size(rid.size)) { slottedCompact(page); - + // Make sure we have enough enough free space for the new record assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size)); } @@ -281,43 +281,43 @@ static void really_do_ralloc(Page * page, recordid rid) { // Remove this entry from the freelist (if necessary) slottedCompact // assumes that this does not change the order of items in the list. // If it did, then slottedCompact could leaks slot id's (or worse!) - if(rid.slot < *numslots_ptr(page) && *slot_ptr(page,rid.slot) == INVALID_SLOT) { + if(rid.slot < *numslots_ptr(page) && *slot_ptr(page,rid.slot) == INVALID_SLOT) { short next = *freelist_ptr(page); short last = INVALID_SLOT; - // special case: is the slot physically before us the predecessor? - if(rid.slot > 0) { - if(*slot_length_ptr(page, rid.slot-1) == rid.slot && *slot_ptr(page, rid.slot-1) == INVALID_SLOT) { + // special case: is the slot physically before us the predecessor? + if(rid.slot > 0) { + if(*slot_length_ptr(page, rid.slot-1) == rid.slot && *slot_ptr(page, rid.slot-1) == INVALID_SLOT) { next = rid.slot; last = rid.slot-1; - } + } } - while(next != INVALID_SLOT && next != rid.slot) { + while(next != INVALID_SLOT && next != rid.slot) { last = next; assert(next < *numslots_ptr(page)); short next_slot_ptr = *slot_ptr(page, next); assert(next_slot_ptr == INVALID_SLOT); next = *slot_length_ptr(page, next); } - if(next == rid.slot) { - if(last == INVALID_SLOT) { + if(next == rid.slot) { + if(last == INVALID_SLOT) { *freelist_ptr(page) = *slot_length_ptr(page, rid.slot); - } else { + } else { *slot_length_ptr(page, last) = *slot_length_ptr(page, rid.slot); } } } // Insert any slots that come between the previous numslots_ptr() - // and the slot we're allocating onto the freelist. In order to + // and the slot we're allocating onto the freelist. In order to // promote the reuse of free slot numbers, we go out of our way to make sure - // that we put them in the list in increasing order. (Note: slottedCompact's + // that we put them in the list in increasing order. (Note: slottedCompact's // correctness depends on this behavior!) - - if(rid.slot > *numslots_ptr(page)) { + + if(rid.slot > *numslots_ptr(page)) { short lastSlot; short numSlots = *numslots_ptr(page); - if(*freelist_ptr(page) == INVALID_SLOT) { + if(*freelist_ptr(page) == INVALID_SLOT) { *freelist_ptr(page) = numSlots; lastSlot = numSlots; @@ -330,7 +330,7 @@ static void really_do_ralloc(Page * page, recordid rid) { } else { lastSlot = INVALID_SLOT; short next = *freelist_ptr(page); - while(next != INVALID_SLOT) { + while(next != INVALID_SLOT) { lastSlot = next; next = *slot_length_ptr(page, lastSlot); assert(lastSlot < *numslots_ptr(page)); @@ -341,8 +341,8 @@ static void really_do_ralloc(Page * page, recordid rid) { } // lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr. - - while(*numslots_ptr(page) < rid.slot) { + + while(*numslots_ptr(page) < rid.slot) { *slot_length_ptr(page, lastSlot) = *numslots_ptr(page); lastSlot = *numslots_ptr(page); *slot_ptr(page, lastSlot) = INVALID_SLOT; @@ -355,14 +355,14 @@ static void really_do_ralloc(Page * page, recordid rid) { } - if(*numslots_ptr(page) == rid.slot) { + if(*numslots_ptr(page) == rid.slot) { *numslots_ptr(page) = rid.slot+1; } assert(*numslots_ptr(page) > rid.slot); DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot); - + // Reserve space for this record and record the space's offset in // the slot header. @@ -370,7 +370,7 @@ static void really_do_ralloc(Page * page, recordid rid) { *freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size); *slot_ptr(page, rid.slot) = freeSpace; - *slot_length_ptr(page, rid.slot) = rid.size; + *slot_length_ptr(page, rid.slot) = rid.size; } @@ -439,7 +439,7 @@ static recordid slottedNext(int xid, Page *p, recordid rid) { while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) { rid.slot++; } - if(rid.slot != n) { + if(rid.slot != n) { rid.size = *slot_length_ptr(p, rid.slot); return rid; } else { @@ -526,6 +526,7 @@ static void slottedCleanup(Page *p) { } page_impl slottedImpl() { static page_impl pi = { SLOTTED_PAGE, + 1, slottedRead, slottedWrite, 0,// readDone diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index d3dd51c..96cf4cd 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -40,7 +40,7 @@ static void phRead(stasis_page_handle_t * ph, Page * ret) { } } ret->dirty = 0; - stasis_page_loaded(ret); + stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE); unlock(ret->rwlatch); } static void phForce(stasis_page_handle_t * ph) { diff --git a/stasis/common.h b/stasis/common.h index fea5e5c..ec7f27a 100644 --- a/stasis/common.h +++ b/stasis/common.h @@ -110,6 +110,9 @@ typedef int32_t slotid_t; typedef uint16_t pageoff_t; #define PAGEOFF_T_MAX UINT16_MAX; +typedef int16_t pagetype_t; +#define PAGETYPE_T_MAX INT16_MAX; + /*#define DEBUGGING */ /*#define PROFILE_LATCHES*/ /*#define NO_LATCHES */ diff --git a/stasis/constants.h b/stasis/constants.h index 7cf4c94..e8eb2b7 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -227,7 +227,7 @@ static const short SLOT_TYPE_LENGTHS[] = { -1, -1, sizeof(blob_record_t), -1}; #define XPREPARE 8 /* Page types */ - +#define UNKNOWN_TYPE_PAGE (-1) #define UNINITIALIZED_PAGE 0 #define SLOTTED_PAGE 1 #define INDIRECT_PAGE 2 diff --git a/stasis/page.h b/stasis/page.h index 6be7336..a14312b 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -123,6 +123,16 @@ BEGIN_C_DECLS */ struct Page_s { pageid_t id; + /** + * The type of this page. Set when the page is loaded from disk. If the page contains + * a header, this will be set automatically. Otherwise, it must be passed in by the code + * that pinned the page. + */ + int pageType; + /** + * The LSN of the page (or an estimate). Set when page is loaded from disk. + * The on-page LSN (if any) is set at page writeback. + */ lsn_t LSN; byte *memAddr; byte dirty; @@ -540,7 +550,7 @@ void stasis_record_free(int xid, Page * p, recordid rid); int stasis_block_supported(int xid, Page * p); int stasis_record_freespace(int xid, Page * p); void stasis_record_compact(Page * p); -void stasis_page_loaded(Page * p); +void stasis_page_loaded(Page * p, pagetype_t type); void stasis_page_flushed(Page * p); void stasis_page_cleanup(Page * p); /** @@ -677,7 +687,7 @@ void stasis_block_done(int xid, Page * p, block_t * done); */ typedef struct page_impl { int page_type; - + int has_header; // ---------- Record access /** diff --git a/test/stasis/check_bTree.c b/test/stasis/check_bTree.c index b81d8b5..7b80100 100644 --- a/test/stasis/check_bTree.c +++ b/test/stasis/check_bTree.c @@ -235,7 +235,7 @@ int SimpleExample(){ /* check to make sure page is recorded as a FIXED_PAGE */ - assert( *stasis_page_type_ptr(p1) == FIXED_PAGE); + assert( p1->pageType == FIXED_PAGE); if (DEBUGP) { printf("\n%lld\n", (long long)pageid1); } byte * b1 = (byte *) malloc (sizeof (int)); diff --git a/test/stasis/check_indirect.c b/test/stasis/check_indirect.c index d29133b..dc3673a 100644 --- a/test/stasis/check_indirect.c +++ b/test/stasis/check_indirect.c @@ -102,11 +102,7 @@ START_TEST(indirectAlloc) { Page * p = loadPage(xid, page); - int page_type = *stasis_page_type_ptr(p); - - assert(page_type == SLOTTED_PAGE); - - fail_unless(page_type == SLOTTED_PAGE, NULL); + assert(p->pageType == SLOTTED_PAGE); releasePage(p); @@ -123,13 +119,7 @@ START_TEST(indirectAlloc) { p = loadPage(xid, page); - page_type = *stasis_page_type_ptr(p); - - assert(page_type == INDIRECT_PAGE); - - fail_unless(page_type == INDIRECT_PAGE, NULL); - - + assert(p->pageType == INDIRECT_PAGE); printf("{page = %lld, slot = %d, size = %lld}\n", (pageid_t)rid.page, rid.slot, (pageid_t)rid.size); @@ -147,13 +137,7 @@ START_TEST(indirectAlloc) { p = loadPage(xid, page); - page_type = *stasis_page_type_ptr(p); - - assert(page_type == INDIRECT_PAGE); - - fail_unless(page_type == INDIRECT_PAGE, NULL); - - + assert(p->pageType == INDIRECT_PAGE); printf("{page = %lld, slot = %d, size = %lld}\n", (pageid_t)rid.page, rid.slot, (pageid_t)rid.size); @@ -178,11 +162,7 @@ START_TEST(indirectAccessDirect) { Page * p = loadPage(xid, page); - int page_type = *stasis_page_type_ptr(p); - - assert(page_type == SLOTTED_PAGE); - - fail_unless(page_type == SLOTTED_PAGE, NULL); + assert(p->pageType == SLOTTED_PAGE); releasePage(p); @@ -223,11 +203,7 @@ START_TEST(indirectAccessIndirect) { Page * p = loadPage(xid, page); - int page_type = *stasis_page_type_ptr(p); - - assert(page_type == INDIRECT_PAGE); - - fail_unless(page_type == INDIRECT_PAGE, NULL); + assert(p->pageType == INDIRECT_PAGE); Tcommit(xid); xid = Tbegin(); diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c index c8aab45..4dcb28e 100644 --- a/test/stasis/check_page.c +++ b/test/stasis/check_page.c @@ -323,7 +323,7 @@ static void checkPageIterators(int xid, Page *p,int record_count) { assertRecordCountSizeType(xid, p, record_count, sizeof(int64_t), NORMAL_SLOT); - if(*stasis_page_type_ptr(p) == SLOTTED_PAGE) { + if(p->pageType == SLOTTED_PAGE) { recordid other = first; other.slot = 3; stasis_record_free(xid,p,other); diff --git a/test/stasis/check_pageOperations.c b/test/stasis/check_pageOperations.c index 7375aff..6b6475a 100644 --- a/test/stasis/check_pageOperations.c +++ b/test/stasis/check_pageOperations.c @@ -77,12 +77,11 @@ START_TEST(pageOpCheckRecovery) { memset(p.memAddr, 1, USABLE_SIZE_OF_PAGE); // Reset the page type after overwriting it with memset. Otherwise, Stasis // will try to interpret it when it flushes the page to disk. - *stasis_page_type_ptr(&p) = 0; + p.pageType = 0; TpageSetRange(xid, pageid1, 0, p.memAddr, USABLE_SIZE_OF_PAGE); memset(p.memAddr, 2, USABLE_SIZE_OF_PAGE); - *stasis_page_type_ptr(&p) = 0; TpageSetRange(xid, pageid2, 0, p.memAddr, USABLE_SIZE_OF_PAGE); @@ -121,23 +120,19 @@ START_TEST(pageOpCheckRecovery) { assert(pageid2 != pageid3); memset(p.memAddr, 3, USABLE_SIZE_OF_PAGE); - *stasis_page_type_ptr(&p) = 0; TpageSetRange(xid, pageid3, 0, p.memAddr, USABLE_SIZE_OF_PAGE); byte newAddr[USABLE_SIZE_OF_PAGE]; memset(p.memAddr, 1, USABLE_SIZE_OF_PAGE); - *stasis_page_type_ptr(&p) = 0; TpageGet(xid, pageid1, newAddr); assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE)); memset(p.memAddr, 2, USABLE_SIZE_OF_PAGE); - *stasis_page_type_ptr(&p) = 0; TpageGet(xid, pageid2, newAddr); assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE)); memset(p.memAddr, 3, USABLE_SIZE_OF_PAGE); - *stasis_page_type_ptr(&p) = 0; TpageGet(xid, pageid3, newAddr); assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE)); Tcommit(xid);