From 126ae313925ead625e489a09971d8d5daa9a9533 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 21 Jul 2004 02:13:28 +0000 Subject: [PATCH] bufferMananger is *really* reentrant now! (I think) However, the same thread may not load a page more than once. So, LLADD has to be altered so that it never calls loadPage() more than once per operation (more efficient), and/or pin/unpin need to be implemented. An easy way to do pin/unpin is to have them pull things out of the cache replacement list, and then re-insert them. --- ChangeLog | 140 ++++++++++++++++++++++++++++++ lladd/common.h | 2 +- src/libdfa/rw.c | 1 + src/lladd/bufferManager.c | 18 ++-- src/lladd/common.c | 3 +- src/lladd/logger/logWriter.c | 4 +- src/lladd/operations.c | 9 +- src/lladd/page.c | 58 +++++++------ src/lladd/page.h | 6 +- src/lladd/pageCache.c | 120 ++++++++++++++----------- src/lladd/pageFile.c | 13 +-- src/lladd/recovery2.c | 4 +- src/lladd/transactional2.c | 11 ++- test/lladd/check_bufferManager.c | 60 ++++++++----- test/lladd/check_operations.c | 14 +-- test/lladd/check_transactional2.c | 12 +-- 16 files changed, 333 insertions(+), 142 deletions(-) diff --git a/ChangeLog b/ChangeLog index e69de29..38536a0 100644 --- a/ChangeLog +++ b/ChangeLog @@ -0,0 +1,140 @@ +2004-07-20 sears + + * bufferManager.c, common.c, operations/alloc.c, page.c, page.h, pageCache.c, pageFile.c, transactional2.c: + Continuing work on multi-threading. r/w access to buffer manager getting close, but still buggy. + + * logger/logWriter.c, pageFile.c, pageFile.h, bufferManager.c, common.c, latches.h, page.c, page.h, pageCache.c: + pageCache.c is now re-entrant. + +2004-07-15 sears + + * Makefile.am, blobManager.c, bufferManager.c, logger/logWriter.c, page.h, pageCache.c, pageFile.c, pageFile.h: + pageCache is now re-entrant, in theory. + +2004-07-14 sears + + * blobManager.c, blobManager.h, bufferManager.c, logger/logHandle.c, logger/logHandle.h, logger/logWriter.c, logger/logWriter.h, logger/logger2.c, operations.c, operations/alloc.c, operations/prepare.c, page.c, page.h, pageCache.c, recovery2.c, transactional2.c: + Moved page.h and some of the logging headers out of the public API. + + * blobManager.c, bufferManager.c, latches.h, logger/logEntry.c, logger/logWriter.c, logger/logger2.c, operations.c, operations/alloc.c, page.c, pageCache.c, recovery2.c, transactional2.c: + Cleaning up bufferManager / page for locking. Want to limit access to the Page struct. + +2004-07-13 sears + + * page.c, pageCache.c: + page.c is re-entrant (mostly), and now reuses DeRalloced space properly. (For now, BufferManager still is not re-entrant, and also prevents space from being reused.) + +2004-07-09 sears + + * bufferManager.c, page.c: Made pageWriteLSN static. + +2004-07-06 sears + + * bufferManager.c, page.c: More documentation fixes. + + * blobManager.h: Fixed groupings for LLADD API docs. + + * operations/alloc.c, operations/lladdhash.c, logger/logWriter.c, logger/logger2.c, logger/logEntry.c, operations.c, page.c, pageCache.c, recovery2.c, stats.c, transactional2.c, Makefile.am, blobManager.c, blobManager.h, bufferManager.c, common.c, latches.h, linkedlist.c: + Ported LLADD to Fedora, cleaned up autoconf setup, and numerouse #includes that were problematic. + +2004-07-04 sears + + * Makefile.am, blobManager.c, blobManager.h, bufferManager.c, common.c, logger/logHandle.c, logger/logWriter.c, logger/logger2.c, operations.c, operations/alloc.c, page.c, pageCache.c, recovery2.c, stats.c, transactional2.c: + Documentation update, added latch profiling tools. + +2004-07-01 sears + + * logger/logHandle.c, logger/logWriter.c, operations.c: + Log truncation. (But no checkpoints, so it doesn't get called... it does pass testing though. :) + +2004-06-30 sears + + * Makefile.am, blobManager.c, blobManager.h, bufferManager.c, logger/logWriter.c, operations/alloc.c, operations/prepare.c, page.c, pageCache.c, transactional2.c: + Logwriter can now handler partial log entries correctly (it ignores them) + Buffer Mananger no longer steals pages pre-maturely + Alloc is an operation, and correct. + Caching broken out into a new module + Wrote a utility to translate a log file into human-readable ascii. + +2004-06-28 sears + + * logger/logstreamer.h, operations/alloc.c, operations/prepare.c, blobManager.c, blobManager.h, bufferManager.c, logger/logEntry.c, logger/logWriter.c, logger/logger.c, logger/logger2.c, logger/logparser.c, logger/logparser.h, logger/logstreamer.c, page.c, recovery.c, recovery.h, recovery2.c, transactional.c, transactional2.c: + Cleaned out old cruft, such as commented out code, dead data structures, and dead files. sloccount went from $75,000 to $50,000 on src/lladd. ;) + + * blobManager.c, blobManager.h, bufferManager.c, logger/logger2.c, operations.c, operations/alloc.c, operations/decrement.c, operations/increment.c, operations/set.c, page.c, recovery2.c, transactional2.c: + Bugfixes ; blobs pass regression. Next stop: Delete old cruft. + +2004-06-26 sears + + * blobManager.c: Wrote blobmanager, didn't compile it yet. + +2004-06-25 sears + + * blobManager.c, blobManager.h: blob manager commit. + + * Makefile.am, bufferManager.c, page.c: + Preliminary implementation of blobManager + +2004-06-24 sears + + * logger/.deps/logger.Po, logger/.deps/logparser.Po, logger/.deps/logstreamer.Po: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * logger/.deps/logger.Po, logger/.deps/logparser.Po, logger/.deps/logstreamer.Po: + New file. + + * logger/logEntry.c, logger/logWriter.c, logger/logger.c, logger/logparser.c, logger/logparser.h: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * logger/logEntry.c, logger/logWriter.c, logger/logger.c, logger/logparser.c, logger/logparser.h: + New file. + + * logger/Makefile.am-old, logger/logHandle.c, logger/logger2.c, logger/logstreamer.c, logger/logstreamer.h, operations/.deps/increment.Po, operations/.deps/set.Po: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * logger/Makefile.am-old, logger/logHandle.c, logger/logger2.c, logger/logstreamer.c, logger/logstreamer.h, operations/.deps/increment.Po, operations/.deps/set.Po: + New file. + + * operations/.deps/decrement.Po, operations/.deps/lladdhash.Po, operations/.deps/prepare.Po, operations/alloc.c, operations/decrement.c, operations/increment.c, operations/lladdhash.c, operations/prepare.c: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * operations/.deps/decrement.Po, operations/.deps/lladdhash.Po, operations/.deps/prepare.Po, operations/alloc.c, operations/decrement.c, operations/increment.c, operations/lladdhash.c, operations/prepare.c: + New file. + + * linkedlist.c, linkedlist.h, operations.c, operations/Makefile.am-old, operations/set.c, transactional.c: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * linkedlist.c, linkedlist.h, operations.c, operations/Makefile.am-old, operations/set.c, transactional.c: + New file. + + * page.c, transactional2.c: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * page.c, transactional2.c: New file. + + * Makefile.am, bufferManager.c, recovery.c, recovery.h: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * Makefile.am, bufferManager.c, recovery.c, recovery.h: New file. + + * 1, recovery2.c: + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. + + * 1, recovery2.c: New file. + diff --git a/lladd/common.h b/lladd/common.h index 20429cc..0323fc3 100644 --- a/lladd/common.h +++ b/lladd/common.h @@ -99,7 +99,7 @@ extern int errno; #define lsn_t long -/*#define DEBUGGING */ +/*#define DEBUGGING */ #define PROFILE_LATCHES #ifdef DEBUGGING diff --git a/src/libdfa/rw.c b/src/libdfa/rw.c index 5274340..9487eee 100644 --- a/src/libdfa/rw.c +++ b/src/libdfa/rw.c @@ -83,6 +83,7 @@ void downgradelock(rwl * lock) { assert(lock->writers); lock->writers--; lock->readers++; + pthread_cond_signal (lock->writeOK); pthread_cond_broadcast(lock->readOK); pthread_mutex_unlock(lock->mut); } diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 50c3ffc..0bdb533 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -135,13 +135,17 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) { assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT); - pthread_mutex_lock(&lastFreepage_mutex); - - while(freespace(p = loadPage(lastFreepage)) < size ) { unlock(p->loadlatch); lastFreepage++; } + + pthread_mutex_lock(&lastFreepage_mutex); + while(freespace(p = loadPage(lastFreepage)) < size ) { + unlock(p->loadlatch); + lastFreepage++; + } ret = pageRalloc(p, size); unlock(p->loadlatch); + pthread_mutex_unlock(&lastFreepage_mutex); /* DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); */ @@ -235,7 +239,7 @@ void setSlotType(int pageid, int slot, int type) { @see finalize, removePendingEvent */ -void addPendingEvent(int pageid){ +/*void addPendingEvent(int pageid){ Page * p; @@ -266,7 +270,7 @@ void addPendingEvent(int pageid){ unlock(p->loadlatch); -} + }*/ /** @@ -281,7 +285,7 @@ void addPendingEvent(int pageid){ @todo as implemented, loadPage() ... doOperation is not atomic! */ -void removePendingEvent(int pageid) { +/*void removePendingEvent(int pageid) { Page * p; @@ -303,6 +307,6 @@ void removePendingEvent(int pageid) { unlock(p->loadlatch); -} + }*/ diff --git a/src/lladd/common.c b/src/lladd/common.c index 50434eb..5e992a9 100644 --- a/src/lladd/common.c +++ b/src/lladd/common.c @@ -46,7 +46,8 @@ int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int li pthread_yield(); if(blockCount >= 10000 && ! (blockCount % 10000)) { - DEBUG("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at); + printf("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at); + fflush(NULL); } } diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index 8fba7da..98051e0 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -201,10 +201,10 @@ int writeLogEntry(LogEntry * e) { const long size = sizeofLogEntry(e); if(e->type == UPDATELOG) { - addPendingEvent(e->contents.update.rid.page); + /* addPendingEvent(e->contents.update.rid.page); */ } if(e->type == CLRLOG) { - addPendingEvent(e->contents.clr.rid.page); + /* addPendingEvent(e->contents.clr.rid.page); */ } if(e->xid == -1) { /* Don't write log entries for recovery xacts. */ diff --git a/src/lladd/operations.c b/src/lladd/operations.c index facd1b3..a15bc53 100644 --- a/src/lladd/operations.c +++ b/src/lladd/operations.c @@ -51,11 +51,12 @@ Operation operationsTable[MAX_OPERATIONS]; void doUpdate(const LogEntry * e) { + DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN); operationsTable[e->contents.update.funcID].run(e->xid, e->LSN, e->contents.update.rid, getUpdateArgs(e)); - removePendingEvent(e->contents.update.rid.page); + /* removePendingEvent(e->contents.update.rid.page); */ } @@ -70,7 +71,7 @@ void redoUpdate(const LogEntry * e) { doUpdate(e); } else { DEBUG("OPERATION Skipping redo, %ld <= %ld {%d %d %ld}\n", e->LSN, pageLSN, rid.page, rid.slot, rid.size); - removePendingEvent(e->contents.update.rid.page); + /* removePendingEvent(e->contents.update.rid.page); */ } } else if(e->type == CLRLOG) { LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN); @@ -85,7 +86,7 @@ void redoUpdate(const LogEntry * e) { undoUpdate(f, e->LSN); } else { DEBUG("OPERATION Skiping undo for clr, %ld {%d %d %ld}\n", f->LSN, rid.page, rid.slot, rid.size); - removePendingEvent(e->contents.update.rid.page); + /* removePendingEvent(e->contents.update.rid.page); */ } } else { assert(0); @@ -121,7 +122,7 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn) { DEBUG("OPERATION Skipping undo, %ld {%d %d %ld}\n", e->LSN, rid.page, rid.slot, rid.size); } - removePendingEvent(e->contents.update.rid.page); + /* removePendingEvent(e->contents.update.rid.page); */ /* printf("Undo done."); fflush(NULL); */ diff --git a/src/lladd/page.c b/src/lladd/page.c index c3697a9..ddd97ab 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -96,7 +96,7 @@ terms specified in this license. #include #include - +#include /* TODO: Combine with buffer size... */ static int nextPage = 0; @@ -144,8 +144,8 @@ static pthread_mutex_t pageAllocMutex; Page pool[MAX_BUFFER_SIZE+1]; -int isValidSlot(byte *memAddr, int slot); -void invalidateSlot(byte *memAddr, int slot); +static int isValidSlot(byte *memAddr, int slot); +static void invalidateSlot(byte *memAddr, int slot); void pageDeRalloc(Page * page, recordid rid); /** @@ -248,9 +248,9 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) { lsn_t pageReadLSN(const Page * page) { lsn_t ret; - /* readlock(page->rwlatch, 259); */ + readlock(page->rwlatch, 259); ret = *(long *)(page->memAddr + START_OF_LSN); - /* readunlock(page->rwlatch); */ + readunlock(page->rwlatch); return ret; } @@ -263,8 +263,9 @@ lsn_t pageReadLSN(const Page * page) { * @param page You must have a writelock on page before calling this function. */ static void pageWriteLSN(Page * page) { - + /* unlocked since we're only called by a function that holds the writelock. */ *(long *)(page->memAddr + START_OF_LSN) = page->LSN; + } static int unlocked_freespace(Page * page); @@ -420,19 +421,19 @@ recordid pageSlotRalloc(Page * page, lsn_t lsn, recordid rid) { return rid; } -int isValidSlot(byte *memAddr, int slot) { +static int isValidSlot(byte *memAddr, int slot) { return (getSlotOffset(memAddr, slot) != INVALID_SLOT) ? 1 : 0; } -void invalidateSlot(byte *memAddr, int slot) { - setSlotOffset(memAddr, slot, INVALID_SLOT); +static void invalidateSlot(byte *memAddr, int slot) { + setSlotOffset(memAddr, slot, INVALID_SLOT); } void pageDeRalloc(Page * page, recordid rid) { - writelock(page->rwlatch, 416); + /* Don't need any locking, since we don't support concurrent access to the same slot.. */ invalidateSlot(page->memAddr, rid.slot); - writeunlock(page->rwlatch); + } /** @@ -524,10 +525,10 @@ static void setSlotOffset(byte *memAddr, int slot, int offset) { static void setSlotLength(byte *memAddr, int slot, int length) { setSecondHalfOfWord((int*)(unsigned int*)slotMemAddr(memAddr, slot), length); } - -int isBlobSlot(byte *pageMemAddr, int slot) { +/* +static int isBlobSlot(byte *pageMemAddr, int slot) { return BLOB_SLOT == getSlotLength(pageMemAddr, slot); -} + }*/ /* This needs should trust the rid (since the caller needs to @@ -551,7 +552,7 @@ void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) { void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte *data) { byte *rec; - readlock(page->rwlatch, 529); + writelock(page->rwlatch, 529); assert(rid.size < PAGE_SIZE); rec = page->memAddr + getSlotOffset(page->memAddr, rid.slot); @@ -563,7 +564,7 @@ void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte * page->LSN = lsn; pageWriteLSN(page); - readunlock(page->rwlatch); + writeunlock(page->rwlatch); } @@ -571,8 +572,9 @@ void pageReallocNoLock(Page *p, int id) { p->id = id; p->LSN = 0; p->dirty = 0; - p->pending = 0; - p->waiting = 0; + /* assert(p->pending == 0); + assert(p->waiting == 1); + p->waiting = 0;*/ } void pageRealloc(Page *p, int id) { @@ -601,7 +603,7 @@ Page *pageAlloc(int id) { page->loadlatch = initlock(); /* pthread_mutex_init(&page->pending_mutex, NULL);*/ - pthread_cond_init(&page->noMorePending, NULL); + /* pthread_cond_init(&page->noMorePending, NULL); */ page->memAddr = malloc(PAGE_SIZE); @@ -613,9 +615,9 @@ Page *pageAlloc(int id) { pthread_mutex_unlock(&pageAllocMutex); - - page->pending = 0; - page->waiting = 0; + /**@todo if re-implement pending event thing, these lines need to be protected by a lock!? */ + /* page->pending = 0; + page->waiting = 1; */ return page; } @@ -678,16 +680,20 @@ int pageTest() { return 0; } +/** @todo: Should the caller need to obtain the writelock when calling pageSetSlotType? */ void pageSetSlotType(Page * p, int slot, int type) { assert(type > PAGE_SIZE); - - /* setSlotLength does the locking for us. */ + writelock(p->rwlatch, 686); setSlotLength(p->memAddr, slot, type); - + unlock(p->rwlatch); } int pageGetSlotType(Page * p, int slot, int type) { - int ret = getSlotLength(p->memAddr, slot); + int ret; + readlock(p->rwlatch, 693); + ret = getSlotLength(p->memAddr, slot); + unlock(p->rwlatch); + /* getSlotType does the locking for us. */ return ret > PAGE_SIZE ? ret : NORMAL_SLOT; } diff --git a/src/lladd/page.h b/src/lladd/page.h index a5c5123..04b3f02 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -125,9 +125,9 @@ typedef struct Page_s { this properly, and there are no read-only functions for the pending field. */ - pthread_cond_t noMorePending; /* pthread_cond_t */ + /* pthread_cond_t noMorePending; */ /* pthread_cond_t */ - int waiting; + /* int waiting; */ /** In the multi-threaded case, before we steal a page, we need to @@ -149,7 +149,7 @@ typedef struct Page_s { carefully. */ - int pending; + /* int pending; */ } Page; extern pthread_cond_t addPendingOK; diff --git a/src/lladd/pageCache.c b/src/lladd/pageCache.c index 7766cb4..f6aa907 100644 --- a/src/lladd/pageCache.c +++ b/src/lladd/pageCache.c @@ -145,45 +145,47 @@ static void qRemove(Page *ret) { assert(ret != repTail); assert(ret != repHead); } - +/* static Page *getFreePage() { Page *ret; - if( state == FULL ) { /* kick */ + if( state == FULL ) { / * kick * / ret = repTail; - /** Make sure no one else will try to reuse this page. */ + / ** Make sure no one else will try to reuse this page. * / cacheRemovePage(ret); - /** Temporarily drop the mutex while we wait for outstanding - operations on the page to complete. */ + / ** Temporarily drop the mutex while we wait for outstanding + operations on the page to complete. * / - pthread_mutex_unlock(&loadPagePtr_mutex); + pthread_mutex_unlock(&loadPagePtr_mutex); - /** @ todo getFreePage (finalize) needs to yield the getPage mutex, + / ** @ todo getFreePage (finalize) needs to yield the getPage mutex, but also needs to remove a page from the kick list before doing so. If there is a cache hit on the page that's been removed from the kick list, then the cache eviction policy - code needs o know this, and ignore the hit. -- Done. */ + code needs o know this, and ignore the hit. -- Done. * / - finalize(ret); /* This cannot deadlock because each thread can + / * finalize(ret); * / + / * ret->waiting++; * / / * @todo remove waiting / pending fields.. * / + / * This cannot deadlock because each thread can only have outstanding pending events on the page that it's accessing, but they can only hold that lock if the page is in cache. If the page is in cache, then the thread surely isn't here! Therefore any threads that finalize will block on can not possibly be blocking on this - thread's latches. */ + thread's latches. * / - /* writelock(ret->loadlatch, 181); */ /* Don't need the lock here--No one else has a pointer to this page! */ + / * writelock(ret->loadlatch, 181); * / / * Don't need the lock here--No one else has a pointer to this page! * / - pthread_mutex_lock(&loadPagePtr_mutex); + pthread_mutex_lock(&loadPagePtr_mutex); - /* Now that finalize returned, pull ret out of the cache's lookup table. */ + / * Now that finalize returned, pull ret out of the cache's lookup table. * / - /* pblHtRemove(activePages, &ret->id, sizeof(int)); */ + / * pblHtRemove(activePages, &ret->id, sizeof(int)); * / @@ -194,81 +196,93 @@ static Page *getFreePage() { ret->id = -1; ret->inCache = 0; - /* writelock(ret->loadlatch, 166); */ + / * writelock(ret->loadlatch, 166); * / } return ret; } - +*/ #define RO 0 #define RW 1 Page * getPage(int pageid, int locktype) { Page * ret; - - assert(locktype == RO); - + int spin = 0; pthread_mutex_lock(&loadPagePtr_mutex); - ret = pblHtLookup(activePages, &pageid, sizeof(int)); - /* Unfortunately, this is a heuristic, as a race condition exists. - (Until we obtain a readlock on ret, we have no way of knowing if - we've gotten the correct page.) */ + if(ret) { + readlock(ret->loadlatch, 217); + } + + while (ret && ret->id != pageid) { + unlock(ret->loadlatch); + ret = pblHtLookup(activePages, &pageid, sizeof(int)); + if(ret) { + readlock(ret->loadlatch, 217); + } + spin++; + if(spin > 10000) { + printf("GetPage stuck!"); + } + } if(ret) { cacheHitOnPage(ret); assert(ret->id == -1 || ret->id == pageid); - } - - pthread_mutex_unlock(&loadPagePtr_mutex); - - if(!ret) { + } else { ret = dummy_page; + readlock(ret->loadlatch, 232); } - - readlock(ret->loadlatch, 217); - - while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */ + + if(ret->id != pageid) { unlock(ret->loadlatch); - pthread_mutex_lock(&loadPagePtr_mutex); + if( state == FULL ) { - ret = getFreePage(); + ret = repTail; + cacheRemovePage(ret); + + } else { - pblHtRemove(activePages, &(ret->id), sizeof(int)); + ret = pageAlloc(-1); + ret->id = -1; + ret->inCache = 0; - pthread_mutex_unlock(&loadPagePtr_mutex); + } - writelock(ret->loadlatch, 231); - - if(ret->id != -1) { + writelock(ret->loadlatch, 217); + + cacheInsertPage(ret); + pblHtInsert(activePages, &pageid, sizeof(int), ret); + pblHtRemove(activePages, &(ret->id), sizeof(int)); + + pthread_mutex_unlock(&loadPagePtr_mutex); + + /*new*/ /*writelock(ret->loadlatch, 217);*/ + + if(ret->id != -1) { assert(ret != dummy_page); pageWrite(ret); } - pageRealloc(ret, pageid); /* Do we need any special lock here? */ + pageRealloc(ret, pageid); pageRead(ret); + /*new*/ + /* pthread_mutex_lock(&loadPagePtr_mutex); + pblHtRemove(activePages, &(ret->id), sizeof(int)); + pthread_mutex_unlock(&loadPagePtr_mutex); */ + /*new*/ + downgradelock(ret->loadlatch); - unlock(ret->loadlatch); - - pthread_mutex_lock(&loadPagePtr_mutex); - - /* By inserting ret into the cache, we give up the implicit write lock. */ - - cacheInsertPage(ret); - - pblHtInsert(activePages, &pageid, sizeof(int), ret); + } else { pthread_mutex_unlock(&loadPagePtr_mutex); - - readlock(ret->loadlatch, 217); - + } - assert(ret->id == pageid); return ret; diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index 825f6be..1447ccc 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -24,7 +24,7 @@ extern pthread_mutex_t add_pending_mutex; */ -void finalize(Page * p) { +/*void finalize(Page * p) { pthread_mutex_lock(&(add_pending_mutex)); p->waiting++; @@ -32,12 +32,15 @@ void finalize(Page * p) { DEBUG("A"); pthread_cond_wait(&(p->noMorePending), &(add_pending_mutex)); } - - pthread_mutex_unlock(&(add_pending_mutex)); + + assert(p->pending == 0); + assert(p->waiting == 1); pthread_cond_broadcast(&addPendingOK); + pthread_mutex_unlock(&(add_pending_mutex)); + return; -} + }*/ @@ -95,7 +98,7 @@ void pageWrite(Page * ret) { long pageoffset = ret->id * PAGE_SIZE; long offset ; - assert(ret->pending == 0); + /* assert(ret->pending == 0); */ if(flushedLSN() < pageReadLSN(ret)) { DEBUG("pageWrite is calling syncLog()!\n"); diff --git a/src/lladd/recovery2.c b/src/lladd/recovery2.c index e9688ed..6931457 100644 --- a/src/lladd/recovery2.c +++ b/src/lladd/recovery2.c @@ -155,9 +155,9 @@ static void Redo() { don't need to check to see if the page is newer than this log entry. */ if(e->type == UPDATELOG) { - addPendingEvent(e->contents.update.rid.page); + /* addPendingEvent(e->contents.update.rid.page); */ } else { - addPendingEvent(e->contents.clr.rid.page); + /* addPendingEvent(e->contents.clr.rid.page); */ } redoUpdate(e); } diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 3932b83..50c5fb0 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -11,6 +11,8 @@ #include #include +#include + TransactionLog XactionTable[MAX_TRANSACTIONS]; int numActiveXactions = 0; int xidCount = 0; @@ -47,7 +49,6 @@ int Tinit() { setupOperationsTable(); - /* pageInit(); */ bufInit(); openLogWriter(); @@ -92,20 +93,24 @@ int Tbegin() { void Tupdate(int xid, recordid rid, const void *dat, int op) { LogEntry * e; - + Page * p; #ifdef DEBUGGING pthread_mutex_lock(&transactional_2_mutex); assert(numActiveXactions <= MAX_TRANSACTIONS); pthread_mutex_unlock(&transactional_2_mutex); #endif + p = loadPage(rid.page); + e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat); assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); - doUpdate(e); + doUpdate(e, p); + unlock(p->loadlatch); + } /* @todo what about locking? */ diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c index 9d8fe9c..e691e6a 100644 --- a/test/lladd/check_bufferManager.c +++ b/test/lladd/check_bufferManager.c @@ -34,13 +34,14 @@ void initializePages() { rid.slot = 0; rid.size = sizeof(int); p = loadPage(rid.page); - assert(p->id != -1); + assert(p->id != -1); pageSlotRalloc(p, 0, rid); - /* addPendingEvent(rid.page); */ - writeRecord(1, 1, rid, &i); - /* removePendingEvent(rid.page); */ - assert(p->pending == 0); unlock(p->loadlatch); + /* addPendingEvent(rid.page); */ + writeRecord(1, 1, rid, &i); + /* removePendingEvent(rid.page); */ + /* assert(p->pending == 0); */ + } printf("Initialization complete.\n"); fflush(NULL); @@ -55,7 +56,7 @@ void * workerThread(void * p) { int j; int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0)); - + Page * p; if(! (i % 500) ) { printf("%d", i / 500); fflush(NULL); } @@ -64,39 +65,53 @@ void * workerThread(void * p) { rid.slot = 0; rid.size = sizeof(int); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ readRecord(1, rid, &j); assert(rid.page == k); - removePendingEvent(rid.page); + /* removePendingEvent(rid.page); */ assert(k == j); + } return NULL; } -void * workerThreadWriting(void * p) { +void * workerThreadWriting(void * q) { - int offset = *(int*)p; + int offset = *(int*)q; recordid rids[RECORDS_PER_THREAD]; for(int i = 0 ; i < RECORDS_PER_THREAD; i++) { + /* addPendingEvent(rids[i].page); */ rids[i] = ralloc(1, sizeof(int)); + /* removePendingEvent(rids[i].page); */ + + /* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */ + fflush(NULL); + + if(! (i % 1000) ) { + printf("A%d", i / 1000); fflush(NULL); + } + + + + sched_yield(); } for(int i = 0; i < RECORDS_PER_THREAD; i++) { - int val = i + offset; + int val = (i * 10000) + offset; int oldpage = rids[i].page; - addPendingEvent(rids[i].page); + writeRecord(1, 0, rids[i], &val); - assert(oldpage == rids[i].page); - removePendingEvent(rids[i].page); if(! (i % 1000) ) { printf("W%d", i / 1000); fflush(NULL); } + + sched_yield(); } for(int i = 0; i < RECORDS_PER_THREAD; i++) { int val; + Page * p; - addPendingEvent(rids[i].page); readRecord(1, rids[i], &val); if(! (i % 1000) ) { @@ -104,10 +119,9 @@ void * workerThreadWriting(void * p) { } - assert(val == i+offset); - - removePendingEvent(rids[i].page); + assert(val == (i * 10000)+offset); + sched_yield(); } return NULL; @@ -177,7 +191,9 @@ START_TEST(pageThreadedWritersTest) { Tinit(); for(i = 0; i < RECORD_THREAD_COUNT; i++) { - pthread_create(&workers[i], NULL, workerThreadWriting, &i); + int * j = malloc(sizeof(int)); + *j = i; + pthread_create(&workers[i], NULL, workerThreadWriting, j); } for(i = 0; i < RECORD_THREAD_COUNT; i++) { pthread_join(workers[i], NULL); @@ -193,9 +209,9 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ - tcase_add_test(tc, pageSingleThreadTest); - tcase_add_test(tc, pageLoadTest); - tcase_add_test(tc, pageSingleThreadWriterTest); + /* tcase_add_test(tc, pageSingleThreadTest); */ + tcase_add_test(tc, pageLoadTest); + /* tcase_add_test(tc, pageSingleThreadWriterTest); */ tcase_add_test(tc, pageThreadedWritersTest); /* --------------------------------------------- */ diff --git a/test/lladd/check_operations.c b/test/lladd/check_operations.c index 336223d..0dfe909 100644 --- a/test/lladd/check_operations.c +++ b/test/lladd/check_operations.c @@ -82,7 +82,7 @@ START_TEST(operation_physical_do_undo) { setToTwo->LSN = 10; DEBUG("C\n"); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ doUpdate(setToTwo); /* PAGE LSN= 10, value = 2. */ readRecord(xid, rid, &buf); @@ -97,7 +97,7 @@ START_TEST(operation_physical_do_undo) { setToTwo->LSN = 5; - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ undoUpdate(setToTwo, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */ readRecord(xid, rid, &buf); @@ -105,7 +105,7 @@ START_TEST(operation_physical_do_undo) { fail_unless(buf == 1, NULL); DEBUG("E\n"); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ redoUpdate(setToTwo); @@ -146,7 +146,7 @@ START_TEST(operation_physical_do_undo) { setToTwo->LSN = 10; DEBUG("F\n"); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ redoUpdate(setToTwo); /* writeLSN(setToTwo->LSN, rid.page); */ @@ -155,7 +155,7 @@ START_TEST(operation_physical_do_undo) { fail_unless(buf == 2, NULL); DEBUG("G undo set to 2\n"); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ undoUpdate(setToTwo, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/ readRecord(xid, rid, &buf); @@ -163,7 +163,7 @@ START_TEST(operation_physical_do_undo) { fail_unless(buf == 1, NULL); DEBUG("H don't redo set to 2\n"); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ redoUpdate(setToTwo); /* Fails */ readRecord(xid, rid, &buf); @@ -174,7 +174,7 @@ START_TEST(operation_physical_do_undo) { /* writeLSN(0,rid.page); */ DEBUG("I redo set to 2\n"); - addPendingEvent(rid.page); + /* addPendingEvent(rid.page); */ redoUpdate(setToTwo); /* Succeeds */ readRecord(xid, rid, &buf); diff --git a/test/lladd/check_transactional2.c b/test/lladd/check_transactional2.c index 67f029b..b2bd53b 100644 --- a/test/lladd/check_transactional2.c +++ b/test/lladd/check_transactional2.c @@ -57,8 +57,8 @@ void * writingWorkerThread ( void * v ) { int xid = Tbegin(); for(int i = 0; i < RECORDS_PER_THREAD; i++) { rids[i] = Talloc(xid, sizeof(int)); - if(! (i %100)) { - printf("A%d", i/100);fflush(NULL); + if(! (i %1000)) { + printf("A%d", i/1000);fflush(NULL); } } @@ -66,8 +66,8 @@ void * writingWorkerThread ( void * v ) { for(int i = 0; i < RECORDS_PER_THREAD; i++) { int tmp = i + offset; Tset(xid, rids[i], &tmp); - if(! (i %100)) { - printf("W%d", i/100); fflush(NULL); + if(! (i %1000)) { + printf("W%d", i/1000); fflush(NULL); } @@ -77,8 +77,8 @@ void * writingWorkerThread ( void * v ) { int j; Tread(xid, rids[i], &j); assert(i + offset == j); - if(! (i %100)) { - printf("R%d", i/100);fflush(NULL); + if(! (i %1000)) { + printf("R%d", i/1000);fflush(NULL); } }