From 490dd86c09916dda8982f83a97f4e5461343d4db Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Tue, 20 Jul 2004 00:15:17 +0000 Subject: [PATCH] pageCache.c is now re-entrant. --- libdfa/rw.h | 4 + lladd/common.h | 5 +- src/libdfa/rw.c | 95 ++++-- src/lladd/bufferManager.c | 47 ++- src/lladd/common.c | 45 ++- src/lladd/latches.h | 16 + src/lladd/logger/logWriter.c | 1 - src/lladd/page.c | 26 +- src/lladd/page.h | 15 +- src/lladd/pageCache.c | 490 ++++++++++++++++++++++++++++--- src/lladd/pageFile.c | 59 ++-- src/lladd/pageFile.h | 3 + src/pbl/pblhash.c | 13 +- test/lladd/Makefile.am | 4 +- test/lladd/check_bufferManager.c | 78 ++++- test/lladd/check_page.c | 5 + 16 files changed, 765 insertions(+), 141 deletions(-) diff --git a/libdfa/rw.h b/libdfa/rw.h index f350788..498c5a4 100644 --- a/libdfa/rw.h +++ b/libdfa/rw.h @@ -34,7 +34,11 @@ typedef struct { rwl *initlock (void); void readlock (rwl *lock, int d); void writelock (rwl *lock, int d); +void downgradelock(rwl * lock); +void unlock(rwl * lock); +/** @deprecated in favor of unlock() */ void readunlock (rwl *lock); +/** @deprecated in favor of unlock() */ void writeunlock (rwl *lock); void deletelock (rwl *lock); /* diff --git a/lladd/common.h b/lladd/common.h index c3f4c2f..65b7d3b 100644 --- a/lladd/common.h +++ b/lladd/common.h @@ -98,8 +98,9 @@ extern int errno; #define byte unsigned char #define lsn_t long -#define DEBUGGING -/*#define PROFILE_LATCHES*/ + +/*#define DEBUGGING + #define PROFILE_LATCHES*/ #ifdef DEBUGGING /** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */ diff --git a/src/libdfa/rw.c b/src/libdfa/rw.c index ed19ca4..5274340 100644 --- a/src/libdfa/rw.c +++ b/src/libdfa/rw.c @@ -1,4 +1,9 @@ #include +#include + +#undef pthread_cond_wait +#undef pthread_cond_timedwait + rwl *initlock (void) { @@ -33,8 +38,8 @@ rwl *initlock (void) void readlock (rwl *lock, int d) { - /* printf("reader %d\n", d); */ - fflush(NULL); + /* printf("reader %d\n", d); + fflush(NULL); */ pthread_mutex_lock (lock->mut); if (lock->writers || lock->waiting) { @@ -46,16 +51,16 @@ void readlock (rwl *lock, int d) } lock->readers++; pthread_mutex_unlock (lock->mut); - /* printf("reader %d done\n", d); */ - fflush(NULL); + /* printf("reader %d done\n", d); + fflush(NULL); */ return; } void writelock (rwl *lock, int d) { - /* printf("\nwritelock %d\n", d); */ - fflush(NULL); + /* printf("\nwritelock %d\n", d); + fflush(NULL); */ pthread_mutex_lock (lock->mut); lock->waiting++; while (lock->readers || lock->writers) { @@ -67,44 +72,74 @@ void writelock (rwl *lock, int d) lock->writers++; pthread_mutex_unlock (lock->mut); - /* printf("\nwritelock %d done\n", d); */ - fflush(NULL); + /* printf("\nwritelock %d done\n", d); + fflush(NULL); */ return; } +void downgradelock(rwl * lock) { + pthread_mutex_lock(lock->mut); + assert(lock->writers); + lock->writers--; + lock->readers++; + pthread_cond_broadcast(lock->readOK); + pthread_mutex_unlock(lock->mut); +} + +void unlock(rwl * lock) { + pthread_mutex_lock (lock->mut); + if(lock->readers) { + lock->readers--; + pthread_cond_signal (lock->writeOK); + } else { + assert (lock->writers); + lock->writers--; + /* Need this as well (in case there's another writer, which is blocking the all of the readers. */ + pthread_cond_signal (lock->writeOK); + pthread_cond_broadcast (lock->readOK); + } + pthread_mutex_unlock (lock->mut); +} + /*void readunlock(rwl *lock) { writeunlock(lock); }*/ - -void readunlock (rwl *lock) -{ +void readunlock(rwl * lock) { + unlock(lock); +} +void writeunlock(rwl * lock) { + unlock(lock); +} +/* + void readunlock (rwl *lock) + { pthread_mutex_lock (lock->mut); lock->readers--; pthread_cond_signal (lock->writeOK); - /* Don't need to broadcast, since only one writer can run at - once. */ - - /* pthread_cond_broadcast (lock->writeOK); */ + / * Don't need to broadcast, since only one writer can run at + once. * / + + / * pthread_cond_broadcast (lock->writeOK); * / pthread_mutex_unlock (lock->mut); - /* printf("readunlock done\n"); */ -} - -void writeunlock (rwl *lock) -{ - /* printf("writeunlock done\n"); */ - fflush(NULL); - - pthread_mutex_lock (lock->mut); - lock->writers--; - /* Need this as well (in case there's another writer, which is blocking the all of the readers. */ - pthread_cond_signal (lock->writeOK); - pthread_cond_broadcast (lock->readOK); - pthread_mutex_unlock (lock->mut); -} + / * printf("readunlock done\n"); * / + } + void writeunlock (rwl *lock) + { + / * printf("writeunlock done\n"); + fflush(NULL); * / + + pthread_mutex_lock (lock->mut); + lock->writers--; + / * Need this as well (in case there's another writer, which is blocking the all of the readers. * / + pthread_cond_signal (lock->writeOK); + pthread_cond_broadcast (lock->readOK); + pthread_mutex_unlock (lock->mut); + } +*/ void deletelock (rwl *lock) { pthread_mutex_destroy (lock->mut); diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 05ced66..9f0399b 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -47,17 +47,15 @@ terms specified in this license. #include #include +#include #include #include #include "blobManager.h" #include -/*#include "logger/logWriter.h" */ #include "page.h" #include "pageFile.h" - - /** Invariant: This lock should be held while updating lastFreepage, or while performing any operation that may decrease the amount of @@ -77,7 +75,7 @@ static unsigned int lastFreepage = 0; * @return fully formed Page type * @return page with -1 ID if page not found */ -Page * loadPage(int pageid); +Page * loadPage(int pageid); @@ -116,7 +114,9 @@ void simulateBufferManagerCrash() { /* ** No file I/O below this line. ** */ Page * loadPage (int pageid) { - return loadPagePtr(pageid); + Page * p = loadPagePtr(pageid); + assert (p->id == pageid); + return p; } Page * lastRallocPage = 0; @@ -133,10 +133,11 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) { pthread_mutex_lock(&lastFreepage_mutex); - while(freespace(p = loadPage(lastFreepage)) < size ) { lastFreepage++; } + while(freespace(p = loadPage(lastFreepage)) < size ) { unlock(p->loadlatch); lastFreepage++; } ret = pageRalloc(p, size); + unlock(p->loadlatch); pthread_mutex_unlock(&lastFreepage_mutex); DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); @@ -147,11 +148,14 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) { void slotRalloc(int pageid, lsn_t lsn, recordid rid) { Page * loadedPage = loadPage(rid.page); pageSlotRalloc(loadedPage, lsn, rid); + unlock(loadedPage->loadlatch); } long readLSN(int pageid) { - - return pageReadLSN(loadPage(pageid)); + Page *p; + lsn_t lsn = pageReadLSN(p = loadPage(pageid)); + unlock(p->loadlatch); + return lsn; } void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) { @@ -159,30 +163,36 @@ void writeRecord(int xid, lsn_t lsn, recordid rid, const void *dat) { Page *p; if(rid.size > BLOB_THRESHOLD_SIZE) { - DEBUG("Writing blob.\n"); + /* DEBUG("Writing blob.\n"); */ writeBlob(xid, lsn, rid, dat); } else { - DEBUG("Writing record.\n"); - p = loadPagePtr(rid.page); + /* DEBUG("Writing record.\n"); */ + p = loadPage(rid.page); /* loadPagePtr(rid.page); */ assert( (p->id == rid.page) && (p->memAddr != NULL) ); /** @todo This assert should be here, but the tests are broken, so it causes bogus failures. */ /*assert(pageReadLSN(*p) <= lsn);*/ pageWriteRecord(xid, p, rid, lsn, dat); + unlock(p->loadlatch); + } } void readRecord(int xid, recordid rid, void *buf) { if(rid.size > BLOB_THRESHOLD_SIZE) { - DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n", - xid, rid.page, rid.slot, rid.size, (unsigned int)buf); + /* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n", + xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ readBlob(xid, rid, buf); } else { - DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n", - xid, rid.page, rid.slot, rid.size, (unsigned int)buf); - pageReadRecord(xid, loadPage(rid.page), rid, buf); + Page * p = loadPage(rid.page); + assert(rid.page == p->id); + /* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n", + xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ + pageReadRecord(xid, p, rid, buf); + assert(rid.page == p->id); + unlock(p->loadlatch); } } @@ -205,6 +215,7 @@ int bufTransAbort(int xid, lsn_t lsn) { void setSlotType(int pageid, int slot, int type) { Page * p = loadPage(pageid); pageSetSlotType(p, slot, type); + unlock(p->loadlatch); } /** @@ -232,6 +243,8 @@ void addPendingEvent(int pageid){ pthread_mutex_unlock(&(p->pending_mutex)); + unlock(p->loadlatch); + } /** @@ -263,6 +276,8 @@ void removePendingEvent(int pageid) { } pthread_mutex_unlock(&(p->pending_mutex)); + + unlock(p->loadlatch); } diff --git a/src/lladd/common.c b/src/lladd/common.c index cf7d8a4..b086efc 100644 --- a/src/lladd/common.c +++ b/src/lladd/common.c @@ -14,7 +14,8 @@ #undef pthread_mutex_lock #undef pthread_mutex_trylock #undef pthread_mutex_unlock - +#undef pthread_cond_timedwait +#undef pthread_cond_wait int __lladd_pthread_mutex_init(lladd_pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr, const char * file, int line, const char * name) { @@ -44,8 +45,8 @@ int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int li blockCount ++; pthread_yield(); - if(blockCount > 10000) { - DEBUG("Spinning at %s:%d, %ld times\n", file, line, blockCount); + if(blockCount >= 10000 && ! (blockCount % 10000)) { + DEBUG("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at); } } @@ -108,6 +109,20 @@ int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex) { } +/** + @todo The profiled version of pthread_cond_wait isn't really implemented, so it throws off the mutex statistics. +*/ +int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, + char * file, int line, char * cond_name, char * mutex_name) { + return pthread_cond_wait(cond, &mutex->mutex); +} + +int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime, + char * file, int line, char * cond_name, char * mutex_name) { + return pthread_cond_timedwait(cond, &mutex->mutex, abstime); +} + + #undef rwl #undef initlock #undef readlock @@ -115,6 +130,8 @@ int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex) { #undef readunlock #undef writeunlock #undef deletelock +#undef unlock +#undef downgradelock __profile_rwl *__profile_rw_initlock (char * file, int line) { __profile_rwl * ret = malloc(sizeof(__profile_rwl)); @@ -143,6 +160,7 @@ void __profile_readlock (__profile_rwl *lock, int d, char * file, int line) { readlock(lock->lock, d); + } void __profile_writelock (__profile_rwl *lock, int d, char * file, int line) { @@ -187,7 +205,6 @@ void __profile_readunlock (__profile_rwl *lock) { readunlock(lock->lock); - } void __profile_writeunlock (__profile_rwl *lock) { @@ -201,6 +218,26 @@ void __profile_writeunlock (__profile_rwl *lock) { writeunlock(lock->lock); } + +void __profile_unlock (__profile_rwl * lock) { + if(lock->lock->writers) { + __profile_writeunlock(lock); + } else { + __profile_readunlock(lock); + } +} + +void __profile_downgradelock (__profile_rwl * lock) { + profile_tuple * tup = pblHtLookup(lock->lockpoints, lock->last_acquired_at, strlen(lock->last_acquired_at)+1); + + released_lock(tup); + released_lock(&(lock->tup)); + + free(lock->last_acquired_at); + + downgradelock(lock->lock); +} + void __profile_deletelock (__profile_rwl *lock) { profile_tuple * tup; diff --git a/src/lladd/latches.h b/src/lladd/latches.h index e82f7a6..87ae1d4 100644 --- a/src/lladd/latches.h +++ b/src/lladd/latches.h @@ -4,6 +4,9 @@ /** @todo threading should be moved into its own header file. */ #include + + + /*#include -- Don't want everything that touches threading to include pbl... */ #include @@ -50,17 +53,26 @@ typedef struct { #define pthread_mutex_lock(x) __lladd_pthread_mutex_lock((x), __FILE__, __LINE__) #define pthread_mutex_unlock(x) __lladd_pthread_mutex_unlock((x)) #define pthread_mutex_trylock(x) NO_PROFILING_EQUIVALENT_TO_PTHREAD_TRYLOCK +#define pthread_cond_wait(x, y) __lladd_pthread_cond_wait((x), (y), __FILE__, __LINE__, #x, #y); +#define pthread_cond_timedwait(x, y, z) __lladd_pthread_cond_timedwait((x), (y), (z), __FILE__, __LINE__, #x, #y); int __lladd_pthread_mutex_init(lladd_pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr, const char * file, int line, const char * mutex_name); int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int line); int __lladd_pthread_mutex_unlock(lladd_pthread_mutex_t *mutex); int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex); +int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, + char * file, int line, char * cond_name, char * mutex_name); +/** @param abstime should be const struct timespec, but GCC won't take that. */ +int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime, + char * file, int line, char * cond_name, char * mutex_name); #define initlock() __profile_rw_initlock(__FILE__, __LINE__) #define readlock(x, y) __profile_readlock((x),(y), __FILE__, __LINE__) #define writelock(x, y) __profile_writelock((x), (y), __FILE__, __LINE__) #define readunlock(x) __profile_readunlock((x)) #define writeunlock(x) __profile_writeunlock((x)) +#define unlock(x) __profile_unlock((x)) +#define downgradelock(x) __profile_downgradelock((x)) #define deletelock(x) __profile_deletelock((x)) #define rwl __profile_rwl @@ -70,9 +82,13 @@ void __profile_readlock (rwl *lock, int d, char * file, int line); void __profile_writelock (rwl *lock, int d, char * file, int line); void __profile_readunlock (rwl *lock); void __profile_writeunlock (rwl *lock); +void __profile_unlock (rwl *lock); +void __profile_downgradelock (rwl *lock); void __profile_deletelock (rwl *lock); + + #endif #endif /* __LATCHES_H */ diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index 601a49d..8fba7da 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -46,7 +46,6 @@ terms specified in this license. #include "logWriter.h" #include "logHandle.h" -#include "../latches.h" #include "../pageFile.h" #include #include diff --git a/src/lladd/page.c b/src/lladd/page.c index d474a75..b113bc2 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -89,12 +89,12 @@ terms specified in this license. #include #include +#include "latches.h" #include "page.h" #include #include -/*#include "latches.h" */ #include /* TODO: Combine with buffer size... */ @@ -140,7 +140,8 @@ static int MASK_FFFF0000; /* ------ */ static pthread_mutex_t pageAllocMutex; -Page pool[MAX_BUFFER_SIZE]; +/** We need one dummy page for locking purposes, so this array has one extra page in it. */ +Page pool[MAX_BUFFER_SIZE+1]; int isValidSlot(byte *memAddr, int slot); @@ -247,9 +248,9 @@ static const byte *slotMemAddr(const byte *memAddr, int slotNum) { lsn_t pageReadLSN(const Page * page) { lsn_t ret; - readlock(page->rwlatch, 259); + /* readlock(page->rwlatch, 259); */ ret = *(long *)(page->memAddr + START_OF_LSN); - readunlock(page->rwlatch); + /* readunlock(page->rwlatch); */ return ret; } @@ -539,6 +540,8 @@ void pageReadRecord(int xid, Page * page, recordid rid, byte *buff) { byte *recAddress; readlock(page->rwlatch, 519); + + assert(page->id == rid.page); recAddress = page->memAddr + getSlotOffset(page->memAddr, rid.slot); memcpy(buff, recAddress, rid.size); readunlock(page->rwlatch); @@ -564,13 +567,17 @@ void pageWriteRecord(int xid, Page * page, recordid rid, lsn_t lsn, const byte * } -void pageRealloc(Page *p, int id) { - writelock(p->rwlatch, 10); +void pageReallocNoLock(Page *p, int id) { p->id = id; p->LSN = 0; p->dirty = 0; p->pending = 0; p->waiting = 0; +} + +void pageRealloc(Page *p, int id) { + writelock(p->rwlatch, 10); + pageReallocNoLock(p,id); writeunlock(p->rwlatch); } @@ -591,12 +598,15 @@ Page *pageAlloc(int id) { /* We have an implicit lock on rwlatch, since we allocated it, but haven't returned yet. */ page->rwlatch = initlock(); + page->loadlatch = initlock(); pthread_mutex_init(&page->pending_mutex, NULL); pthread_cond_init(&page->noMorePending, NULL); + page->memAddr = malloc(PAGE_SIZE); + nextPage++; - assert(nextPage <= MAX_BUFFER_SIZE); + assert(nextPage <= MAX_BUFFER_SIZE + 1); /* There's a dummy page that we need to keep around, thus the +1 */ /* uggh. Really just want to pass pages by reference */ /* page->pending = malloc(sizeof(int)); */ @@ -604,8 +614,6 @@ Page *pageAlloc(int id) { pthread_mutex_unlock(&pageAllocMutex); - /* pageRealloc does its own locking... */ - pageRealloc(page, id); return page; diff --git a/src/lladd/page.h b/src/lladd/page.h index edb124a..3d18339 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -54,14 +54,14 @@ terms specified in this license. #ifndef __PAGE_H__ #define __PAGE_H__ +#include #include - +#include "latches.h" /** @todo page.h includes things that it shouldn't! (Or, page.h shouldn't be an installed header.) */ #include -#include -#include "latches.h" + BEGIN_C_DECLS /** @@ -90,6 +90,9 @@ typedef struct Page_s { struct Page_s *prev; /** Which queue is the page in? */ int queue; + /** Is the page in the cache at all? */ + int inCache; + /** Used for page-level latching. Each page has an associated read/write lock. This lock only @@ -113,7 +116,9 @@ typedef struct Page_s { writing the locked page to disk) */ - void * rwlatch; + rwl * rwlatch; + + rwl * loadlatch; /** This mutex protects the pending field. We don't use rwlatch for this, since we also need to use a condition variable to update @@ -207,6 +212,8 @@ void pageCommit(int xid); void pageAbort(int xid); +void pageReallocNoLock(Page * p, int id); +/** @todo Do we need a locking version of pageRealloc? */ void pageRealloc(Page * p, int id); Page* pageAlloc(int id); diff --git a/src/lladd/pageCache.c b/src/lladd/pageCache.c index cc18a0c..5c047b8 100644 --- a/src/lladd/pageCache.c +++ b/src/lladd/pageCache.c @@ -7,9 +7,9 @@ */ #include #include +#include "latches.h" #include #include -#include "latches.h" #include #include @@ -20,17 +20,39 @@ static pblHashTable_t *activePages; /* page lookup */ static unsigned int bufferSize; /* < MAX_BUFFER_SIZE */ static Page *repHead, *repMiddle, *repTail; /* replacement policy */ +static pthread_mutex_t loadPagePtr_mutex; + +#define INITIAL 0 +#define FULL 1 + +static int state; + +/* These three functions are for internal use. They are not declared + static so that their test case can compile. */ +static void cacheHitOnPage(Page * ret); +static void cacheRemovePage(Page * ret); +static void cacheInsertPage (Page * ret); + +static Page * dummy_page; void pageCacheInit() { Page *first; bufferSize = 1; + state = INITIAL; + + pthread_mutex_init(&loadPagePtr_mutex, NULL); + activePages = pblHtCreate(); assert(activePages); DEBUG("pageCacheInit()"); first = pageAlloc(0); + dummy_page = pageAlloc(-1); + pageRealloc(first, 0); + pageRealloc(dummy_page, -1); + first->inCache = 1; pblHtInsert(activePages, &first->id, sizeof(int), first); first->prev = first->next = NULL; @@ -40,6 +62,7 @@ void pageCacheInit() { repHead = repTail = first; repMiddle = NULL; + } void pageCacheDeinit() { @@ -57,6 +80,10 @@ void pageCacheDeinit() { pageWrite(p); } + + pthread_mutex_destroy(&loadPagePtr_mutex); + + pblHtDelete(activePages); } @@ -73,8 +100,10 @@ static void headInsert(Page *ret) { } static void middleInsert(Page *ret) { + + assert(state == FULL); - assert( bufferSize == MAX_BUFFER_SIZE ); + /* assert( bufferSize == MAX_BUFFER_SIZE ); */ assert(ret != repMiddle); assert(ret != repTail); @@ -90,9 +119,12 @@ static void middleInsert(Page *ret) { assert(ret->next != ret && ret->prev != ret); } +/** @todo Under high contention, the buffer pool can empty. What should be done about this, other than making sure that # threads > buffer size? */ static void qRemove(Page *ret) { - assert( bufferSize == MAX_BUFFER_SIZE ); + assert(state == FULL); + + /* assert( bufferSize == MAX_BUFFER_SIZE ); */ assert(ret->next != ret && ret->prev != ret); if( ret->prev ) @@ -114,6 +146,260 @@ static void qRemove(Page *ret) { assert(ret != repHead); } +static Page *getFreePage() { + Page *ret; + if( state == FULL ) { /* kick */ + + ret = repTail; + + /** Make sure no one else will try to reuse this page. */ + + cacheRemovePage(ret); + + /** Temporarily drop the mutex while we wait for outstanding + operations on the page to complete. */ + + pthread_mutex_unlock(&loadPagePtr_mutex); + + + /** @ todo getFreePage (finalize) needs to yield the getPage mutex, + but also needs to remove a page from the kick list before + doing so. If there is a cache hit on the page that's been + removed from the kick list, then the cache eviction policy + code needs o know this, and ignore the hit. -- Done. */ + + finalize(ret); /* This cannot deadlock because each thread can + only have outstanding pending events on the + page that it's accessing, but they can only + hold that lock if the page is in cache. If the + page is in cache, then the thread surely isn't + here! Therefore any threads that finalize will + block on can not possibly be blocking on this + thread's latches. */ + + /* writelock(ret->loadlatch, 181); */ /* Don't need the lock here--No one else has a pointer to this page! */ + + pthread_mutex_lock(&loadPagePtr_mutex); + + /* Now that finalize returned, pull ret out of the cache's lookup table. */ + + /* pblHtRemove(activePages, &ret->id, sizeof(int)); */ + + + + + } else { + + ret = pageAlloc(-1); + + ret->id = -1; + ret->inCache = 0; + /* writelock(ret->loadlatch, 166); */ + + } + + return ret; +} + +#define RO 0 +#define RW 1 + +Page * getPage(int pageid, int locktype) { + Page * ret; + + assert(locktype == RO); + + pthread_mutex_lock(&loadPagePtr_mutex); + + ret = pblHtLookup(activePages, &pageid, sizeof(int)); + + // Unfortunately, this is a heuristic, as a race condition exists. + // (Until we obtain a readlock on ret, we have no way of knowing if + // we've gotten the correct page.) + if(ret) { + cacheHitOnPage(ret); + assert(ret->id == -1 || ret->id == pageid); + } + + pthread_mutex_unlock(&loadPagePtr_mutex); + + if(!ret) { + ret = dummy_page; + } + + writelock(ret->loadlatch, 217); + + while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */ + + unlock(ret->loadlatch); + + pthread_mutex_lock(&loadPagePtr_mutex); + + ret = getFreePage(); + + pblHtRemove(activePages, &(ret->id), sizeof(int)); + + pthread_mutex_unlock(&loadPagePtr_mutex); + + writelock(ret->loadlatch, 231); + + if(ret->id != -1) { + assert(ret != dummy_page); + pageWrite(ret); + } + + pageRealloc(ret, pageid); /* Do we need any special lock here? */ + + pageRead(ret); + + unlock(ret->loadlatch); + + pthread_mutex_lock(&loadPagePtr_mutex); + + /* By inserting ret into the cache, we give up the implicit write lock. */ + + cacheInsertPage(ret); + + pblHtInsert(activePages, &pageid, sizeof(int), ret); + + pthread_mutex_unlock(&loadPagePtr_mutex); + + writelock(ret->loadlatch, 217); + + } + + assert(ret->id == pageid); + + return ret; + +} + +/* Page * getPageOld(int pageid, int locktype) { + Page * ret; + int spin = 0; + + assert(0); + + / ** This wonderful bit of code avoids deadlocks. + + Locking works like this: + + a) get a HT mutex, lookup pageid, did we get a pointer? + - yes, release the mutex, so we don't deadlock getting a page lock. + - no, keep the mutex, move on to the next part of the function. + b) lock whatever pointer the HT returned. (Safe, since the memory + mutex are allocated exactly one time.) + c) did we get the right page? + - yes, success! + - no, goto (a) + + * / + + pthread_mutex_lock(&loadPagePtr_mutex); + + do { + + do { + + if(spin) { + sched_yield(); + } + spin ++; + if(spin > 1000 && (spin % 10000 == 0)) { + DEBUG("Spinning in pageCache's hashtable lookup: %d\n", spin); + } + + + ret = pblHtLookup(activePages, &pageid, sizeof(int)); + + if(ret) { + + / * pthread_mutex_unlock(&loadPagePtr_mutex); * / + + if(locktype == RO) { + readlock(ret->loadlatch, 147); + } else { + writelock(ret->loadlatch, 149); + } + + / * pthread_mutex_lock(&loadPagePtr_mutex); * / + + } + + + } while (ret && (ret->id != pageid)); + + if(ret) { + cacheHitOnPage(ret); + pthread_mutex_unlock(&loadPagePtr_mutex); + assert(ret->id == pageid); + return ret; + } + + / * OK, we need to handle a cache miss. This is also tricky. + + If getFreePage needs to kick a page, then it will need a + writeLock on the thing it kicks, so we drop our mutex here. + + But, before we do that, we need to make sure that no one else + tries to load our page. We do this by inserting a dummy entry in + the cache. Since it's pageid won't match the pageid that we're + inserting, other threads will spin in the do..while loop untile + we've loaded the page. + + * / + + pblHtInsert(activePages, &pageid, sizeof(int), dummy_page); + + + ret = getFreePage(); + + / * ret is now a pointer that no other thread has access to, and we + hold a write lock on it * / + + pblHtRemove(activePages, &pageid, sizeof(int)); + pblHtInsert(activePages, &pageid, sizeof(int), ret); + + / * writes were here... * / + + / * pthread_mutex_unlock(&loadPagePtr_mutex); * / + + if(ret->id != -1) { + pageWrite(ret); + } + + pageRealloc(ret, pageid); + + pageRead(ret); + + / * pthread_mutex_lock(&loadPagePtr_mutex); * / + + + + assert(ret->inCache == 0); + + cacheInsertPage(ret); + + assert(ret->inCache == 1); + + pthread_mutex_unlock(&loadPagePtr_mutex); + + if(locktype == RO) { + readlock(ret->loadlatch, 314); + } else { + writelock(ret->loadlatch, 316); + } + + + + if(locktype == RO) { + downgradelock(ret->loadlatch); + } + + } while (ret->id != pageid); + + return ret; + }*/ +/* static Page *kickPage(int pageid) { Page *ret = repTail; @@ -124,7 +410,7 @@ static Page *kickPage(int pageid) { qRemove(ret); pblHtRemove(activePages, &ret->id, sizeof(int)); - /* It's almost safe to release the mutex here. The LRU-2 + / * It's almost safe to release the mutex here. The LRU-2 linked lists are in a consistent (but under-populated) state, and there is no reference to the page that we're holding in the hash table, so the data structures are @@ -136,31 +422,140 @@ static Page *kickPage(int pageid) { cache slots, which would cause consistency problems. @todo Don't block while holding the loadPagePtr mutex! - */ + * / - /*pthread_mutex_unlock(loadPagePtr_mutex);*/ + / *pthread_mutex_unlock(loadPagePtr_mutex);* / - pageWrite(ret); - - /*pthread_mutex_lock(loadPagePtr_mutex);*/ + / *pthread_mutex_lock(loadPagePtr_mutex);* / writelock(ret->rwlatch, 121); - pageRealloc(ret, pageid); - - middleInsert(ret); - /* pblHtInsert(activePages, &pageid, sizeof(int), ret); */ + / * pblHtInsert(activePages, &pageid, sizeof(int), ret); * / return ret; -} +} int lastPageId = -1; Page * lastPage = 0; +*/ +/* +static void noteRead(Page * ret) { + if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to worry about page sorting * / + / * move to head * / + if( ret != repHead ) { + qRemove(ret); + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); -static pthread_mutex_t loadPagePtr_mutex; + if( ret->queue == 2 ) { + / * keep first queue same size * / + repMiddle = repMiddle->prev; + repMiddle->queue = 2; + + ret->queue = 1; + } + } + } +} + + +void loadPagePtrFoo(int pageid, int readOnly) { + Page * ret; + + pthread_mutex_lock(&loadPagePtr_mutex); + + ret = pblHtLookup(activePages, &pageid, sizeof(int)); + + getPage( + + if(ret) { + if(readOnly) { + readlock(ret->rwlatch, 178); + } else { + writelock(ret->rwlatch, 180); + } + noteRead(ret); + + pthread_mutex_unlock(&loadPagePtr_mutex); + + } else if(bufferSize == MAX_BUFFER_SIZE - 1) { + + } + +} +*/ +static void cacheInsertPage (Page * ret) { + bufferSize++; + assert(!ret->inCache); + ret->inCache ++; + if(state == FULL) { + middleInsert(ret); + } else { + if(bufferSize == MAX_BUFFER_SIZE/* - 1*/) { /* Set up page kick mechanism. */ + int i; + Page *iter; + + state = FULL; + + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + + /* split up queue: + * "in all cases studied ... fixing the primary region to 30% ... + * resulted in the best performance" + */ + repMiddle = repHead; + for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) { + repMiddle->queue = 1; + repMiddle = repMiddle->next; + } + + for( iter = repMiddle; iter; iter = iter->next ) { + iter->queue = 2; + } + } else { /* Just insert it. */ + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + assert(ret->next != ret && ret->prev != ret); + } + } +} + +static void cacheRemovePage(Page * ret) { + assert(ret->inCache); + qRemove(ret); + ret->inCache--; + bufferSize --; +} + +static void cacheHitOnPage(Page * ret) { + /* The page may not be in cache if it is about to be freed. */ + if(ret->inCache && state == FULL) { /* we need to worry about page sorting */ + /* move to head */ + if( ret != repHead ) { + qRemove(ret); + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + + if( ret->queue == 2 ) { + /* keep first queue same size */ + repMiddle = repMiddle->prev; + repMiddle->queue = 2; + + ret->queue = 1; + } + } + } +} void *loadPagePtr(int pageid) { - /* lock activePages, bufferSize */ + Page * ret = getPage(pageid, RO); + return ret; +} + +/** @todo loadPagePtr needs to aquire the page read/write lock -- if it does, then should page.c do any locking? */ +/*void *loadPagePtr(int pageid) { + / * lock activePages, bufferSize * / Page *ret; pthread_mutex_lock(&(loadPagePtr_mutex)); @@ -175,17 +570,17 @@ void *loadPagePtr(int pageid) { } if( ret ) { - /** Don't need write lock for linked list manipulations. The loadPagePtr_mutex protects those operations. */ + / ** Don't need write lock for linked list manipulations. The loadPagePtr_mutex protects those operations. * / - if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to worry about page sorting */ - /* move to head */ + if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to worry about page sorting * / + / * move to head * / if( ret != repHead ) { qRemove(ret); headInsert(ret); assert(ret->next != ret && ret->prev != ret); if( ret->queue == 2 ) { - /* keep first queue same size */ + / * keep first queue same size * / repMiddle = repMiddle->prev; repMiddle->queue = 2; @@ -193,34 +588,37 @@ void *loadPagePtr(int pageid) { } } } - + lastPage = ret; lastPageId = pageid; - - /* DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); */ + + / * DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); * / pthread_mutex_unlock(&(loadPagePtr_mutex)); - + return ret; - } else if( bufferSize == MAX_BUFFER_SIZE ) { /* we need to kick */ - ret = kickPage(pageid); - } else if( bufferSize == MAX_BUFFER_SIZE-1 ) { /* we need to setup kickPage mechanism */ - int i; + } else if( bufferSize == MAX_BUFFER_SIZE ) { / * we need to kick * / + ret = kickPage(pageid); + pageWrite(ret); + pageRealloc(ret, pageid); + middleInsert(ret); + + } else if( bufferSize == MAX_BUFFER_SIZE-1 ) { / * we need to setup kickPage mechanism * / + int i; Page *iter; - + ret = pageAlloc(pageid); - writelock(ret->rwlatch, 224); - - headInsert(ret); - assert(ret->next != ret && ret->prev != ret); - - /* pblHtInsert( activePages, &pageid, sizeof(int), ret ); */ - bufferSize++; - /* split up queue: + pageRealloc(ret, pageid); + writelock(ret->rwlatch, 224); + + headInsert(ret); + assert(ret->next != ret && ret->prev != ret); + + / * split up queue: * "in all cases studied ... fixing the primary region to 30% ... * resulted in the best performance" - */ + * / repMiddle = repHead; for( i = 0; i < MAX_BUFFER_SIZE / 3; i++ ) { repMiddle->queue = 1; @@ -231,20 +629,23 @@ void *loadPagePtr(int pageid) { iter->queue = 2; } - } else { /* we are adding to an nonfull queue */ + } else { / * we are adding to an nonfull queue * / bufferSize++; ret = pageAlloc(pageid); + + pageRealloc(ret, pageid); + writelock(ret->rwlatch, 224); headInsert(ret); assert(ret->next != ret && ret->prev != ret); assert(ret->next != ret && ret->prev != ret); - /* pblHtInsert( activePages, &pageid, sizeof(int), ret ); */ + } - /* we now have a page we can dump info into */ + / * we now have a page we can dump info into * / @@ -257,17 +658,18 @@ void *loadPagePtr(int pageid) { lastPageId = pageid; - /* release mutex for this function */ + / * release mutex for this function * / pthread_mutex_unlock(&(loadPagePtr_mutex)); pageRead(ret); - /* release write lock on the page. */ + / * release write lock on the page. * / writeunlock(ret->rwlatch); - DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); + / * DEBUG("Loaded page %d => %x\n", pageid, (unsigned int) ret->memAddr); * / return ret; } +*/ diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index a1f84ec..2e435cf 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -20,7 +20,7 @@ static FILE * stable = NULL; */ -static void finalize(Page * p) { +void finalize(Page * p) { pthread_mutex_lock(&(p->pending_mutex)); p->waiting++; @@ -38,26 +38,39 @@ static void finalize(Page * p) { /* This function is declared in page.h */ void pageRead(Page *ret) { - long fileSize = myFseek(stable, 0, SEEK_END); - long pageoffset = ret->id * PAGE_SIZE; + long fileSize; + + long pageoffset; long offset; - DEBUG("Reading page %d\n", ret->id); - if(!ret->memAddr) { + /** @todo pageRead() is using fseek to calculate the file size on each read, which is inefficient. */ + pageoffset = ret->id * PAGE_SIZE; + flockfile(stable); + + fileSize = myFseekNoLock(stable, 0, SEEK_END); + + + /* DEBUG("Reading page %d\n", ret->id); */ + + /* if(!ret->memAddr) { ret->memAddr = malloc(PAGE_SIZE); } - assert(ret->memAddr); + if(!ret->memAddr) { + perror("pageFile.c"); + fflush(NULL); + } + assert(ret->memAddr); */ if ((ret->id)*PAGE_SIZE >= fileSize) { - myFseek(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET); + myFseekNoLock(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET); if(1 != fwrite("", 1, 1, stable)) { if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); } if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); } } } - offset = myFseek(stable, pageoffset, SEEK_SET); + offset = myFseekNoLock(stable, pageoffset, SEEK_SET); assert(offset == pageoffset); if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) { @@ -67,27 +80,27 @@ void pageRead(Page *ret) { } + funlockfile(stable); + } /* This function is declared in page.h */ void pageWrite(Page * ret) { long pageoffset = ret->id * PAGE_SIZE; - long offset = myFseek(stable, pageoffset, SEEK_SET); - assert(offset == pageoffset); - assert(ret->memAddr); + long offset ; - DEBUG("Writing page %d\n", ret->id); - /* Need to call finalize before checking the LSN. Once finalize - returns, we have exclusive access to this page, and can safely - write it to disk. */ - finalize(ret); - if(flushedLSN() < pageReadLSN(ret)) { - DEBUG("pageWrite is calling syncLog()!\n"); + DEBUG("pageWrite is calling syncLog()!\n"); syncLog(); } + flockfile(stable); + offset = myFseekNoLock(stable, pageoffset, SEEK_SET); + assert(offset == pageoffset); + assert(ret->memAddr); + + /* DEBUG("Writing page %d\n", ret->id); */ if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) { @@ -95,6 +108,8 @@ void pageWrite(Page * ret) { if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); } } + + funlockfile(stable); } void openPageFile() { @@ -125,9 +140,15 @@ void closePageFile() { long myFseek(FILE * f, long offset, int whence) { long ret; flockfile(f); + ret = myFseekNoLock(f, offset, whence); + funlockfile(f); + return ret; +} + +long myFseekNoLock(FILE * f, long offset, int whence) { + long ret; if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); } if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); } - funlockfile(f); return ret; } diff --git a/src/lladd/pageFile.h b/src/lladd/pageFile.h index 4c22245..8c1adc5 100644 --- a/src/lladd/pageFile.h +++ b/src/lladd/pageFile.h @@ -43,6 +43,9 @@ void openPageFile(); void closePageFile(); long myFseek(FILE * f, long offset, int whence); +long myFseekNoLock(FILE * f, long offset, int whence); void myFwrite(const void * dat, long size, FILE * f); +void finalize(Page * p); + #endif /* __PAGE_FILE_H */ diff --git a/src/pbl/pblhash.c b/src/pbl/pblhash.c index c88e93a..1c23578 100644 --- a/src/pbl/pblhash.c +++ b/src/pbl/pblhash.c @@ -24,8 +24,13 @@ please see: http://mission.base.com/. $Log$ - Revision 1.1 2004/06/24 21:11:54 sears - Initial revision + Revision 1.2 2004/07/20 00:15:17 sears + pageCache.c is now re-entrant. + + Revision 1.1.1.1 2004/06/24 21:11:54 sears + Need to send laptop in for warranty service, so it's time to put this code into CVS. :) + + Vs. the paper version of LLADD, this version has a re-written logger + recovery system. It also includes unit tests and API documentation. Revision 1.4 2004/05/26 09:55:31 sears Misc bugfixes / optimizations. @@ -63,8 +68,8 @@ static int rcsid_fct() { return( rcsid ? 0 : rcsid_fct() ); } /*****************************************************************************/ /* #defines */ /*****************************************************************************/ -/*#define PBL_HASHTABLE_SIZE 1019*/ -#define PBL_HASHTABLE_SIZE 100003 +#define PBL_HASHTABLE_SIZE 1019 +/*#define PBL_HASHTABLE_SIZE 100003 */ /*****************************************************************************/ /* typedefs */ diff --git a/test/lladd/Makefile.am b/test/lladd/Makefile.am index 12ed56f..f98a867 100644 --- a/test/lladd/Makefile.am +++ b/test/lladd/Makefile.am @@ -6,6 +6,6 @@ else TESTS = endif noinst_PROGRAMS = $(TESTS) -LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -lefence -CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log +LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence +CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c index 6c62ea6..1af86ca 100644 --- a/test/lladd/check_bufferManager.c +++ b/test/lladd/check_bufferManager.c @@ -7,16 +7,66 @@ /*#include */ #include "../../src/lladd/logger/logHandle.h" #include "../../src/lladd/logger/logWriter.h" - #include "../../src/lladd/latches.h" +#include "../../src/lladd/page.h" +#include #include #include #include "../check_includes.h" - - #define LOG_NAME "check_bufferMananger.log" +#define NUM_PAGES 1000 +#define THREAD_COUNT 5 +#define READS_PER_THREAD 50000 +void initializePages() { + + int i; + + for(i = 0 ; i < NUM_PAGES; i++) { + recordid rid; + rid.page = i; + rid.slot = 0; + rid.size = sizeof(int); + writeRecord(1, 1, rid, &i); + } + +} + +void * workerThread(void * p) { + int i; + for(i = 0 ; i < READS_PER_THREAD; i++) { + recordid rid; + int j; + + int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0)); + + if(! (i % 5000) ) { + printf("%d", i / 5000); fflush(NULL); + } + + rid.page = k; + rid.slot = 0; + rid.size = sizeof(int); + + readRecord(1, rid, &j); + assert(k == j); + } + + return NULL; +} + +START_TEST(pageSingleThreadTest) { + Tinit(); + + initializePages(); + + /* sleep(100000000); */ + + workerThread(NULL); + + Tdeinit(); +} END_TEST /** @test @@ -31,9 +81,24 @@ pages. */ -START_TEST(pageLoadTest) -{ - fail_unless(0, "Write this test!"); +START_TEST(pageLoadTest) { + pthread_t workers[THREAD_COUNT]; + int i; + + /* fail_unless(0, "Broken for now."); + assert(0); */ + Tinit(); + + initializePages(); + + for(i = 0; i < THREAD_COUNT; i++) { + pthread_create(&workers[i], NULL, workerThread, NULL); + } + for(i = 0; i < THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + } + + Tdeinit(); } END_TEST Suite * check_suite(void) { @@ -43,6 +108,7 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ + /*tcase_add_test(tc, pageSingleThreadTest); */ tcase_add_test(tc, pageLoadTest); /* --------------------------------------------- */ diff --git a/test/lladd/check_page.c b/test/lladd/check_page.c index 124cd40..4ec33fe 100644 --- a/test/lladd/check_page.c +++ b/test/lladd/check_page.c @@ -129,6 +129,8 @@ START_TEST(pageNoThreadTest) worker_thread(p); + unlock(p->loadlatch); + Tdeinit(); } @@ -154,6 +156,9 @@ START_TEST(pageThreadTest) { for(i = 0; i < THREAD_COUNT; i++) { pthread_join(workers[i], NULL); } + + unlock(p->loadlatch); + Tdeinit(); } END_TEST