Move page type to the Page_s struct. This breaks indirect.c (which is poorly thought out and will be removed shortly), and is the next step on the road to segment-based recovery
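In practice, the change replaces dereferences of the on-page type header with a field cached in the Page_s struct, and threads an explicit type through stasis_page_loaded(). A minimal before/after sketch of the calling convention (the variable names and pid are illustrative, not from this commit):

    Page *p = loadPage(xid, pid);
    readlock(p->rwlatch, 0);
    // before: int type = *stasis_page_type_ptr(p);   // dereference the on-page header
    int type = p->pageType;                           // after: cached in Page_s at load time
    unlock(p->rwlatch);
    releasePage(p);

Buffer managers that do not know what they just read pass UNKNOWN_TYPE_PAGE to stasis_page_loaded(), which then falls back to the on-page header, as the hunks below show.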

This commit is contained in:
Sears Russell 2009-07-07 19:20:22 +00:00
parent b66328192c
commit 2593de7a2c
24 changed files with 219 additions and 228 deletions

View file

@ -7,3 +7,4 @@ param
callbacks
mutex
todo
writeback

View file

@ -51,7 +51,7 @@ void stasis_blob_write(int xid, Page * p, recordid rid, const void* dat) {
for(; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
Page * cnk = loadPage(xid, rec.offset+chunk);
writelock(cnk->rwlatch,0);
if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) {
if(cnk->pageType != BLOB_PAGE) {
stasis_page_blob_initialize(cnk);
}
unlock(cnk->rwlatch);
@ -61,7 +61,7 @@ void stasis_blob_write(int xid, Page * p, recordid rid, const void* dat) {
}
Page * cnk = loadPage(xid, rec.offset+chunk);
writelock(cnk->rwlatch,0);
if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) {
if(cnk->pageType != BLOB_PAGE) {
stasis_page_blob_initialize(cnk);
}
unlock(cnk->rwlatch);
@ -86,6 +86,7 @@ static void stasis_page_blob_cleanup(Page *p) { }
page_impl stasis_page_blob_impl() {
page_impl pi = {
BLOB_PAGE,
1,
0, //read,
0, //write,
0, //readDone
@ -115,5 +116,5 @@ void stasis_page_blob_initialize(Page * p) {
assertlocked(p->rwlatch);
DEBUG("lsn: %lld\n",(long long)p->LSN);
stasis_page_cleanup(p);
*stasis_page_type_ptr(p) = BLOB_PAGE;
p->pageType = BLOB_PAGE;
}

View file

@ -283,7 +283,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
memset(ret->memAddr,0,PAGE_SIZE);
*stasis_page_lsn_ptr(ret) = ret->LSN;
ret->dirty = 0;
stasis_page_loaded(ret);
stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE);
}
*pagePendingPtr(ret) = 0;
// Would remove from lru, but getFreePage() guarantees that it isn't

View file

@ -284,7 +284,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
*stasis_page_lsn_ptr(ret) = ret->LSN;
// XXX need mutex for this call?
stasis_page_loaded(ret);
stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE);
}
writeunlock(ret->loadlatch);

View file

@ -71,7 +71,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret) {
}
ret->dirty = 0;
stasis_page_loaded(ret);
stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE);
pthread_mutex_unlock(&stable_mutex);

View file

@ -223,7 +223,7 @@ static void stasis_alloc_register_old_regions() {
for(pageid_t i = 0; i < t.size; i++) {
Page * p = loadPage(-1, boundary + i);
readlock(p->rwlatch,0);
if(*stasis_page_type_ptr(p) == SLOTTED_PAGE) {
if(p->pageType == SLOTTED_PAGE) {
availablePage * next = malloc(sizeof(availablePage));
next->pageid = boundary+i;
next->freespace = stasis_record_freespace(-1, p);

View file

@ -91,7 +91,7 @@ static int array_list_op_init_header(const LogEntry* e, Page* p) {
stasis_record_write(e->xid, p, firstDataPageRid, (const byte*)&firstDataPage);
*stasis_page_type_ptr(p) = ARRAY_LIST_PAGE;
p->pageType = ARRAY_LIST_PAGE;
return 0;
}

View file

@ -278,7 +278,7 @@ recordid TlsmCreate(int xid, int comparator,
Page *p = loadPage(xid, ret.page);
writelock(p->rwlatch,0);
stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0);
*stasis_page_type_ptr(p) = LSM_ROOT_PAGE;
p->pageType = LSM_ROOT_PAGE;
lsmTreeState *state = malloc(sizeof(lsmTreeState));
state->lastLeaf = -1; /// XXX define something in constants.h?
@ -403,8 +403,7 @@ static recordid appendInternalNode(int xid, Page *p,
pageid_t val_page, pageid_t lastLeaf,
lsm_page_allocator_t allocator,
void *allocator_state) {
assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE ||
*stasis_page_type_ptr(p) == FIXED_PAGE);
assert(p->pageType == LSM_ROOT_PAGE || p->pageType == FIXED_PAGE);
if(!depth) {
// leaf node.
recordid ret = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
@ -720,7 +719,7 @@ pageid_t TlsmLastPage(int xid, recordid tree) {
}
Page * root = loadPage(xid, tree.page);
readlock(root->rwlatch,0);
assert(*stasis_page_type_ptr(root) == LSM_ROOT_PAGE);
assert(root->pageType == LSM_ROOT_PAGE);
lsmTreeState *state = root->impl;
int keySize = getKeySize(xid,root);
if(state->lastLeaf == -1) {

View file

@ -3,7 +3,7 @@
#include <stasis/logger/reorderingHandle.h>
#include <string.h>
static int op_lsn_free_set(const LogEntry *e, Page *p) {
if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { abort() ; }
if(p->pageType != SLOTTED_LSN_FREE_PAGE) { abort() ; }
assert(e->update.arg_size >= (sizeof(pageoff_t) * 2));
int size = e->update.arg_size;
size -= (2*sizeof(pageoff_t));
@ -14,7 +14,7 @@ static int op_lsn_free_set(const LogEntry *e, Page *p) {
return 0;
}
static int op_lsn_free_unset(const LogEntry *e, Page *p) {
if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { return 0; }
if(p->pageType != SLOTTED_LSN_FREE_PAGE) { return 0; }
assert(e->update.arg_size >= (sizeof(pageoff_t) * 2));
int size = e->update.arg_size;
size -= (2*sizeof(pageoff_t));

View file

@ -120,7 +120,7 @@ compensated_function pageid_t TpageAllocMany(int xid, int count) {
int TpageGetType(int xid, pageid_t page) {
Page * p = loadPage(xid, page);
int ret = *stasis_page_type_ptr(p);
int ret = p->pageType;
releasePage(p);
return ret;
}

View file

@ -14,19 +14,19 @@ static pthread_mutex_t region_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t holding_mutex;
static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int allocationManager);
static void TallocBoundaryTag(int xid, pageid_t page, boundary_tag* tag);
static int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag);
static void TsetBoundaryTag(int xid, pageid_t page, boundary_tag* tag);
static void TdeallocBoundaryTag(int xid, pageid_t page);
/** This doesn't need a latch since it is only initiated within nested
top actions (and is local to this file). During abort(), the nested
top action's logical undo grabs the necessary latches.
*/
static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
stasis_slotted_initialize_page(p);
recordid rid = {p->id, 0, sizeof(boundary_tag)};
assert(e->update.arg_size == sizeof(boundary_tag));
*stasis_page_type_ptr(p) = BOUNDARY_TAG_PAGE;
p->pageType = BOUNDARY_TAG_PAGE;
stasis_record_alloc_done(e->xid, p, rid);
byte * buf = stasis_record_write_begin(e->xid, p, rid);
memcpy(buf, getUpdateArgs(e), stasis_record_length_read(e->xid, p, rid));
@ -49,7 +49,7 @@ static int operate_dealloc_region_unlocked(int xid, regionAllocArg *dat) {
pageid_t firstPage = dat->startPage + 1;
boundary_tag t;
int ret = readBoundaryTag(xid, firstPage - 1, &t);
assert(ret);
@ -83,7 +83,7 @@ static void TallocBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
Tupdate(xid, page, tag, sizeof(boundary_tag), OPERATION_ALLOC_BOUNDARY_TAG);
}
int readBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
assert(holding_mutex == pthread_self());
recordid rid = { page, 0, sizeof(boundary_tag) };
if(TpageGetType(xid, rid.page) != BOUNDARY_TAG_PAGE) {
@ -104,7 +104,7 @@ int TregionReadBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
return ret;
}
static void TsetBoundaryTag(int xid, pageid_t page, boundary_tag* tag) {
//printf("Write boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
// Sanity checking:
@ -132,13 +132,12 @@ static void TdeallocBoundaryTag(int xid, pageid_t page) {
TsetBoundaryTag(xid, page, &t);
}
void regionsInit() {
Page * p = loadPage(-1, 0);
int pageType = *stasis_page_type_ptr(p);
holding_mutex = pthread_self();
if(pageType != BOUNDARY_TAG_PAGE) {
if(p->pageType != BOUNDARY_TAG_PAGE) {
boundary_tag t;
t.size = PAGEID_T_MAX;
t.prev_size = PAGEID_T_MAX;
@ -173,7 +172,7 @@ int TregionNextBoundaryTag(int xid, pageid_t* pid, boundary_tag * tag, int type)
int ret = readBoundaryTag(xid, *pid-1, tag);
if(ret) {
while(1) {
if(tag->size == PAGEID_T_MAX) {
ret = 0;
@ -195,7 +194,7 @@ int TregionNextBoundaryTag(int xid, pageid_t* pid, boundary_tag * tag, int type)
return ret;
}
void fsckRegions(int xid) {
// Ignore region_xid, allocation_manager for now.
pthread_mutex_lock(&region_mutex);
@ -212,23 +211,23 @@ void fsckRegions(int xid) {
assert(ret);
assert(tag.prev_size == PAGEID_T_MAX);
while(tag.size != PAGEID_T_MAX) {
// Ignore region_xid, allocation_manager for now.
assert(tag.status == REGION_VACANT || tag.status == REGION_ZONED);
assert(prev_tag.size == tag.prev_size);
for(pageid_t i = 0; i < tag.size; i++) {
pageid_t thisPage = tagPage + 1 + i;
pageType = TpageGetType(xid, thisPage);
if(pageType == BOUNDARY_TAG_PAGE) {
boundary_tag orphan;
int ret = readBoundaryTag(xid, thisPage, &orphan);
assert(ret);
assert(orphan.status == REGION_CONDEMNED);
Page * p = loadPage(xid, thisPage);
releasePage(p);
} else if (pageType == SLOTTED_PAGE) {
Page * p = loadPage(xid, thisPage);
releasePage(p);
}
@ -251,7 +250,7 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a
int ret = readBoundaryTag(xid, page, &t);
assert(ret);
if(t.size != pageCount) {
// need to split region
// allocate new boundary tag.
@ -259,7 +258,7 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a
pageid_t newPageid = page + pageCount + 1;
boundary_tag new_tag;
if(t.size != PAGEID_T_MAX) {
new_tag.size = t.size - pageCount - 1; // pageCount must be strictly less than t->size, so this is non-negative.
@ -270,17 +269,17 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a
succ_tag.prev_size = new_tag.size;
TsetBoundaryTag(xid, page + t.size + 1, &succ_tag);
} else {
new_tag.size = PAGEID_T_MAX;
}
new_tag.prev_size = pageCount;
// Create the new region, and disassociate it from this transaction immediately.
// This has two implications:
// - It could cause some fragmentation if interleaved transactions are allocating, and some abort.
// - Multiple transactions can allocate space at the end of the page file without blocking each other.
new_tag.status = REGION_VACANT;
new_tag.region_xid = INVALID_XID;
new_tag.allocation_manager = 0;
@ -297,7 +296,7 @@ static void TregionAllocHelper(int xid, pageid_t page, pageid_t pageCount, int a
}
static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t) {
if(t->status != REGION_VACANT || TisActiveTransaction(t->region_xid)) { return; }
@ -314,7 +313,7 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t)
// TODO: Check stasis_page_type_ptr()...
if(succ_tag.size == PAGEID_T_MAX) {
t->size = PAGEID_T_MAX;
assert(succ_tag.status == REGION_VACANT);
// TODO: Truncate page file.
@ -331,10 +330,10 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t)
assert(ret);
succ_succ_tag.prev_size = t->size;
TsetBoundaryTag(xid, succ_succ_page, &succ_succ_tag);
TdeallocBoundaryTag(xid, succ_page);
mustWriteOriginalTag = 1;
} else {
mustWriteOriginalTag = 0;
}
@ -345,30 +344,30 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t)
// creates a situation where the current page is not a boundary
// tag...)
if(t->prev_size != PAGEID_T_MAX) {
pageid_t pred_page = ((*firstPage) - 1) - t->prev_size; // If the predecessor is length zero, then its boundary tag is two pages before this region's tag.
boundary_tag pred_tag;
int ret = readBoundaryTag(xid, pred_page, &pred_tag);
assert(ret);
if(pred_tag.status == REGION_VACANT && (!TisActiveTransaction(pred_tag.region_xid))) {
TdeallocBoundaryTag(xid, *firstPage);
if(t->size == PAGEID_T_MAX) {
pred_tag.size = PAGEID_T_MAX;
// TODO: truncate region
} else {
pred_tag.size += (t->size + 1);
pageid_t succ_page = (*firstPage) + 1+ t->size;
assert(pred_page + pred_tag.size + 1 == succ_page);
boundary_tag succ_tag;
ret = readBoundaryTag(xid, succ_page, &succ_tag);
assert(ret);
@ -377,19 +376,19 @@ static void consolidateRegions(int xid, pageid_t * firstPage, boundary_tag *t)
assert(succ_page - pred_page - 1 == pred_tag.size);
}
TsetBoundaryTag(xid, pred_page, &pred_tag);
assert(pred_page < *firstPage);
(*firstPage) = pred_page;
(*t) = pred_tag;
} else {
if(mustWriteOriginalTag) {
TsetBoundaryTag(xid, (*firstPage), t);
}
}
}
} else {
if(mustWriteOriginalTag) {
TsetBoundaryTag(xid, (*firstPage), t);
}
}
@ -414,7 +413,7 @@ void TregionDealloc(int xid, pageid_t firstPage) {
regionAllocArg arg = { firstPage-1, t.size, t.allocation_manager };
assert(t.status != REGION_VACANT);
void * handle = TbeginNestedTopAction(xid, OPERATION_DEALLOC_REGION, (const byte*)&arg, sizeof(regionAllocArg));
@ -423,12 +422,12 @@ void TregionDealloc(int xid, pageid_t firstPage) {
firstPage --;
TendNestedTopAction(xid, handle);
holding_mutex = 0;
pthread_mutex_unlock(&region_mutex);
}
pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) {
// Initial implementation. Naive first fit.
pthread_mutex_lock(&region_mutex);
@ -445,10 +444,10 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) {
// printf("consolidateRegions pageid, t: %d, {%d, %d, %d} -> ", pageid, t.size, t.prev_size, t.status);
consolidateRegions(xid, &pageid, &t);
// printf(" %d, {%d, %d, %d}\tpageCount=%d\n", pageid, t.size, t.prev_size, t.status, pageCount);
while(t.status != REGION_VACANT || t.size < pageCount || TisActiveTransaction(t.region_xid)) {
// TODO: This while loop and the boundary tag manipulation below should be factored into two submodules.
// printf("t.status = %d, REGION_VACANT = %d, t.size = %d, pageCount = %d\n", t.status, REGION_VACANT, t.size, pageCount);
@ -457,14 +456,14 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) {
assert(ret);
// printf("\tconsolidateRegions pageid, t: %d, {%d, %d, %d} -> ", pageid, t.size, t.prev_size, t.status);
consolidateRegions(xid, &pageid, &t);
// printf(" %d, {%d, %d, %d}\tpageCount=%d\n", pageid, t.size, t.prev_size, t.status, pageCount);
}
// printf("page = %d, t.status = %d, REGION_VACANT = %d, t.size = %d, pageCount = %d (alloced)\n", pageid, t.status, REGION_VACANT, t.size, pageCount);
TendNestedTopAction(xid, ntaHandle);
regionAllocArg arg = { pageid, pageCount, allocationManager };
@ -476,23 +475,23 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) {
holding_mutex = 0;
pthread_mutex_unlock(&region_mutex);
return pageid+1;
}
stasis_operation_impl stasis_op_impl_boundary_tag_alloc() {
stasis_operation_impl o = {
OPERATION_ALLOC_BOUNDARY_TAG,
OPERATION_ALLOC_BOUNDARY_TAG,
OPERATION_NOOP,
op_alloc_boundary_tag
};
return o;
}
stasis_operation_impl stasis_op_impl_region_alloc() {
stasis_operation_impl o = {
OPERATION_ALLOC_REGION,
OPERATION_NOOP,
OPERATION_ALLOC_REGION_INVERSE,
@ -500,8 +499,8 @@ stasis_operation_impl stasis_op_impl_region_alloc() {
};
return o;
}
stasis_operation_impl stasis_op_impl_region_alloc_inverse() {
stasis_operation_impl o = {
OPERATION_ALLOC_REGION_INVERSE,
OPERATION_ALLOC_REGION_INVERSE,
OPERATION_INVALID,
@ -509,8 +508,8 @@ stasis_operation_impl stasis_op_impl_region_alloc_inverse() {
};
return o;
}
stasis_operation_impl stasis_op_impl_region_dealloc() {
stasis_operation_impl o = {
OPERATION_DEALLOC_REGION,
OPERATION_NOOP,
OPERATION_DEALLOC_REGION_INVERSE,
@ -519,8 +518,8 @@ stasis_operation_impl stasis_op_impl_region_dealloc() {
return o;
}
stasis_operation_impl stasis_op_impl_region_dealloc_inverse() {
stasis_operation_impl o = {
OPERATION_DEALLOC_REGION_INVERSE,
OPERATION_DEALLOC_REGION_INVERSE,
OPERATION_INVALID,
@ -529,22 +528,22 @@ stasis_operation_impl stasis_op_impl_region_dealloc_inverse() {
return o;
}
void TregionFindNthActive(int xid, pageid_t regionNumber, pageid_t * firstPage, pageid_t * size) {
boundary_tag t;
recordid rid = {0, 0, sizeof(boundary_tag)};
pthread_mutex_lock(&region_mutex);
holding_mutex = pthread_self();
Tread(xid, rid, &t);
pageid_t prevSize = 0;
while(t.status == REGION_VACANT) {
rid.page += (t.size + 1);
Tread(xid, rid, &t);
assert(t.size != PAGEID_T_MAX);
assert(t.prev_size != PAGEID_T_MAX);
assert(prevSize == t.prev_size || !prevSize);
prevSize = t.size;
}
for(pageid_t i = 0; i < regionNumber; i++) {
rid.page += (t.size + 1);
Tread(xid, rid, &t);
if(t.status == REGION_VACANT) { i--; }

View file

@ -180,7 +180,7 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
assertlocked(p->rwlatch);
int page_type = *stasis_page_type_ptr(p);
int page_type = p->pageType;
if(page_type == INDIRECT_PAGE) {
rid = dereferenceIndirectRID(xid, rid);
} else if(page_type == ARRAY_LIST_PAGE) {
@ -194,90 +194,90 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
assertlocked(p->rwlatch);
int page_type = *stasis_page_type_ptr(p);
int page_type = p->pageType;
assert(page_type);
return page_impls[page_type].recordRead(xid, p, rid);
}
byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
assertlocked(p->rwlatch);
int page_type = *stasis_page_type_ptr(p);
int page_type = p->pageType;
assert(page_type);
assert(stasis_record_length_read(xid, p, rid) == stasis_record_type_to_size(rid.size));
return page_impls[page_type].recordWrite(xid, p, rid);
}
void stasis_record_read_done(int xid, Page *p, recordid rid, const byte *b) {
int page_type = *stasis_page_type_ptr(p);
int page_type = p->pageType;
if(page_impls[page_type].recordReadDone) {
page_impls[page_type].recordReadDone(xid,p,rid,b);
}
}
void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) {
int page_type = *stasis_page_type_ptr(p);
int page_type = p->pageType;
if(page_impls[page_type].recordWriteDone) {
page_impls[page_type].recordWriteDone(xid,p,rid,b);
}
}
int stasis_record_type_read(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
if(page_impls[*stasis_page_type_ptr(p)].recordGetType)
return page_impls[*stasis_page_type_ptr(p)].recordGetType(xid, p, rid);
if(page_impls[p->pageType].recordGetType)
return page_impls[p->pageType].recordGetType(xid, p, rid);
else
return INVALID_SLOT;
}
void stasis_record_type_write(int xid, Page *p, recordid rid, int type) {
assertlocked(p->rwlatch);
page_impls[*stasis_page_type_ptr(p)]
page_impls[p->pageType]
.recordSetType(xid, p, rid, type);
}
int stasis_record_length_read(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.recordGetLength(xid,p,rid);
}
recordid stasis_record_first(int xid, Page * p){
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.recordFirst(xid,p);
}
recordid stasis_record_next(int xid, Page * p, recordid prev){
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.recordNext(xid,p,prev);
}
recordid stasis_record_alloc_begin(int xid, Page * p, int size){
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.recordPreAlloc(xid,p,size);
}
void stasis_record_alloc_done(int xid, Page * p, recordid rid){
page_impls[*stasis_page_type_ptr(p)]
page_impls[p->pageType]
.recordPostAlloc(xid, p, rid);
}
void stasis_record_free(int xid, Page * p, recordid rid){
page_impls[*stasis_page_type_ptr(p)]
page_impls[p->pageType]
.recordFree(xid, p, rid);
}
int stasis_block_supported(int xid, Page * p){
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.isBlockSupported(xid, p);
}
block_t * stasis_block_first(int xid, Page * p){
int t = *stasis_page_type_ptr(p);
int t = p->pageType;
return page_impls[t]
.blockFirst(xid, p);
}
block_t * stasis_block_next(int xid, Page * p, block_t * prev){
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.blockNext(xid, p,prev);
}
void stasis_block_done(int xid, Page * p, block_t * done){
page_impls[*stasis_page_type_ptr(p)]
page_impls[p->pageType]
.blockDone(xid, p,done);
}
int stasis_record_freespace(int xid, Page * p){
return page_impls[*stasis_page_type_ptr(p)]
return page_impls[p->pageType]
.pageFreespace(xid, p);
}
void stasis_record_compact(Page * p){
page_impls[*stasis_page_type_ptr(p)]
page_impls[p->pageType]
.pageCompact(p);
}
/** @todo How should the LSN of pages without a page_type be handled?
@ -286,26 +286,32 @@ void stasis_record_compact(Page * p){
LSN-free pages, we'll need special "loadPageForAlloc(), and
loadPageOfType() methods (or something...)
*/
void stasis_page_loaded(Page * p){
short type = *stasis_page_type_ptr(p);
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type].pageLoaded(p);
void stasis_page_loaded(Page * p, pagetype_t type){
p->pageType = (type == UNKNOWN_TYPE_PAGE) ? *stasis_page_type_ptr(p) : type;
if(p->pageType) {
assert(page_impls[p->pageType].page_type == p->pageType);
page_impls[p->pageType].pageLoaded(p);
} else {
p->LSN = *stasis_page_lsn_ptr(p); // XXX kludge - shouldn't special-case UNINITIALIZED_PAGE
}
}
void stasis_page_flushed(Page * p){
short type = *stasis_page_type_ptr(p);
pagetype_t type = p->pageType;
if(type) {
assert(page_impls[type].page_type == type);
if(page_impls[type].has_header) {
*stasis_page_type_ptr(p)= type;
*stasis_page_lsn_ptr(p) = p->LSN;
}
page_impls[type].pageFlushed(p);
} else {
*stasis_page_type_ptr(p)= type;
*stasis_page_lsn_ptr(p) = p->LSN;
}
}
void stasis_page_cleanup(Page * p) {
short type = *stasis_page_type_ptr(p);
short type = p->pageType;
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type].pageCleanup(p);
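The rewritten stasis_page_loaded() and stasis_page_flushed() above are the core of the change: the pinning code can now dictate the page type, and the on-page type/LSN header is only rewritten for formats whose page_impl sets has_header. A minimal caller-side sketch (the helper name is hypothetical; pfPageRead() and phRead() in this commit always pass UNKNOWN_TYPE_PAGE):

    static void example_mark_loaded(Page *ret, pagetype_t known_type) {
      ret->dirty = 0;
      // UNKNOWN_TYPE_PAGE: recover the type from the on-page header, as the
      // existing page handles do. Any other constant: trust the caller, which
      // is what header-less, segment-style pages will eventually need.
      stasis_page_loaded(ret, known_type);
    }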

View file

@ -13,7 +13,7 @@ int stasis_fixed_records_per_page(size_t size) {
void stasis_fixed_initialize_page(Page * page, size_t size, int count) {
assertlocked(page->rwlatch);
stasis_page_cleanup(page);
*stasis_page_type_ptr(page) = FIXED_PAGE;
page->pageType = FIXED_PAGE;
*recordsize_ptr(page) = size;
assert(count <= stasis_fixed_records_per_page(size));
*recordcount_ptr(page)= count;
@ -21,7 +21,7 @@ void stasis_fixed_initialize_page(Page * page, size_t size, int count) {
static void checkRid(Page * page, recordid rid) {
assertlocked(page->rwlatch);
assert(*stasis_page_type_ptr(page)); // any more specific breaks pages based on this one
assert(page->pageType); // any more specific breaks pages based on this one
assert(page->id == rid.page);
assert(*recordsize_ptr(page) == rid.size);
assert(stasis_fixed_records_per_page(rid.size) > rid.slot);
@ -48,7 +48,7 @@ static int fixedGetType(int xid, Page *p, recordid rid) {
// checkRid(p, rid);
if(rid.slot < *recordcount_ptr(p)) {
int type = *recordsize_ptr(p);
if(type > 0) {
type = NORMAL_SLOT;
}
return type;
@ -65,7 +65,7 @@ static void fixedSetType(int xid, Page *p, recordid rid, int type) {
}
static int fixedGetLength(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
assert(*stasis_page_type_ptr(p));
assert(p->pageType);
return rid.slot > *recordcount_ptr(p) ?
INVALID_SLOT : stasis_record_type_to_size(*recordsize_ptr(p));
}
@ -124,6 +124,7 @@ void fixedCleanup(Page *p) { }
page_impl fixedImpl() {
static page_impl pi = {
FIXED_PAGE,
1,
fixedRead,
fixedWrite,
0,// readDone
@ -159,5 +160,5 @@ page_impl arrayListImpl() {
return pi;
}
void fixedPageInit() { }
void fixedPageDeinit() { }

View file

@ -11,7 +11,7 @@
void indirectInitialize(Page * p, int height) {
stasis_page_cleanup(p);
*level_ptr(p) = height;
*stasis_page_type_ptr(p) = INDIRECT_PAGE;
p->pageType = INDIRECT_PAGE;
memset(p->memAddr, INVALID_SLOT, ((size_t)level_ptr(p)) - ((size_t)p->memAddr));
}
/** @todo Is locking for dereferenceRID really necessary? */
@ -24,7 +24,7 @@ compensated_function recordid dereferenceIndirectRID(int xid, recordid rid) {
// printf("a"); fflush(stdout);
int offset = 0;
int max_slot;
while(*stasis_page_type_ptr(page) == INDIRECT_PAGE) {
while(page->pageType == INDIRECT_PAGE) {
int i = 0;
for(max_slot = *maxslot_ptr(page, i); ( max_slot + offset ) <= rid.slot; max_slot = *maxslot_ptr(page, i)) {
i++;
@ -78,7 +78,7 @@ compensated_function recordid rallocMany(int xid, int recordSize, int recordCoun
recordid ret;
int page;
try_ret(NULLRID) {
page = TpageAlloc(xid/*, SLOTTED_PAGE*/);
page = TpageAlloc(xid);
}end_ret(NULLRID);
try_ret(NULLRID) {
ret = __rallocMany(xid, page, recordSize, recordCount);
@ -111,7 +111,7 @@ compensated_function recordid __rallocMany(int xid, pageid_t parentPage, int rec
p.memAddr = buffer;
p.rwlatch = initlock();
p.loadlatch = initlock();
*stasis_page_type_ptr(&p) = UNINITIALIZED_PAGE;
p.pageType = UNINITIALIZED_PAGE;
if(number_of_pages > 1) {
@ -158,9 +158,9 @@ compensated_function recordid __rallocMany(int xid, pageid_t parentPage, int rec
*page_ptr(&p, i) = firstChildPage + i;
if(i) {
*maxslot_ptr(&p, i) = *maxslot_ptr(&p, i-1) + min(tmpRecordCount+1, next_level_records_per_page);
} else {
*maxslot_ptr(&p, i) = min(tmpRecordCount+1, next_level_records_per_page);
}
i++;
}
@ -206,7 +206,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
readlock(p->rwlatch, 0);
int i = 0;
unsigned int ret;
if(*stasis_page_type_ptr(p) == INDIRECT_PAGE) {
if(p->pageType == INDIRECT_PAGE) {
while(*maxslot_ptr(p, i) > 0) {
i++;
@ -216,7 +216,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
} else {
ret = (*maxslot_ptr(p, i-1)) - 1;
}
} else if (*stasis_page_type_ptr(p) == SLOTTED_PAGE) {
} else if (p->pageType == SLOTTED_PAGE) {
int numslots = *numslots_ptr(p);
ret = 0;
@ -247,6 +247,7 @@ void indirectFlushed(Page *p) {
void indirectCleanup(Page *p) { }
static page_impl pi = {
INDIRECT_PAGE,
1,
0, //read,
0, //write,
0, //readDone

View file

@ -1,12 +1,10 @@
#include <stasis/page.h>
#include <stasis/page/indirect.h>
#include <stasis/page/slotted.h>
#include <stasis/logger/logger2.h>
void stasis_slotted_lsn_free_initialize_page(Page * p) {
stasis_slotted_initialize_page(p);
*stasis_page_type_ptr(p) = SLOTTED_LSN_FREE_PAGE;
*stasis_page_lsn_ptr(p) = -1;
p->pageType = SLOTTED_LSN_FREE_PAGE;
}
// XXX still not correct; need to have an "LSN_FREE" constant.
static void lsnFreeLoaded(Page * p) {

View file

@ -21,7 +21,7 @@ static inline void slottedFsck(const Page const * page) {
dummy.id = -1;
dummy.memAddr = 0;
const short page_type = *stasis_page_type_cptr(page);
const short page_type = page->pageType;
const short numslots = *numslots_cptr(page);
const short freespace = *freespace_cptr(page);
const short freelist = *freelist_cptr(page);
@ -45,7 +45,7 @@ static inline void slottedFsck(const Page const * page) {
short * slot_offsets = alloca(numslots * sizeof(short));
short * slot_lengths = alloca(numslots * sizeof(short));
for(int i = 0; i < numslots; i++) {
slot_offsets[i] = *slot_ptr(page, i);
slot_lengths[i] = *slot_length_ptr(page, i);
}
@ -54,21 +54,21 @@ static inline void slottedFsck(const Page const * page) {
if(freelist != INVALID_SLOT) {
assert(slot_offsets[freelist] == INVALID_SLOT);
} else {
foundEndOfList = 1;
}
for(short i = 0; i < numslots; i++) {
const short slot_length = slot_lengths[i];
const short slot_offset = slot_offsets[i];
if(slot_offset == INVALID_SLOT) {
if(slot_length == INVALID_SLOT) {
assert(!foundEndOfList);
foundEndOfList = 1;
} else {
assert (slot_offsets[slot_length] == INVALID_SLOT);
}
} else {
assert(slot_offset + slot_length <= freespace);
}
}
@ -87,32 +87,32 @@ static inline void slottedFsck(const Page const * page) {
const unsigned short S_SLOT_LIST = 0xFCFC;
byte image[PAGE_SIZE];
for(short i = 0; i < PAGE_SIZE; i++) {
image[i] = UNUSED;
}
for(short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) {
image[i] = PAGE_HEADER;
}
for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) {
image[i] = SLOTTED_HEADER;
}
for(short i = *freespace_ptr(page); i < slotListStart; i++) {
image[i] = FREE_SPACE;
}
dummy.memAddr = image;
for(short i = 0; i < *numslots_ptr(page); i++) {
*slot_ptr(&dummy, i) = S_SLOT_LIST;
*slot_length_ptr(&dummy, i) = S_SLOT_LIST;
}
for(short i = 0; i < *numslots_ptr(page); i++) {
short slot_offset = *slot_ptr(page, i);
if(slot_offset != INVALID_SLOT) {
const unsigned char ci = i % 0xFF;
short slot_len = stasis_record_type_to_size(*slot_length_ptr(page, i));
for(short j = 0; j < slot_len; j++) {
assert(image[slot_offset + j] == 0xFF);
image[slot_offset + j] = ci;
}
@ -131,12 +131,12 @@ increase the available free space.
The caller of this function must have a writelock on the page.
*/
static void slottedCompact(Page * page) {
assertlocked(page->rwlatch);
Page bufPage;
byte buffer[PAGE_SIZE];
bufPage.memAddr = buffer;
// Copy external headers into bufPage.
memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(page->memAddr[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE);
@ -154,17 +154,17 @@ static void slottedCompact(Page * page) {
// Rebuild free list.
for(short i = 0; i < numSlots; i++) {
if(*slot_ptr(page, i) == INVALID_SLOT) {
if(lastFreeSlot == INVALID_SLOT) {
*freelist_ptr(&bufPage) = i;
} else {
*slot_length_ptr(&bufPage, lastFreeSlot) = i;
}
*slot_ptr(&bufPage, i) = INVALID_SLOT;
lastFreeSlot = i;
} else {
lastUsedSlot = i;
lastFreeSlotBeforeUsedSlot = lastFreeSlot;
short logicalSize = *slot_length_ptr(page, i);
@ -179,11 +179,11 @@ static void slottedCompact(Page * page) {
}
}
// Truncate linked list, and update numslots_ptr.
*slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT;
*numslots_ptr(&bufPage) = lastUsedSlot+1;
memcpy(page->memAddr, buffer, PAGE_SIZE);
slottedFsck(page);
@ -193,7 +193,7 @@ static void slottedCompact(Page * page) {
void slottedPageInit() {
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
printf("slotted.c: Using expensive page sanity checking.\n");
#endif
}
void slottedPageDeinit() {
@ -203,7 +203,7 @@ void slottedPageDeinit() {
void stasis_slotted_initialize_page(Page * page) {
assertlocked(page->rwlatch);
stasis_page_cleanup(page);
*stasis_page_type_ptr(page) = SLOTTED_PAGE;
page->pageType = SLOTTED_PAGE;
*freespace_ptr(page) = 0;
*numslots_ptr(page) = 0;
*freelist_ptr(page) = INVALID_SLOT;
@ -214,20 +214,20 @@ void stasis_slotted_initialize_page(Page * page) {
makes it possible for callers to guarantee the safety
of a subsequent call to really_do_ralloc().
*/
static size_t slottedFreespaceForSlot(Page * page, int slot) {
assertlocked(page->rwlatch);
size_t slotOverhead;
if(slot == INVALID_SLOT) {
slotOverhead = (*freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0;
} else if(slot < *numslots_ptr(page)) {
slotOverhead = 0;
} else {
// slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot);
slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *numslots_ptr(page));
}
// end_of_free_space points to the beginning of the slot header at the bottom of the page header.
byte* end_of_free_space = (byte*)slot_length_ptr(page, (*numslots_ptr(page))-1);
// start_of_free_space points to the first unallocated byte in the page
// (ignoring space that could be reclaimed by compaction)
@ -235,10 +235,10 @@ static size_t slottedFreespaceForSlot(Page * page, int slot) {
assert(end_of_free_space >= start_of_free_space);
if(end_of_free_space < start_of_free_space + slotOverhead) {
// The regions would overlap after allocation. There is no free space.
return 0;
} else {
// The regions would not overlap. There might be free space.
return (size_t) (end_of_free_space - start_of_free_space - slotOverhead);
}
@ -267,11 +267,11 @@ static void really_do_ralloc(Page * page, recordid rid) {
assertlocked(page->rwlatch);
short freeSpace;
// Compact the page if we don't have enough room.
if(slottedFreespaceForSlot(page, rid.slot) < stasis_record_type_to_size(rid.size)) {
slottedCompact(page);
// Make sure we have enough free space for the new record
assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size));
}
@ -281,43 +281,43 @@ static void really_do_ralloc(Page * page, recordid rid) {
// Remove this entry from the freelist (if necessary) slottedCompact
// assumes that this does not change the order of items in the list.
// If it did, then slottedCompact could leaks slot id's (or worse!)
if(rid.slot < *numslots_ptr(page) && *slot_ptr(page,rid.slot) == INVALID_SLOT) {
short next = *freelist_ptr(page);
short last = INVALID_SLOT;
// special case: is the slot physically before us the predecessor?
if(rid.slot > 0) {
if(*slot_length_ptr(page, rid.slot-1) == rid.slot && *slot_ptr(page, rid.slot-1) == INVALID_SLOT) {
next = rid.slot;
last = rid.slot-1;
}
}
}
while(next != INVALID_SLOT && next != rid.slot) {
last = next;
assert(next < *numslots_ptr(page));
short next_slot_ptr = *slot_ptr(page, next);
assert(next_slot_ptr == INVALID_SLOT);
next = *slot_length_ptr(page, next);
}
if(next == rid.slot) {
if(last == INVALID_SLOT) {
*freelist_ptr(page) = *slot_length_ptr(page, rid.slot);
} else {
*slot_length_ptr(page, last) = *slot_length_ptr(page, rid.slot);
}
}
}
// Insert any slots that come between the previous numslots_ptr()
// and the slot we're allocating onto the freelist. In order to
// promote the reuse of free slot numbers, we go out of our way to make sure
// that we put them in the list in increasing order. (Note: slottedCompact's
// correctness depends on this behavior!)
if(rid.slot > *numslots_ptr(page)) {
short lastSlot;
short numSlots = *numslots_ptr(page);
if(*freelist_ptr(page) == INVALID_SLOT) {
*freelist_ptr(page) = numSlots;
lastSlot = numSlots;
@ -330,7 +330,7 @@ static void really_do_ralloc(Page * page, recordid rid) {
} else {
lastSlot = INVALID_SLOT;
short next = *freelist_ptr(page);
while(next != INVALID_SLOT) {
lastSlot = next;
next = *slot_length_ptr(page, lastSlot);
assert(lastSlot < *numslots_ptr(page));
@ -341,8 +341,8 @@ static void really_do_ralloc(Page * page, recordid rid) {
}
// lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr.
while(*numslots_ptr(page) < rid.slot) {
*slot_length_ptr(page, lastSlot) = *numslots_ptr(page);
lastSlot = *numslots_ptr(page);
*slot_ptr(page, lastSlot) = INVALID_SLOT;
@ -355,14 +355,14 @@ static void really_do_ralloc(Page * page, recordid rid) {
}
if(*numslots_ptr(page) == rid.slot) {
*numslots_ptr(page) = rid.slot+1;
}
assert(*numslots_ptr(page) > rid.slot);
DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot);
// Reserve space for this record and record the space's offset in
// the slot header.
@ -370,7 +370,7 @@ static void really_do_ralloc(Page * page, recordid rid) {
*freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size);
*slot_ptr(page, rid.slot) = freeSpace;
*slot_length_ptr(page, rid.slot) = rid.size;
}
@ -439,7 +439,7 @@ static recordid slottedNext(int xid, Page *p, recordid rid) {
while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) {
rid.slot++;
}
if(rid.slot != n) {
rid.size = *slot_length_ptr(p, rid.slot);
return rid;
} else {
@ -526,6 +526,7 @@ static void slottedCleanup(Page *p) { }
page_impl slottedImpl() {
static page_impl pi = {
SLOTTED_PAGE,
1,
slottedRead,
slottedWrite,
0,// readDone

View file

@ -40,7 +40,7 @@ static void phRead(stasis_page_handle_t * ph, Page * ret) {
}
}
ret->dirty = 0;
stasis_page_loaded(ret);
stasis_page_loaded(ret, UNKNOWN_TYPE_PAGE);
unlock(ret->rwlatch);
}
static void phForce(stasis_page_handle_t * ph) {

View file

@ -110,6 +110,9 @@ typedef int32_t slotid_t;
typedef uint16_t pageoff_t;
#define PAGEOFF_T_MAX UINT16_MAX;
typedef int16_t pagetype_t;
#define PAGETYPE_T_MAX INT16_MAX;
/*#define DEBUGGING */
/*#define PROFILE_LATCHES*/
/*#define NO_LATCHES */

View file

@ -227,7 +227,7 @@ static const short SLOT_TYPE_LENGTHS[] = { -1, -1, sizeof(blob_record_t), -1};
#define XPREPARE 8
/* Page types */
#define UNKNOWN_TYPE_PAGE (-1)
#define UNINITIALIZED_PAGE 0
#define SLOTTED_PAGE 1
#define INDIRECT_PAGE 2

View file

@ -123,6 +123,16 @@ BEGIN_C_DECLS
*/
struct Page_s {
pageid_t id;
/**
* The type of this page. Set when the page is loaded from disk. If the page contains
* a header, this will be set automatically. Otherwise, it must be passed in by the code
* that pinned the page.
*/
int pageType;
/**
* The LSN of the page (or an estimate). Set when the page is loaded from disk.
* The on-page LSN (if any) is set at page writeback.
*/
lsn_t LSN;
byte *memAddr;
byte dirty;
@ -540,7 +550,7 @@ void stasis_record_free(int xid, Page * p, recordid rid);
int stasis_block_supported(int xid, Page * p);
int stasis_record_freespace(int xid, Page * p);
void stasis_record_compact(Page * p);
void stasis_page_loaded(Page * p);
void stasis_page_loaded(Page * p, pagetype_t type);
void stasis_page_flushed(Page * p);
void stasis_page_cleanup(Page * p);
/**
@ -677,7 +687,7 @@ void stasis_block_done(int xid, Page * p, block_t * done);
*/
typedef struct page_impl {
int page_type;
int has_header;
// ---------- Record access
/**
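The page_type and has_header fields above are what every page format now initializes first (the extra "1," lines added to the blob, fixed, indirect, and slotted page_impl initializers earlier in this diff). A minimal sketch of a hypothetical descriptor, assuming the remaining callbacks are filled in the same way slottedImpl() and fixedImpl() fill theirs:

    page_impl example_impl(void) {
      page_impl pi = { 0 };          // zero the callbacks for brevity
      pi.page_type  = SLOTTED_PAGE;  // which constant from constants.h this format claims
      pi.has_header = 1;             // 1: type/LSN are stored on the page and rewritten by
                                     //    stasis_page_flushed(); 0: nothing stored on-page
      return pi;
    }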

View file

@ -235,7 +235,7 @@ int SimpleExample(){
/* check to make sure page is recorded as a FIXED_PAGE */
assert( *stasis_page_type_ptr(p1) == FIXED_PAGE);
assert( p1->pageType == FIXED_PAGE);
if (DEBUGP) { printf("\n%lld\n", (long long)pageid1); }
byte * b1 = (byte *) malloc (sizeof (int));

View file

@ -102,11 +102,7 @@ START_TEST(indirectAlloc) {
Page * p = loadPage(xid, page);
int page_type = *stasis_page_type_ptr(p);
assert(page_type == SLOTTED_PAGE);
fail_unless(page_type == SLOTTED_PAGE, NULL);
assert(p->pageType == SLOTTED_PAGE);
releasePage(p);
@ -123,13 +119,7 @@ START_TEST(indirectAlloc) {
p = loadPage(xid, page);
page_type = *stasis_page_type_ptr(p);
assert(page_type == INDIRECT_PAGE);
fail_unless(page_type == INDIRECT_PAGE, NULL);
assert(p->pageType == INDIRECT_PAGE);
printf("{page = %lld, slot = %d, size = %lld}\n", (pageid_t)rid.page, rid.slot, (pageid_t)rid.size);
@ -147,13 +137,7 @@ START_TEST(indirectAlloc) {
p = loadPage(xid, page);
page_type = *stasis_page_type_ptr(p);
assert(page_type == INDIRECT_PAGE);
fail_unless(page_type == INDIRECT_PAGE, NULL);
assert(p->pageType == INDIRECT_PAGE);
printf("{page = %lld, slot = %d, size = %lld}\n", (pageid_t)rid.page, rid.slot, (pageid_t)rid.size);
@ -178,11 +162,7 @@ START_TEST(indirectAccessDirect) {
Page * p = loadPage(xid, page);
int page_type = *stasis_page_type_ptr(p);
assert(page_type == SLOTTED_PAGE);
fail_unless(page_type == SLOTTED_PAGE, NULL);
assert(p->pageType == SLOTTED_PAGE);
releasePage(p);
@ -223,11 +203,7 @@ START_TEST(indirectAccessIndirect) {
Page * p = loadPage(xid, page);
int page_type = *stasis_page_type_ptr(p);
assert(page_type == INDIRECT_PAGE);
fail_unless(page_type == INDIRECT_PAGE, NULL);
assert(p->pageType == INDIRECT_PAGE);
Tcommit(xid);
xid = Tbegin();

View file

@ -323,7 +323,7 @@ static void checkPageIterators(int xid, Page *p,int record_count) {
assertRecordCountSizeType(xid, p, record_count, sizeof(int64_t), NORMAL_SLOT);
if(*stasis_page_type_ptr(p) == SLOTTED_PAGE) {
if(p->pageType == SLOTTED_PAGE) {
recordid other = first;
other.slot = 3;
stasis_record_free(xid,p,other);

View file

@ -77,12 +77,11 @@ START_TEST(pageOpCheckRecovery) {
memset(p.memAddr, 1, USABLE_SIZE_OF_PAGE);
// Reset the page type after overwriting it with memset. Otherwise, Stasis
// will try to interpret it when it flushes the page to disk.
*stasis_page_type_ptr(&p) = 0;
p.pageType = 0;
TpageSetRange(xid, pageid1, 0, p.memAddr, USABLE_SIZE_OF_PAGE);
memset(p.memAddr, 2, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageSetRange(xid, pageid2, 0, p.memAddr, USABLE_SIZE_OF_PAGE);
@ -121,23 +120,19 @@ START_TEST(pageOpCheckRecovery) {
assert(pageid2 != pageid3);
memset(p.memAddr, 3, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageSetRange(xid, pageid3, 0, p.memAddr, USABLE_SIZE_OF_PAGE);
byte newAddr[USABLE_SIZE_OF_PAGE];
memset(p.memAddr, 1, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid1, newAddr);
assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE));
memset(p.memAddr, 2, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid2, newAddr);
assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE));
memset(p.memAddr, 3, USABLE_SIZE_OF_PAGE);
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid3, newAddr);
assert(!memcmp(p.memAddr, newAddr, USABLE_SIZE_OF_PAGE));
Tcommit(xid);