diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index d10ba73..1efd31e 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -4,7 +4,7 @@ lib_LIBRARIES=liblladd.a #liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c liblladd_a_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \ - pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \ + pageHandle.c pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \ transactional2.c allocationPolicy.c \ lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\ logger/logEntry.c logger/logWriter.c logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \ diff --git a/src/lladd/bufferManager/bufferHash.c b/src/lladd/bufferManager/bufferHash.c index fdf5df6..cfe112d 100644 --- a/src/lladd/bufferManager/bufferHash.c +++ b/src/lladd/bufferManager/bufferHash.c @@ -30,6 +30,7 @@ static int pageCount; static Page ** freeList; +// A page is in LRU iff !pending, !pinned static replacementPolicy * lru; static int running; @@ -49,48 +50,90 @@ static void pageSetNode(void * page, node_t * n, void * ignore) { #define pagePendingPtr(p) ((intptr_t*)(&((p)->next))) #define pagePinCountPtr(p) ((intptr_t*)(&((p)->queue))) +#ifdef LONG_RUN + +inline static void checkPageState(Page * p) { + Page * check = LH_ENTRY(find)(cachedPages, &(p->id), sizeof(int)); + if(check) { + int pending = *pagePendingPtr(p); + int pinned = *pagePinCountPtr(p); + if((!pinned) && (!pending)) { + assert(pageGetNode(p, 0)); + } else { + assert(!pageGetNode(p,0)); + } + int notfound = 1; + for(int i = 0; i < freeCount; i++) { + if(freeList[i] == p) { notfound = 0; } + } + assert(notfound); + } else { + assert(!pageGetNode(p,0)); + assert(!*pagePendingPtr(p)); + assert(!*pagePinCountPtr(p)); + int found = 0; + for(int i = 0; i < freeCount; i++) { + if(freeList[i] == p) { found = 1; } + } + assert(found); + } +} + +#else + +inline static void checkPageState(Page * p) { } + +#endif + /** You need to hold mut before calling this. @return the page that was just written back. It will not be in lru or cachedPages after the call returns. */ inline static Page * writeBackOnePage() { - Page * victim = lru->getStale(lru); + // Make sure we have an exclusive lock on victim. assert(victim); - assert(!*pagePendingPtr(victim)); - + assert(! *pagePendingPtr(victim)); + assert(! *pagePinCountPtr(victim)); #ifdef LATCH_SANITY_CHECKING int latched = trywritelock(victim->loadlatch,0); assert(latched); #endif - // We have an exclusive lock on victim. - assert(! *pagePinCountPtr(victim)); + checkPageState(victim); + + // XXX this can double free with (*) lru->remove(lru, victim); Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(int)); assert(old == victim); // printf("Write(%ld)\n", (long)victim->id); pageWrite(victim); - + // Make sure that no one mistakenly thinks this is still a live copy. + victim->id = -1; + +#ifdef LATCH_SANITY_CHECKING // We can release the lock since we just grabbed it to see if // anyone else has pinned the page... the caller holds mut, so // no-one will touch the page for now. -#ifdef LATCH_SANITY_CHECKING unlock(victim->loadlatch); #endif - return victim; } -/** Returns a free page. The page will not be cachedPages or lru. */ +/** Returns a free page. The page will not be in freeList, + cachedPages or lru. */ inline static Page * getFreePage() { Page * ret; if(pageCount < MAX_BUFFER_SIZE) { ret = pageMalloc(); - pageCount++; - } else { + pageFree(ret,-1); + (*pagePinCountPtr(ret)) = 0; + (*pagePendingPtr(ret)) = 0; + pageSetNode(ret,0,0); + pageCount++; + } else { if(!freeCount) { ret = writeBackOnePage(); } else { @@ -98,11 +141,13 @@ inline static Page * getFreePage() { freeList[freeCount-1] = 0; freeCount--; } - assert(ret); if(freeCount < freeLowWater) { pthread_cond_signal(&needFree); } } + assert(!*pagePinCountPtr(ret)); + assert(!*pagePendingPtr(ret)); + assert(!pageGetNode(ret,0)); return ret; } @@ -114,13 +159,10 @@ static void * writeBackWorker(void * ignored) { } if(!running) { break; } Page * victim = writeBackOnePage(); - assert(freeCount < freeListLength); freeList[freeCount] = victim; freeCount++; - - // pthread_mutex_unlock(&mut); - // pthread_mutex_lock(&mut); + checkPageState(victim); } pthread_mutex_unlock(&mut); return 0; @@ -137,12 +179,11 @@ static Page * bhLoadPageImpl(int xid, const int pageid) { pthread_mutex_lock(&mut); // Is the page in cache? - Page * ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(int)); // Is the page already being read from disk? (If ret == 0, then no...) - while(ret) { + checkPageState(ret); if(*pagePendingPtr(ret)) { pthread_cond_wait(&readComplete, &mut); if(ret->id != pageid) { @@ -153,10 +194,13 @@ static Page * bhLoadPageImpl(int xid, const int pageid) { int locked = tryreadlock(ret->loadlatch,0); assert(locked); #endif + // XXX this can double free with (*) if(! *pagePinCountPtr(ret) ) { + // Then ret is in lru (otherwise it would be pending, or not cached); remove it. lru->remove(lru, ret); } (*pagePinCountPtr(ret))++; + checkPageState(ret); pthread_mutex_unlock(&mut); assert(ret->id == pageid); return ret; @@ -170,25 +214,28 @@ static Page * bhLoadPageImpl(int xid, const int pageid) { ret = getFreePage(); // Add a pending entry to cachedPages to block like-minded threads and writeback - *pagePendingPtr(ret) = 1; + (*pagePendingPtr(ret)) = 1; + check = LH_ENTRY(insert)(cachedPages,&pageid,sizeof(int), ret); assert(!check); + ret->id = pageid; + // Now, it is safe to release the mutex; other threads won't // try to read this page from disk. pthread_mutex_unlock(&mut); - ret->id = pageid; pageRead(ret); pthread_mutex_lock(&mut); *pagePendingPtr(ret) = 0; - (*pagePinCountPtr(ret))++; - - // Would remove rom lru, but getFreePage() guarantees that it isn't + // Would remove from lru, but getFreePage() guarantees that it isn't // there. - //lru->remove(lru, ret); + assert(!pageGetNode(ret, 0)); + + assert(!(*pagePinCountPtr(ret))); + (*pagePinCountPtr(ret))++; #ifdef LATCH_SANITY_CHECKING int locked = tryreadlock(ret->loadlatch, 0); @@ -196,16 +243,18 @@ static Page * bhLoadPageImpl(int xid, const int pageid) { #endif pthread_mutex_unlock(&mut); - pthread_cond_broadcast(&readComplete); assert(ret->id == pageid); + checkPageState (ret); return ret; } static void bhReleasePage(Page * p) { pthread_mutex_lock(&mut); + checkPageState(p); (*pagePinCountPtr(p))--; if(!(*pagePinCountPtr(p))) { + assert(!pageGetNode(p, 0)); lru->insert(lru,p); } #ifdef LATCH_SANITY_CHECKING @@ -236,7 +285,7 @@ static void bhBufDeinit() { free(freeList); - closePageFile(); + // closePageFile(); lru->deinit(lru); bufferPoolDeInit(); } @@ -244,6 +293,10 @@ void bhBufInit() { assert(!running); +#ifdef LONG_RUN + printf("Using expensive bufferHash sanity checking.\n"); +#endif + loadPageImpl = bhLoadPageImpl; releasePage = bhReleasePage; writeBackPage = bhWriteBackPage; @@ -253,7 +306,7 @@ void bhBufInit() { bufferPoolInit(); - openPageFile(); + // openPageFile(); lru = lruFastInit(pageGetNode, pageSetNode, 0); cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index 0b3646e..b71dccf 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -24,14 +24,15 @@ static int stable = -1; static pthread_mutex_t stable_mutex; - -static pageid_t myLseekNoLock(int f, pageid_t offset, int whence); +static void pfForcePageFile(); +static void pfClosePageFile(); +inline static pageid_t myLseekNoLock(int f, pageid_t offset, int whence); static int oldOffset = -1; int pageFile_isDurable = 1; -void pageRead(Page *ret) { +static void pfPageRead(Page *ret) { pageid_t pageoffset; pageid_t offset; @@ -72,7 +73,7 @@ void pageRead(Page *ret) { } /** @todo need to sync the page file to disk occasionally, so that the dirty page table can be kept up to date. */ -void pageWrite(Page * ret) { +static void pfPageWrite(Page * ret) { /** If the page is clean, there's no reason to write it out. */ assert(ret->dirty == dirtyPages_isDirty(ret)); if(!ret->dirty) { @@ -128,6 +129,11 @@ void pageWrite(Page * ret) { /** @todo O_DIRECT is broken in older linuxes (eg 2.4). The build script should disable it on such platforms. */ void openPageFile() { + pageRead = pfPageRead; + pageWrite = pfPageWrite; + forcePageFile = pfForcePageFile; + closePageFile = pfClosePageFile; + DEBUG("Opening storefile.\n"); #ifdef PAGE_FILE_O_DIRECT @@ -150,7 +156,7 @@ void openPageFile() { } -void forcePageFile() { +static void pfForcePageFile() { if(pageFile_isDurable) { #ifndef PAGE_FILE_O_DIRECT #ifdef HAVE_FDATASYNC @@ -162,7 +168,7 @@ void forcePageFile() { } } -void closePageFile() { +static void pfClosePageFile() { forcePageFile(); int ret = close(stable); diff --git a/src/lladd/pageFile.h b/src/lladd/pageFile.h index fdfee00..554caeb 100644 --- a/src/lladd/pageFile.h +++ b/src/lladd/pageFile.h @@ -1,50 +1,13 @@ #ifndef __PAGE_FILE_H #define __PAGE_FILE_H +/** + @todo this #include should be removed; almost nothing should + include pageFile.h +*/ +#include #include "page.h" -/** - * Write page to disk, including correct LSN. Doing so may require a - * call to logSync(). There is not much that can be done to avoid - * this call right now. In the future, it might make sense to check - * to see if some other page can be kicked, in order to avoid the log - * flush. - * - * This funciton is automatically called immediately before a page is - * evicted from cache. Operation implementors, and normal users - * should never have to call this routine. - * - * @see bufferManager.c for the implementation of pageWrite - * - * @param dat The page to be flushed to disk. - */ -void pageWrite(Page * dat); - -extern int pageFile_isDurable; - -/** - Read a page from disk. This bypassess the cache, and should only be - called by bufferManager and blobManager. To retrieve a page under - normal circumstances, use loadPage() instead. - - Operation implementors and normal users should never need to call - this routine. - - @param ret A page struct, with id set correctly. The rest of this - struct will be overwritten by pageMap. - - @see bufferManager.c for the implementation of pageRead. - - @todo pageRead and pageWrite should be static, but pageCache needs - to call them. -*/ -void pageRead(Page * ret); - -void forcePageFile(); - void openPageFile(); -void closePageFile(); - -void finalize(Page * p); #endif /* __PAGE_FILE_H */ diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 9bb3e1d..5bbf0bb 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -1,16 +1,21 @@ #include +#include +#include +#include + #include #include "latches.h" #include - #include #include #include #include #include +#include "pageFile.h" #include "page.h" #include #include +#include #include #include #include "page/indirect.h" @@ -101,7 +106,25 @@ void setupOperationsTable() { */ } - +// @todo this factory stuff doesn't really belong here... +static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { + stasis_handle_t * h = stasis_handle(open_memory)(off); + //h = stasis_handle(open_debug)(h); + stasis_write_buffer_t * w = h->append_buffer(h, len); + w->h->release_write_buffer(w); + return h; +} +typedef struct sf_args { + char * filename; + int openMode; + int filePerm; +} sf_args; +static stasis_handle_t * slow_factory(void * argsP) { + sf_args * args = (sf_args*) argsP; + stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); + //h = stasis_handle(open_debug)(h); + return h; +} int Tinit() { pthread_mutex_init(&transactional_2_mutex, NULL); numActiveXactions = 0; @@ -112,6 +135,21 @@ int Tinit() { dirtyPagesInit(); LogInit(loggerType); pageInit(); + + struct sf_args * slow_arg = malloc(sizeof(sf_args)); + slow_arg->filename = STORE_FILE; +#ifdef PAGE_FILE_O_DIRECT + slow_arg->openMode = O_CREAT | O_RDWR | O_DIRECT; +#else + slow_arg->openMode = O_CREAT | O_RDWR; +#endif + slow_arg->filePerm = FILE_PERM; + // Allow 4MB of outstanding writes. + stasis_handle_t * pageFile = + stasis_handle(open_non_blocking)(slow_factory, slow_arg, fast_factory, + NULL, 20, PAGE_SIZE * 1024, 1024); + pageHandleOpen(pageFile); + // openPageFile(); bufInit(bufferManagerType); pageOperationsInit(); initNestedTopActions(); @@ -319,6 +357,7 @@ int Tdeinit() { truncationDeinit(); ThashDeinit(); bufDeinit(); + closePageFile(); pageDeinit(); LogDeinit(); dirtyPagesDeinit();