diff --git a/src/stasis/operations.c b/src/stasis/operations.c
index 583a9e2..ac4410a 100644
--- a/src/stasis/operations.c
+++ b/src/stasis/operations.c
@@ -58,16 +58,14 @@ Operation operationsTable[MAX_OPERATIONS];
 
 void doUpdate(const LogEntry * e, Page * p) {
   assert(p);
-
+  assertlocked(p->rwlatch);
   operationsTable[e->update.funcID].run(e, p);
 
-  writelock(p->rwlatch,0);
   DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid, e->LSN, e->update.page, stasis_page_lsn_read(p));
 
   stasis_page_lsn_write(e->xid, p, e->LSN);
-  unlock(p->rwlatch);
 }
 
@@ -89,20 +87,18 @@ void redoUpdate(const LogEntry * e) {
     // is for this log type.
     // contrast with doUpdate(), which doesn't check the .id field.
 
-    stasis_page_lsn_write(e->xid, p, e->LSN); //XXX do this after run();
-    unlock(p->rwlatch); /// XXX keep lock while calling run();
-
     operationsTable[operationsTable[e->update.funcID].id]
       .run(e,p);
+    stasis_page_lsn_write(e->xid, p, e->LSN);
   } else {
     DEBUG("OPERATION xid %d skip redo, %lld {%lld:%lld}\n", e->xid, e->LSN, e->update.page, stasis_page_lsn_read(p));
-    unlock(p->rwlatch);
   }
+  unlock(p->rwlatch);
   releasePage(p);
 }
 
-void undoUpdate(const LogEntry * e, lsn_t effective_lsn) {
+void undoUpdate(const LogEntry * e, lsn_t effective_lsn, Page * p) {
   // Only handle update entries
   assert(e->type == UPDATELOG);
 
@@ -116,19 +112,16 @@ void undoUpdate(const LogEntry * e, lsn_t effective_lsn) {
     operationsTable[undo].run(e,0);
   } else {
-    Page * p = loadPage(e->xid, e->update.page);
-    writelock(p->rwlatch,0);
+    assert(p->id == e->update.page);
+
     if(stasis_page_lsn_read(p) < effective_lsn) {
       DEBUG("OPERATION xid %d Undo, %lld {%lld:%lld}\n", e->xid, e->LSN, e->update.page, stasis_page_lsn_read(p));
-      stasis_page_lsn_write(e->xid, p, effective_lsn); // XXX call after run()
-      unlock(p->rwlatch); // release after run()
       operationsTable[undo].run(e,p);
+      stasis_page_lsn_write(e->xid, p, effective_lsn);
     } else {
       DEBUG("OPERATION xid %d skip undo, %lld {%lld:%lld}\n", e->xid, e->LSN, e->update.page, stasis_page_lsn_read(p));
-      unlock(p->rwlatch);
     }
-    releasePage(p);
   }
 }
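Everything in operations.c now assumes the caller already holds p->rwlatch: doUpdate() asserts the latch instead of taking it, undoUpdate() receives the pinned page as an argument, and both stamp the page LSN after run(), so the LSN can never claim an update that was not applied. A minimal caller under the new convention might look like this (a sketch: the helper name is hypothetical, and it assumes a physical UPDATELOG entry, mirroring what recovery2.c does below):

    /* Hypothetical caller-side helper; not code from this patch. */
    static void rollback_entry(const LogEntry *e, lsn_t clr_lsn) {
      Page *p = loadPage(e->xid, e->update.page); /* physical entry assumed */
      writelock(p->rwlatch, 0);  /* the caller, not the op, takes the latch */
      undoUpdate(e, clr_lsn, p); /* runs the undo, then stamps clr_lsn */
      unlock(p->rwlatch);
      releasePage(p);
    }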
diff --git a/src/stasis/operations/alloc.c b/src/stasis/operations/alloc.c
index 7de570a..d2a6d75 100644
--- a/src/stasis/operations/alloc.c
+++ b/src/stasis/operations/alloc.c
@@ -96,8 +96,6 @@ typedef struct {
 } alloc_arg;
 
 static int op_alloc(const LogEntry* e, Page* p) { //(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch, 0);
-
   assert(e->update.arg_size >= sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
 
@@ -116,12 +114,11 @@ static int op_alloc(const LogEntry* e, Page* p) { //(int xid, Page * p, lsn_t ls
     // otherwise, no preimage
     assert(e->update.arg_size == sizeof(alloc_arg));
   }
-  unlock(p->rwlatch);
+
   return ret;
 }
 
 static int op_dealloc(const LogEntry* e, Page* p) { //deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
   recordid rid = {
@@ -134,12 +131,10 @@ static int op_dealloc(const LogEntry* e, Page* p) { //deoperate(int xid, Page *
   stasis_record_free(e->xid, p, rid);
   assert(stasis_record_type_read(e->xid, p, rid) == INVALID_SLOT);
-  unlock(p->rwlatch);
   return 0;
 }
 
 static int op_realloc(const LogEntry* e, Page* p) { //reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
 
@@ -157,8 +152,6 @@ static int op_realloc(const LogEntry* e, Page* p) { //reoperate(int xid, Page *p
   byte * buf = stasis_record_write_begin(e->xid,p,rid);
   memcpy(buf, arg+1, stasis_record_length_read(e->xid,p,rid));
   stasis_record_write_done(e->xid,p,rid,buf);
-  unlock(p->rwlatch);
-
   return ret;
 }
 
@@ -411,21 +404,22 @@ compensated_function void Tdealloc(int xid, recordid rid) {
     p = loadPage(xid, rid.page);
   } end;
-
+  readlock(p->rwlatch,0);
+
   recordid newrid = stasis_record_dereference(xid, p, rid);
   allocationPolicyLockPage(allocPolicy, xid, newrid.page);
 
-  readlock(p->rwlatch,0);
   int64_t size = stasis_record_length_read(xid,p,rid);
-  unlock(p->rwlatch);
 
   byte * preimage = malloc(sizeof(alloc_arg)+rid.size);
-
+
   ((alloc_arg*)preimage)->slot = rid.slot;
   ((alloc_arg*)preimage)->size = size;
 
   begin_action(releasePage, p) {
     stasis_record_read(xid, p, rid, preimage+sizeof(alloc_arg));
+    unlock(p->rwlatch);
+
+    /** @todo race in Tdealloc; do we care, or is this something that the
+        log manager should cope with? */
     Tupdate(xid, rid, preimage, sizeof(alloc_arg)+rid.size, OPERATION_DEALLOC);
   } compensate;
 
@@ -469,7 +463,6 @@ void TinitializeFixedPage(int xid, int pageid, int slotLength) {
 }
 
 static int op_initialize_page(const LogEntry* e, Page* p) { //int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch, 0);
   assert(e->update.arg_size == sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
 
@@ -483,7 +476,6 @@ static int op_initialize_page(const LogEntry* e, Page* p) { //int xid, Page *p,
   default:
     abort();
   }
-  unlock(p->rwlatch);
   return 0;
 }
diff --git a/src/stasis/operations/arrayList.c b/src/stasis/operations/arrayList.c
index 0552de7..3f1088d 100644
--- a/src/stasis/operations/arrayList.c
+++ b/src/stasis/operations/arrayList.c
@@ -67,9 +67,6 @@ static int op_array_list_alloc(const LogEntry* e, Page* p) {
   int multiplier = tlp->multiplier;
   int size = tlp->size;
 
-  /* Allocing this page -> implicit lock, but latch to conformt to
-     fixedPage's interface. */
-  writelock(p->rwlatch, 0);
   stasis_fixed_initialize_page(p, sizeof(int), stasis_fixed_records_per_page(sizeof(int)));
 
   recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid;
@@ -95,7 +92,6 @@ static int op_array_list_alloc(const LogEntry* e, Page* p) {
   ret.page = firstPage;
   ret.slot = 0;      /* slot = # of slots in array... */
   ret.size = size;
-  unlock(p->rwlatch);
   return 0;
 }
@@ -190,9 +186,6 @@ compensated_function int TarrayListLength(int xid, recordid rid) {
 
 /*----------------------------------------------------------------------------*/
 
-/**
-   @todo XXX latching for dereference arraylist rid (and other dereference functions...)
-*/
 recordid dereferenceArrayListRid(int xid, Page * p, int offset) {
   readlock(p->rwlatch,0);
   TarrayListParameters tlp = pageToTLP(xid, p);
diff --git a/src/stasis/operations/naiveLinearHash.c b/src/stasis/operations/naiveLinearHash.c
index 18c7518..6f3903b 100644
--- a/src/stasis/operations/naiveLinearHash.c
+++ b/src/stasis/operations/naiveLinearHash.c
@@ -412,7 +412,9 @@ recordid ThashAlloc(int xid, int keySize, int valSize) {
   assert(headerRidB);
 
   Page * p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   recordid * check = malloc(stasis_record_type_to_size(stasis_record_dereference(xid, p, rid).size));
+  unlock(p->rwlatch);
   releasePage(p);
   rid.slot = 0;
   Tread(xid, rid, check);
diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c
index d3cde42..93096b5 100644
--- a/src/stasis/operations/pageOperations.c
+++ b/src/stasis/operations/pageOperations.c
@@ -103,11 +103,9 @@ compensated_function int TpageAlloc(int xid /*, int type */) {
 }
 
 int op_fixed_page_alloc(const LogEntry* e, Page* p) {
-  writelock(p->rwlatch,0);
   assert(e->update.arg_size == sizeof(int));
   int slot_size = *(const int*)getUpdateArgs(e);
   stasis_fixed_initialize_page(p, slot_size, stasis_fixed_records_per_page(slot_size));
-  unlock(p->rwlatch);
   return 0;
 }
diff --git a/src/stasis/operations/regions.c b/src/stasis/operations/regions.c
index 4fd13c0..34df30d 100644
--- a/src/stasis/operations/regions.c
+++ b/src/stasis/operations/regions.c
@@ -21,13 +21,8 @@ static void TdeallocBoundaryTag(int xid, unsigned int page);
 /** This doesn't need a latch since it is only initiated within nested
     top actions (and is local to this file.  During abort(), the nested
     top action's logical undo grabs the necessary latches.
-
-    @todo opearate_alloc_boundary_tag is executed without holding the
-    proper mutex during REDO.  For now this doesn't matter, but it
-    could matter in the future.
 */
 static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
-  writelock(p->rwlatch, 0);
   stasis_slotted_initialize_page(p);
   recordid rid = {p->id, 0, sizeof(boundary_tag)};
   assert(e->update.arg_size == sizeof(boundary_tag));
@@ -36,7 +31,6 @@ static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
   byte * buf = stasis_record_write_begin(e->xid, p, rid);
   memcpy(buf, getUpdateArgs(e), stasis_record_length_read(e->xid, p, rid));
   stasis_record_write_done(e->xid, p, rid, buf);
-  unlock(p->rwlatch);
   return 0;
 }
 
@@ -164,8 +158,9 @@ void regionsInit() {
     // hack; allocate a fake log entry; pass it into ourselves.
     LogEntry * e = allocUpdateLogEntry(0,0,OPERATION_ALLOC_BOUNDARY_TAG, p->id, (const byte*)&t, sizeof(boundary_tag));
-
+    writelock(p->rwlatch,0);
     op_alloc_boundary_tag(e,p);
+    unlock(p->rwlatch);
     FreeLogEntry(e);
   }
   holding_mutex = 0;
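Each operation implementation above loses its writelock()/unlock() pair: ops now run with the latch already held by Tupdate() or by recovery, and regionsInit() is the one direct caller that has to take the latch itself before invoking an op by hand. Schematically, an op body is reduced to payload validation plus the page mutation (an illustrative example modeled on op_fixed_page_alloc, not code from the tree):

    /* Illustrative op under the new discipline: no latching, no LSN
       writes; the page arrives pinned and write-latched. */
    static int op_example_init(const LogEntry* e, Page* p) {
      assert(e->update.arg_size == sizeof(int));
      int slot_size = *(const int*)getUpdateArgs(e);
      stasis_fixed_initialize_page(p, slot_size,
                                   stasis_fixed_records_per_page(slot_size));
      return 0;
    }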
diff --git a/src/stasis/operations/set.c b/src/stasis/operations/set.c
index b3bd73f..584b744 100644
--- a/src/stasis/operations/set.c
+++ b/src/stasis/operations/set.c
@@ -51,7 +51,6 @@ terms specified in this license.
 #include 
 #include 
 
 static int op_set(const LogEntry *e, Page *p) {
-  readlock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
   const byte * b = getUpdateArgs(e);
   recordid rid;
@@ -66,12 +65,9 @@ static int op_set(const LogEntry *e, Page *p) {
 
   stasis_record_write(e->xid, p, e->LSN, rid, b);
 
-  unlock(p->rwlatch);
-
   return 0;
 }
 
 static int op_set_inverse(const LogEntry *e, Page *p) {
-  readlock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
   const byte * b = getUpdateArgs(e);
   recordid rid;
@@ -85,8 +81,6 @@ static int op_set_inverse(const LogEntry *e, Page *p) {
 
   stasis_record_write(e->xid, p, e->LSN, rid, b+rid.size);
 
-  unlock(p->rwlatch);
-
   return 0;
 }
 
@@ -97,13 +91,15 @@ typedef struct {
 
 int Tset(int xid, recordid rid, const void * dat) {
   Page * p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   rid = stasis_record_dereference(xid,p,rid);
-
   rid.size = stasis_record_type_to_size(rid.size);
 
   if(rid.size > BLOB_THRESHOLD_SIZE) {
     writeBlob(xid,p,rid,dat);
+    unlock(p->rwlatch);
     releasePage(p);
   } else {
+    unlock(p->rwlatch);
     releasePage(p);
 
     size_t sz = sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size;
@@ -141,7 +137,6 @@ int TsetRaw(int xid, recordid rid, const void * dat) {
 }
 
 static int op_set_range(const LogEntry* e, Page* p) {
-  readlock(p->rwlatch,0);
   int diffLength = e->update.arg_size - sizeof(set_range_t);
   assert(! (diffLength % 2));
   diffLength >>= 1;
@@ -160,12 +155,10 @@ static int op_set_range(const LogEntry* e, Page* p) {
   stasis_record_write(e->xid, p, e->LSN, rid, tmp);
 
   free(tmp);
-  unlock(p->rwlatch);
   return 0;
 }
 
 static int op_set_range_inverse(const LogEntry* e, Page* p) {
-  readlock(p->rwlatch,0);
   int diffLength = e->update.arg_size - sizeof(set_range_t);
   assert(! (diffLength % 2));
   diffLength >>= 1;
@@ -183,7 +176,6 @@ static int op_set_range_inverse(const LogEntry* e, Page* p) {
   stasis_record_write(e->xid, p, e->LSN, rid, tmp);
 
   free(tmp);
-  unlock(p->rwlatch);
   return 0;
 }
 
 compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {
@@ -195,7 +187,6 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
 
   /// XXX rewrite without malloc (use read_begin, read_done)
   set_range_t * range = malloc(sizeof(set_range_t) + 2 * length);
-  byte * record = malloc(rid.size);
 
   range->offset = offset;
   range->slot = rid.slot;
@@ -206,18 +197,19 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
   // No further locking is necessary here; readRecord protects the
   // page layout, but attempts at concurrent modification have undefined
   // results.  (See page.c)
-  stasis_record_read(xid, p, rid, record);
+  readlock(p->rwlatch,0);
+  const byte* record = stasis_record_read_begin(xid,p,rid);
 
   // Copy old value into log structure
   memcpy((byte*)(range + 1) + length, record+offset, length);
+  stasis_record_read_done(xid,p,rid,record);
 
-  free(record);
-  /** @todo will leak 'range' if interrupted with pthread_cancel */
-  begin_action(releasePage, p) {
-    Tupdate(xid, rid, range, sizeof(set_range_t) + 2 * length, OPERATION_SET_RANGE);
-    free(range);
-  } compensate;
+  unlock(p->rwlatch);
+  Tupdate(xid, rid, range, sizeof(set_range_t) + 2 * length, OPERATION_SET_RANGE);
+  free(range);
+
+  releasePage(p);
 }
 
 Operation getSet() {
diff --git a/src/stasis/page.c b/src/stasis/page.c
index 9ac7915..1aefe77 100644
--- a/src/stasis/page.c
+++ b/src/stasis/page.c
@@ -150,27 +150,23 @@ int stasis_page_impl_register(page_impl p) {
   return 0;
 }
 
 void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) {
-
+  assertlocked(p->rwlatch);
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
-
-  readlock(p->rwlatch, 225);
 
   assert(rid.size <= BLOB_THRESHOLD_SIZE);
 
   byte * buf = stasis_record_write_begin(xid, p, rid);
   memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
-  unlock(p->rwlatch);
-
+  stasis_record_write_done(xid,p,rid,buf);
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
 }
 
 int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
+  assertlocked(p->rwlatch);
   assert(rid.page == p->id);
-
   assert(rid.size <= BLOB_THRESHOLD_SIZE);
 
-  readlock(p->rwlatch, 0);
   const byte * dat = stasis_record_read_begin(xid,p,rid);
   memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
-  unlock(p->rwlatch);
+
   return 0;
 }
 
@@ -178,6 +174,8 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
    @todo stasis_record_dereference should dispatch via page_impl...
 */
 recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
+  assertlocked(p->rwlatch);
+
   int page_type = *stasis_page_type_ptr(p);
   if(page_type == INDIRECT_PAGE) {
     rid = dereferenceIndirectRID(xid, rid);
@@ -191,6 +189,8 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
 static int recordWarnedAboutPageTypeKludge = 0;
 const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
+  assertlocked(p->rwlatch);
+
   int page_type = *stasis_page_type_ptr(p);
   if(!page_type) {
     page_type = FIXED_PAGE;
@@ -203,6 +203,8 @@ const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
   return page_impls[page_type].recordRead(xid, p, rid);
 }
 byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
+  assertlocked(p->rwlatch);
+
   int page_type = *stasis_page_type_ptr(p);
   if(!page_type) {
     page_type = FIXED_PAGE;
@@ -228,16 +230,19 @@ void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) {
   }
 }
 int stasis_record_type_read(int xid, Page *p, recordid rid) {
+  assertlocked(p->rwlatch);
   if(page_impls[*stasis_page_type_ptr(p)].recordGetType)
     return page_impls[*stasis_page_type_ptr(p)].recordGetType(xid, p, rid);
   else
     return INVALID_SLOT;
 }
 void stasis_record_type_write(int xid, Page *p, recordid rid, int type) {
+  assertlocked(p->rwlatch);
   page_impls[*stasis_page_type_ptr(p)]
     .recordSetType(xid, p, rid, type);
 }
 int stasis_record_length_read(int xid, Page *p, recordid rid) {
+  assertlocked(p->rwlatch);
   return page_impls[*stasis_page_type_ptr(p)]
     .recordGetLength(xid,p,rid);
 }
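Since page.c now asserts latches rather than taking them, several record-level calls can be composed under a single latch acquisition; TsetRange() above exploits this, replacing its malloc'd copy with a stasis_record_read_begin()/read_done() pair. The bracketing pattern, reduced to its skeleton (dest, offset, and length are assumed to be caller-supplied):

    /* Compose record calls under one latch; the begin/done pair must not
       outlive the latch that protects it. */
    readlock(p->rwlatch, 0);
    const byte* rec = stasis_record_read_begin(xid, p, rid);
    memcpy(dest, rec + offset, length);
    stasis_record_read_done(xid, p, rid, rec);
    unlock(p->rwlatch);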
diff --git a/src/stasis/recovery2.c b/src/stasis/recovery2.c
index 454f37b..edcd37c 100644
--- a/src/stasis/recovery2.c
+++ b/src/stasis/recovery2.c
@@ -195,7 +195,12 @@ static void Redo() {
         if(ce->update.page == INVALID_PAGE) {
           // logical redo of end of NTA; no-op
         } else {
-          undoUpdate(ce, e->LSN);
+          // ughh; need to grab page here so that abort() can be atomic below...
+          Page * p = loadPage(e->xid, ce->update.page);
+          writelock(p->rwlatch,0);
+          undoUpdate(ce, e->LSN, p);
+          unlock(p->rwlatch);
+          releasePage(p);
         }
         FreeLogEntry(ce);
       } break;
@@ -252,11 +257,26 @@ static void Undo(int recovery) {
           // we've finished physical undo for this op
         } else {
           DEBUG("physical update\n");
+
+          // atomically log (getting clr), and apply undo.
+          // otherwise, there's a race where the page's LSN is
+          // updated before we undo.
+          Page* p = NULL;
+          if(e->update.page != INVALID_PAGE) {
+            p = loadPage(e->xid, e->update.page);
+            writelock(p->rwlatch,0);
+          }
+
          // Log a CLR for this entry
          lsn_t clr_lsn = LogCLR(e);
          DEBUG("logged clr\n");
-          undoUpdate(e, clr_lsn);
+
+          undoUpdate(e, clr_lsn, p);
+
+          if(p) {
+            unlock(p->rwlatch);
+            releasePage(p);
+          }
          DEBUG("rolled back clr's update\n");
        }
@@ -267,7 +287,7 @@ static void Undo(int recovery) {
          const LogEntry * ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn);
          if(ce->update.page == INVALID_PAGE) {
            DEBUG("logical clr\n");
-            undoUpdate(ce, 0); // logical undo; effective LSN doesn't matter
+            undoUpdate(ce, 0, 0); // logical undo; effective LSN doesn't matter
          } else {
            DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN);
            // no-op.  Already undone during redo.  This would redo the original op.
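Recovery gets the same treatment. Undo() now pins and write-latches the page before calling LogCLR(), making the CLR's LSN and the undo itself atomic with respect to other writers of the page; without this, another thread could advance the page LSN between the two steps, and a later pass would skip an undo that was never applied. The ordering that matters, condensed from the hunk above:

    /* Sketch of the ordering Undo() now enforces for physical entries. */
    writelock(p->rwlatch, 0);   /* 1. exclude other page writers      */
    lsn_t clr_lsn = LogCLR(e);  /* 2. log the compensation record     */
    undoUpdate(e, clr_lsn, p);  /* 3. apply the undo, stamp clr_lsn   */
    unlock(p->rwlatch);         /* 4. only now may the page LSN move  */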
diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c
index 71cfec5..91fe420 100644
--- a/src/stasis/transactional2.c
+++ b/src/stasis/transactional2.c
@@ -277,12 +277,15 @@ static compensated_function void TactionHelper(int xid, recordid rid,
     }
   } end;
 
+  writelock(p->rwlatch,0);
+
   e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, op, dat, datlen);
   assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
   DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
   doUpdate(e, p);
   FreeLogEntry(e);
+  unlock(p->rwlatch);
 }
 
 // XXX remove this function once it's clear that nobody is failing the assert in Tupdate()
@@ -302,9 +305,10 @@ compensated_function void TupdateStr(int xid, recordid rid,
 compensated_function void Tupdate(int xid, recordid rid,
                                   const void *dat, size_t datlen, int op) {
   Page * p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   recordid rid2 = stasis_record_dereference(xid, p, rid);
-
   assert(rid2.page == rid.page);
+  unlock(p->rwlatch);
 
   TactionHelper(xid, rid, dat, datlen, op, p);
   releasePage(p);
@@ -320,10 +324,14 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
     p = loadPage(xid, rid.page);
   } end;
 
+  readlock(p->rwlatch,0);
+
   rid = stasis_record_dereference(xid, p, rid);
   if(rid.page != p->id) {
+    unlock(p->rwlatch);
     releasePage(p);
     p = loadPage(xid, rid.page);
+    readlock(p->rwlatch,0);
   }
   if(rid.size > BLOB_THRESHOLD_SIZE) {
     DEBUG("call readBlob %lld %lld %lld\n", (long long)rid.page, (long long)rid.slot, (long long)rid.size);
@@ -332,6 +340,7 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
   } else {
     stasis_record_read(xid, p, rid, dat);
   }
+  unlock(p->rwlatch);
   releasePage(p);
 }
 
@@ -340,8 +349,9 @@ compensated_function void TreadRaw(int xid, recordid rid, void * dat) {
   try {
     p = loadPage(xid, rid.page);
   } end;
-
+  readlock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, dat);
+  unlock(p->rwlatch);
   releasePage(p);
 }
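TactionHelper() is the forward-path half of the same fix: one write latch now covers both LogUpdate() and doUpdate(), so the log and the page LSN cannot diverge mid-update. Application code is unchanged by any of this; a caller still writes (illustrative values):

    /* Application-level usage is unaffected; latching stays internal. */
    int xid = Tbegin();
    recordid rid = Talloc(xid, sizeof(int));
    int val = 42;          /* illustrative payload */
    Tset(xid, rid, &val);  /* latches, logs, applies, unlatches */
    Tread(xid, rid, &val);
    Tcommit(xid);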
diff --git a/stasis/operations.h b/stasis/operations.h
index f1f39bf..6af94b4 100644
--- a/stasis/operations.h
+++ b/stasis/operations.h
@@ -66,40 +66,17 @@ terms specified in this license.
 #include 
 #include 
-#include 
+#include 
 #include 
 #include 
 #include 
 
 BEGIN_C_DECLS
 
-/** 
+/**
  * function pointer that the operation will run
  */
-//typedef int (*Function)(int xid, Page * p, slotid_t slot, int64_t arg_size, lsn_t lsn, const void *d);
-typedef int (*Function)(const LogEntry* e, Page * p); //, slotid_t slot, int64_t arg_size, lsn_t lsn, const void *d);
-
-/**
-
-*/
-
-/**
-   If the Operation struct's sizeofData is set to this value, then the
-   size field of the recordid is used to determine the size of the
-   argument passed into the operation.
- */
-//#define SIZEOF_RECORD -1
-/**
-   Logical log entries (such as those used by nested top actions
-   have a null recordid, as they are not assoicated with a specific page
-
-   If a log entry is not associated with a specific page, the page id can
-   be overloaded to hold the size of the associated log entry.  Contrast
-   this with the description of SIZEOF_RECORD, which is used when the
-   operation uses a variable length argument, but is associated with
-   a specfic page.
-*/
-//#define SIZEIS_PAGEID -2
+typedef int (*Function)(const LogEntry* e, Page * p);
 
 typedef struct {
   /**
@@ -114,7 +91,7 @@ typedef struct {
      - Periodically checkpoint, syncing the data store to disk, and
        writing a checkpoint operation.  No writes can be serviced
-       during the sync, and this implies 'no steal'.  See: 
+       during the sync, and this implies 'no steal'.  See:
 
        @@inproceedings{ woo97accommodating,
          author = "Seung-Kyoon Woo and Myoung-Ho Kim and Yoon-Joon Lee",
         ...
       }
@@ -127,7 +104,7 @@ typedef struct {
      for a more complex scheme involving a hybrid logical/physical
      logging system that does not implement steal.
 
-     The other option: 
+     The other option:
 
      - Get rid of operations that span records entirely by
        splitting complex logical operations into simpler ones.
@@ -141,7 +118,6 @@ typedef struct {
      general, since other transactions could read dirty information
      from the pinned pages, producsing nonsensical log entries that
      preceed the current transaction's log entry.
-
   */
 
   /** index into operations table of undo function
@@ -174,13 +150,20 @@ typedef struct {
 
 extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */
 
-/** Performs an operation during normal execution.
+/**
+   Performs an operation during normal execution.
 
    Does not write to the log, and assumes that the operation's
    results are not already in the buffer manager.
-*/
+
+   @param e the log entry to play forward; it will be played forward
+            regardless of its LSN
+
+   @param p the page the update should be applied to (no support for
+            logical redo); p->rwlatch should be writelock()'ed
+ */
 void doUpdate(const LogEntry * e, Page * p);
 
-/** Undo the update under normal operation, and during recovery.
+/**
+   Undo the update under normal operation, and during recovery.
 
    For logical undo, this unconditionally executes the requested
    operation.
@@ -189,9 +172,11 @@ void doUpdate(const LogEntry * e, Page * p);
 
    @param e The log entry containing the operation to be undone.
    @param clr_lsn The lsn of the clr that records this undo operation.
+   @param p Like doUpdate(), this function is called during forward
+            operation, so p->rwlatch must be writelock()'ed.
 */
-void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
-/** 
+void undoUpdate(const LogEntry * e, lsn_t clr_lsn, Page * p);
+/**
   Redoes an operation during recovery.  This is different than
   doUpdate because it checks to see if the operation needs to be redone
   before redoing it. (if(e->lsn > e->rid.lsn) { doUpdate(e); } return)
@@ -199,7 +184,8 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
   Also, this is the only function in operations.h that can take
   either CLR or UPDATE log entries.  The other functions can handle
   update entries.
 
-  Does not write to the log.
+  Does not write to the log.  No need for a page parameter; Stasis'
+  recovery is single-threaded, so redoUpdate can latch the page itself.
 */
 void redoUpdate(const LogEntry * e);
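With operations.h spelling out the preconditions -- doUpdate() and undoUpdate() require a writelock()'ed p->rwlatch, while redoUpdate() latches for itself -- a new physical operation can assert its own contract. An illustrative no-op (the assertlocked() guard is redundant with the one in doUpdate(), but documents the assumption at the op itself):

    /* Illustrative operation honoring the documented precondition. */
    static int op_noop(const LogEntry* e, Page* p) {
      assertlocked(p->rwlatch); /* caller must hold the latch */
      (void)e;                  /* no payload to apply */
      return 0;
    }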
diff --git a/test/stasis/check_bTree.c b/test/stasis/check_bTree.c
index 2acaad0..d40f30a 100644
--- a/test/stasis/check_bTree.c
+++ b/test/stasis/check_bTree.c
@@ -202,10 +202,10 @@ void testFunctions(){
   Page * p1 = loadPage(xid, pageid1);
 
   // calling functions
-
+  writelock(p1->rwlatch,0);
   initializeNewBTreeNode(xid, p1, rid1);
   insert(xid, p1, rid1, 3);
-
+  unlock(p1->rwlatch);
   // cleaning up
 
@@ -277,14 +277,15 @@ int SimpleExample(){
   rid2.slot = 0;
 
   // @todo This is a messy way to do this...
-  unlock(p1->rwlatch);
-
+
   stasis_record_write(xid, p1, 1, rid2, b1);
   stasis_record_read(xid, p1, rid2, b2);
 
   if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));}
 
   // initializeNewBTreeNode(p1, rid1);
 
+  unlock(p1->rwlatch);
+
   releasePage(p1);
   Tcommit(xid);
diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c
index ea45db0..427b2ef 100644
--- a/test/stasis/check_operations.c
+++ b/test/stasis/check_operations.c
@@ -101,20 +101,24 @@ START_TEST(operation_physical_do_undo) {
   DEBUG("B\n");
 
   p = loadPage(xid, rid.page);
-  // manually fill in UNDO field
-  //stasis_record_read(xid, p, rid, ((byte*)(setToTwo) + sizeofLogEntry(setToTwo) - rid.size));
-
+  writelock(p->rwlatch,0);
+  // manually fill in UNDO field
   stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
 
   setToTwo->LSN = 10;
 
   DEBUG("C\n");
   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   doUpdate(setToTwo, p);  /* PAGE LSN= 10, value = 2. */
+  unlock(p->rwlatch);
   releasePage(p);
 
   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
 
   assert(buf == 2);
@@ -125,29 +129,33 @@ START_TEST(operation_physical_do_undo) {
 
   p = loadPage(xid, rid.page);
   readlock(p->rwlatch,0);
   assert(10 == stasis_page_lsn_read(p)); // "page lsn not set correctly."
-  unlock(p->rwlatch);
 
   setToTwo->LSN = 5;
 
-  undoUpdate(setToTwo, 12); //, p, 8);  /* Should succeed: log LSN is lower than page LSN, but effective LSN is higher than page LSN */
+  undoUpdate(setToTwo, 12, p);  /* Should succeed: log LSN is lower than page LSN, but effective LSN is higher than page LSN */
+
+  unlock(p->rwlatch);
   releasePage(p);
 
   p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
 
   assert(buf == 1);
-
+
   DEBUG("E\n");
   redoUpdate(setToTwo);
-
   p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
   assert(buf == 1);
-
+
   /* Now, simulate scenarios from normal operation:
      do the operation, and update the LSN, (update happens)
      then undo, and update the LSN again.  (undo happens)
@@ -162,13 +170,14 @@ START_TEST(operation_physical_do_undo) {
   buf = 1;
 
   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
-  /* Trace of test:
 
+  /* Trace of test:
   PAGE LSN     LOG LSN     CLR LSN    TYPE        SUCCEED?
-
-  2            10          -          do write    YES (C)
+  2            10          -          do write    YES (C)
   10           5           8          undo write  YES (D)
   8            5           -          redo write  NO  (E)
   8            10          -          redo write  YES (F)
@@ -190,14 +199,15 @@ START_TEST(operation_physical_do_undo) {
   redoUpdate(setToTwo);
 
   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
 
   assert(buf == 2);
 
   DEBUG("G undo set to 2\n");
-  undoUpdate(setToTwo, 20); //, p, 20);  /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
+  undoUpdate(setToTwo, 20, p);  /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
 
   stasis_record_read(xid, p, rid, (byte*)&buf);
-
+  unlock(p->rwlatch);
   assert(buf == 1);
   releasePage(p);
 
@@ -205,7 +215,7 @@ START_TEST(operation_physical_do_undo) {
   redoUpdate(setToTwo);  /* Fails */
 
   p = loadPage(xid, rid.page);
-
+  writelock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
 
   assert(buf == 1);
@@ -214,12 +224,16 @@ START_TEST(operation_physical_do_undo) {
 
   DEBUG("I redo set to 2\n");
 
+  unlock(p->rwlatch);
   releasePage(p);
-  redoUpdate(setToTwo);        /* Succeeds */
-  p = loadPage(xid, rid.page);
-  stasis_record_read(xid, p, rid, (byte*)&buf);
+  redoUpdate(setToTwo);        /* Succeeds */
+
+  p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
+  stasis_record_read(xid, p, rid, (byte*)&buf);
 
   assert(buf == 2);
+  unlock(p->rwlatch);
   releasePage(p);
   Tdeinit();
 }
@@ -233,25 +247,25 @@ START_TEST(operation_prepare) {
 
   /* Check this sequence prepare, action, crash, recover, read, action, abort, read again. */
   Tinit();
-
   int loser = Tbegin();
   int prepared = Tbegin();
   int winner = Tbegin();
-
   recordid a = Talloc(winner, sizeof(int));
   recordid b = Talloc(winner, sizeof(int));
-
   int one =1;
   int two =2;
   int three=3;
 
   Tset(winner, a, &one);
   Tset(winner, b, &one);
-
   Tset(loser, a, &three);
   Tset(prepared, b, &three);
 
-  Tprepare(prepared); //, a);
+  Tprepare(prepared);
 
   Tset(prepared, b, &two);
 
@@ -416,7 +430,7 @@ START_TEST(operation_instant_set) {
   Tinit();
 
   int xid = Tbegin();
-  recordid rid = Talloc(xid, sizeof(int));  /** @todo probably need an immediate version of TpageAlloc... */
+  recordid rid = Talloc(xid, sizeof(int));  // @todo probably need an immediate version of TpageAlloc...
 
   int one = 1;
   int two = 2;
   int three = 3;
@@ -447,7 +461,7 @@ START_TEST(operation_instant_set) {
 
-} END_TEST 
+} END_TEST
 
 START_TEST(operation_set_range) {
   Tinit();
@@ -589,11 +603,18 @@ Suite * check_suite(void) {
   tcase_set_timeout(tc, 0); // disable timeouts
   /* Sub tests are added, one per line, here */
 
+  /*(void)operation_physical_do_undo;
+  (void)operation_nestedTopAction;
+  (void)operation_set_range;
+  (void)operation_prepare;
+  (void)operation_alloc_test;
+  (void)operation_array_list;*/
+
   tcase_add_test(tc, operation_physical_do_undo);
   tcase_add_test(tc, operation_nestedTopAction);
   tcase_add_test(tc, operation_instant_set);
   tcase_add_test(tc, operation_set_range);
-  if(loggerType != LOG_TO_MEMORY) { 
+  if(loggerType != LOG_TO_MEMORY) {
     tcase_add_test(tc, operation_prepare);
   }
   tcase_add_test(tc, operation_alloc_test);
diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c
index f057624..f4902e0 100644
--- a/test/stasis/check_page.c
+++ b/test/stasis/check_page.c
@@ -88,16 +88,15 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
     unlock(p->rwlatch);
     pthread_mutex_unlock(&lsn_mutex);
 
-    if(! first ) {
+    if(!first) {
       for(k = 0; k < 100; k++) {
-        stasis_record_read(1, p, rid[k], (byte*)&j);
-
-        assert((j + 1) == i + k);
         writelock(p->rwlatch,0);
+        stasis_record_read(1, p, rid[k], (byte*)&j);
+        assert((j + 1) == i + k);
         stasis_record_free(-1, p, rid[k]);
         stasis_page_lsn_write(-1, p, this_lsn);
         unlock(p->rwlatch);
-        sched_yield();
+        sched_yield();
       }
     }
 
@@ -175,9 +174,9 @@ static void* worker_thread(void * arg_ptr) {
     pthread_mutex_unlock(&lsn_mutex);
 
     if(! first ) {
+      writelock(p->rwlatch,0);
      stasis_record_read(1, p, rid, (byte*)&j);
      assert((j + 1) == i);
-      writelock(p->rwlatch,0);
      stasis_record_free(-1, p, rid);
      stasis_page_lsn_write(-1, p, this_lsn);
      unlock(p->rwlatch);
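The test changes all follow one mechanical rule: any direct stasis_record_* call now sits inside an explicit latch pair, because the record layer only asserts. Extracted as a pattern (val is an assumed local; use writelock() if the test mutates the page):

    /* Direct page access in a test, under the new rules. */
    Page *p = loadPage(xid, rid.page);
    readlock(p->rwlatch, 0);
    stasis_record_read(xid, p, rid, (byte*)&val);
    unlock(p->rwlatch);
    releasePage(p);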