fix handling of loadPageForOperation during recovery; initialize rid.size in slottedLast

This commit is contained in:
Sears Russell 2010-03-22 19:56:55 +00:00
parent 85abeb9d77
commit 6deb34f5b2
7 changed files with 11 additions and 9 deletions

View file

@ -168,9 +168,12 @@ Page * loadUninitializedPage(int xid, pageid_t pageid) {
return bm->loadUninitPageImpl(bm, xid, pageid); return bm->loadUninitPageImpl(bm, xid, pageid);
} }
Page * loadPageForOperation(int xid, pageid_t pageid, int op) { Page * loadPageForOperation(int xid, pageid_t pageid, int op, int is_recovery) {
pagetype_t type = stasis_operation_type(op); pagetype_t type = stasis_operation_type(op);
Page * p; Page * p;
if(is_recovery && type == UNINITIALIZED_PAGE) {
type = UNKNOWN_TYPE_PAGE; // XXX what about segment pages? Presumably, the thing that initializes a segment passes in UNKNOWN.
}
if(pageid == SEGMENT_PAGEID) { if(pageid == SEGMENT_PAGEID) {
assert(type = SEGMENT_PAGE); assert(type = SEGMENT_PAGE);
p = 0; p = 0;

View file

@ -1,4 +1,3 @@
// Multiple include trick. // Multiple include trick.
#define HASH_ENTRY(x) bh_hash##x #define HASH_ENTRY(x) bh_hash##x
#define HASH_FCN(val,y,z) (*(pageid_t*)val) #define HASH_FCN(val,y,z) (*(pageid_t*)val)

View file

@ -168,8 +168,7 @@ int TregionNextBoundaryTag(int xid, pageid_t* pid, boundary_tag * tag, int type)
pthread_mutex_lock(&region_mutex); pthread_mutex_lock(&region_mutex);
assert(0 == holding_mutex); assert(0 == holding_mutex);
holding_mutex = pthread_self(); holding_mutex = pthread_self();
// XXX can't distinguish between EOF and error (error could happen if the boundary tag is consolidated in race with our caller)
int ret = readBoundaryTag(xid, *pid-1, tag); int ret = readBoundaryTag(xid, *pid-1, tag);
if(ret) { if(ret) {
while(1) { while(1) {

View file

@ -352,6 +352,7 @@ static recordid slottedFirst(int xid, Page *p) {
static recordid slottedLast(int xid, Page *p) { static recordid slottedLast(int xid, Page *p) {
recordid rid = {p->id, -1, 0 }; recordid rid = {p->id, -1, 0 };
rid.slot = (*stasis_page_slotted_numslots_cptr(p)) - 1; rid.slot = (*stasis_page_slotted_numslots_cptr(p)) - 1;
rid.size = *stasis_page_slotted_slot_length_cptr(p, rid.slot);
return rid; return rid;
} }

View file

@ -173,7 +173,7 @@ static void stasis_recovery_redo(stasis_log_t* log, stasis_transaction_table_t *
} else if(e->update.page == SEGMENT_PAGEID) { } else if(e->update.page == SEGMENT_PAGEID) {
stasis_operation_redo(e,0); stasis_operation_redo(e,0);
} else { } else {
Page * p = loadPageForOperation(e->xid, e->update.page, e->update.funcID); Page * p = loadPageForOperation(e->xid, e->update.page, e->update.funcID, 1);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
stasis_operation_redo(e,p); stasis_operation_redo(e,p);
unlock(p->rwlatch); unlock(p->rwlatch);
@ -192,7 +192,7 @@ static void stasis_recovery_redo(stasis_log_t* log, stasis_transaction_table_t *
// need to grab latch page here so that Tabort() can be atomic // need to grab latch page here so that Tabort() can be atomic
// below... // below...
Page * p = loadPageForOperation(e->xid, ce->update.page, ce->update.funcID); Page * p = loadPageForOperation(e->xid, ce->update.page, ce->update.funcID, 1);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
stasis_operation_undo(ce, e->LSN, p); stasis_operation_undo(ce, e->LSN, p);
unlock(p->rwlatch); unlock(p->rwlatch);
@ -271,7 +271,7 @@ static void stasis_recovery_undo(stasis_log_t* log, stasis_transaction_table_t *
// atomically log (getting clr), and apply undo. // atomically log (getting clr), and apply undo.
// otherwise, there's a race where the page's LSN is // otherwise, there's a race where the page's LSN is
// updated before we undo. // updated before we undo.
Page* p = loadPageForOperation(e->xid, e->update.page, e->update.funcID); Page* p = loadPageForOperation(e->xid, e->update.page, e->update.funcID, 1);
if(p) writelock(p->rwlatch,0); if(p) writelock(p->rwlatch,0);
// Log a CLR for this entry // Log a CLR for this entry

View file

@ -145,7 +145,7 @@ void Tupdate(int xid, pageid_t page,
LogEntry * e; LogEntry * e;
stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid); stasis_transaction_table_entry_t * xact = stasis_transaction_table_get(stasis_transaction_table, xid);
assert(xact); assert(xact);
Page * p = loadPageForOperation(xid, page, op); Page * p = loadPageForOperation(xid, page, op, 0);
if(globalLockManager.writeLockPage && p) { if(globalLockManager.writeLockPage && p) {
globalLockManager.writeLockPage(xid, page); globalLockManager.writeLockPage(xid, page);

View file

@ -96,7 +96,7 @@ Page * loadPageOfType(int xid, pageid_t pageid, pagetype_t type);
Page * loadUninitializedPage(int xid, pageid_t pageid); Page * loadUninitializedPage(int xid, pageid_t pageid);
Page * loadPageForOperation(int xid, pageid_t pageid, int op); Page * loadPageForOperation(int xid, pageid_t pageid, int op, int is_recovery);
/** /**
Get a page from cache. This function should never block on I/O. Get a page from cache. This function should never block on I/O.