diff --git a/lladd/constants.h b/lladd/constants.h index 265d385..8182dbe 100644 --- a/lladd/constants.h +++ b/lladd/constants.h @@ -94,6 +94,7 @@ terms specified in this license. #define BUFFER_MANAGER_REOPEN 0 #define BUFFER_MANAGER_HASH 1 #define BUFFER_MANAGER_MEM_ARRAY 2 +#define BUFFER_MANAGER_DEPRECATED_HASH 3 #define LOG_TO_FILE 0 #define LOG_TO_MEMORY 1 diff --git a/lladd/pageCache.h b/lladd/pageCache.h index 1d2e3f7..829d6b9 100644 --- a/lladd/pageCache.h +++ b/lladd/pageCache.h @@ -3,9 +3,6 @@ #include -#define RO 0 -#define RW 1 - //Page * getPage(int pageid, int locktype); /** Implements lladd's caching policy. Looks up pageid in the cache. @@ -18,14 +15,17 @@ Engine Results" If you would like to implement your own caching policy, implement - the three functions below. They are relatively straightforward. + the functions below. They are relatively straightforward. Note that pageCache does not perform any file I/O of its own. + The implementation of this module does not need to be threadsafe. + @todo pageCache should not include page.h at all. It should treat pages as (int, void*) pairs. (But the page struct contains the pointers that pageCache manipulates..) */ + void pageCacheInit(Page * first); void pageCacheDeinit(); diff --git a/lladd/replacementPolicy.h b/lladd/replacementPolicy.h new file mode 100644 index 0000000..6f7714a --- /dev/null +++ b/lladd/replacementPolicy.h @@ -0,0 +1,35 @@ +/** + @file + + Implements cache replacement policies. Eventually, this could be + extended to support application specific caching schemes. + + @todo Stasis used to use LRU-2S. LRU-2S is described in Markatos + "On Caching Searching Engine Results". (This needs to be + re-implemented properly.) + + For now, Stasis uses plain-old LRU. DB-MIN would be an interesting + extension. + + If you would like to implement your own caching policy, implement + the functions below. They are relatively straightforward. Note + that replacementPolicy implementations do not perform any file I/O + of their own. + + The implementation of this module does not need to be threadsafe. + +*/ + + +typedef struct replacementPolicy { + struct replacementPolicy* (*init)(); + void (*deinit) (struct replacementPolicy* impl); + void (*hit) (struct replacementPolicy* impl, int id); + void*(*getStale)(struct replacementPolicy* impl); + void*(*remove) (struct replacementPolicy* impl, int id); + void (*insert) (struct replacementPolicy* impl, int id, void * page); + void * impl; +} replacementPolicy; + +replacementPolicy * lruInit(); + diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index 65bf01f..ab6e363 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -19,7 +19,8 @@ liblladd_a_SOURCES=crc32.c redblack.c lhtable.c common.c stats.c io.c bufferMana operations/pageOrientedListNTA.c operations/bTree.c \ operations/regions.c \ io/rangeTracker.c io/memory.c io/file.c io/non_blocking.c io/debug.c \ - bufferManager/pageArray.c + bufferManager/pageArray.c bufferManager/bufferHash.c \ + replacementPolicy/lru.c # page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas #operations/lladdhash.c #AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index fb42e99..880a47b 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -47,8 +47,13 @@ terms specified in this license. #include +#define RO 0 +#define RW 1 + #ifdef PROFILE_LATCHES_WRITE_ONLY + + #define _GNU_SOURCE #include // Need _GNU_SOURCE for asprintf #include @@ -61,7 +66,7 @@ terms specified in this license. #include #include "bufferManager/pageArray.h" - +#include "bufferManager/bufferHash.h" #include #include @@ -522,12 +527,15 @@ int bufInit(int type) { type = lastType; } lastType = type; - if(type == BUFFER_MANAGER_HASH) { + if(type == BUFFER_MANAGER_DEPRECATED_HASH) { bufManBufInit(); return 0; } else if (type == BUFFER_MANAGER_MEM_ARRAY) { paBufInit(); return 0; + } else if (type == BUFFER_MANAGER_HASH) { + bhBufInit(); + return 0; } else { // XXX error handling abort(); diff --git a/src/lladd/bufferManager/bufferHash.c b/src/lladd/bufferManager/bufferHash.c new file mode 100644 index 0000000..9060b68 --- /dev/null +++ b/src/lladd/bufferManager/bufferHash.c @@ -0,0 +1,262 @@ +#include +#include +#include "bufferManager/bufferHash.h" +//#include +#include +#include +#include +#include "latches.h" + + +#include +#include "pageFile.h" + +#include +#include + +#include +static struct LH_ENTRY(table) * cachedPages; +static struct LH_ENTRY(table) * pendingPages; + +static pthread_t worker; +static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t readComplete = PTHREAD_COND_INITIALIZER; +static pthread_cond_t haveFree = PTHREAD_COND_INITIALIZER; +static pthread_cond_t needFree = PTHREAD_COND_INITIALIZER; + +static int freeLowWater; +static int freeListLength; +static int freeCount; +static int pageCount; + +static Page ** freeList; + +static replacementPolicy * lru; + +static int running; + + +static Page * getFreePage() { + Page * ret; + if(pageCount < MAX_BUFFER_SIZE) { + ret = pageMalloc(); + pageCount++; + } else { + while(!freeCount) { + pthread_cond_signal(&needFree); + pthread_cond_wait(&haveFree, &mut); + } + ret = freeList[freeCount-1]; + freeList[freeCount-1] = 0; + freeCount--; + assert(ret); + if(freeCount < freeLowWater) { + pthread_cond_signal(&needFree); + } + } + return ret; +} + +static void * writeBackWorker(void * ignored) { + pthread_mutex_lock(&mut); + while(1) { + while(running && (freeCount == freeListLength || pageCount < MAX_BUFFER_SIZE)) { + pthread_cond_wait(&needFree, &mut); + } + if(!running) { break; } + assert(freeCount < freeListLength); + + Page * victim = lru->getStale(lru); + assert(victim); + + int i = 0; + while(!trywritelock(victim->loadlatch,0)) { + // someone's pinned this page... + lru->hit(lru, victim->id); + victim = lru->getStale(lru); + i++; + if(i > MAX_BUFFER_SIZE) { + printf("Couldn't find a non-pinned page to write back. Aborting.\n"); + abort(); + } + if(! i % 100 ) { + printf("Severe thrashing detected. :(\n"); + } + } + + if(victim) { + // We have a write lock on victim. + lru->remove(lru, victim->id); + Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(int)); + assert(old == victim); + + pageWrite(victim); + + freeList[freeCount] = victim; + freeCount++; + + unlock(victim->loadlatch); + pthread_mutex_unlock(&mut); + pthread_cond_signal(&haveFree); + pthread_mutex_lock(&mut); + } + } + pthread_mutex_unlock(&mut); + return 0; +} + +static Page * bhLoadPageImpl(int xid, int pageid) { + + // Note: Calls to loadlatch in this function violate lock order, but + // should be safe, since we make sure no one can have a writelock + // before we grab the readlock. + + void* check; + + pthread_mutex_lock(&mut); + + // Is the page in cache? + + Page * ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(int)); + + if(ret) { + int locked = tryreadlock(ret->loadlatch, 0); + assert(locked); + pthread_mutex_unlock(&mut); + assert(ret->id == pageid); + return ret; + } + + // Is the page already being read from disk? + + intptr_t * pending = (intptr_t*) LH_ENTRY(find)(pendingPages,&pageid,sizeof(int)); + + while(pending) { + pthread_cond_wait(&readComplete, &mut); + ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(int)); + pending = LH_ENTRY(find)(pendingPages,&pageid,sizeof(int)); + if(ret) { + int locked = tryreadlock(ret->loadlatch,0); + assert(locked); + pthread_mutex_unlock(&mut); + assert(ret->id == pageid); + return ret; + } + } + + assert(!pending && ! ret); + + // Either: + + // The page is not in cache, and was not pending. + + // -or- + + // The page was read then evicted since this function was + // called. It is now this thread's responsibility to read + // the page from disk. + + // Add an entry to pending to block like-minded threads. + + check = LH_ENTRY(insert)(pendingPages,&pageid,sizeof(int), (void*)1); + assert(!check); + + // Now, it is safe to release the mutex; other threads won't + // try to read this page from disk. + + // Remove a page from the freelist. + ret = getFreePage(); + + pthread_mutex_unlock(&mut); + + ret->id = pageid; + pageRead(ret); + + pthread_mutex_lock(&mut); + + check = LH_ENTRY(remove)(pendingPages, &pageid,sizeof(int)); + assert(check); + check = LH_ENTRY(insert)(cachedPages, &pageid, sizeof(int), ret); + assert(!check); + pthread_cond_broadcast(&readComplete); + lru->insert(lru, ret->id, ret); + int locked = tryreadlock(ret->loadlatch, 0); + assert(locked); + pthread_mutex_unlock(&mut); + + assert(ret->id == pageid); + return ret; +} +static void bhReleasePage(Page * p) { + pthread_mutex_lock(&mut); + lru->hit(lru, p->id); + unlock(p->loadlatch); + pthread_mutex_unlock(&mut); +} +static void bhWriteBackPage(Page * p) { + pageWrite(p); //XXX Correct?!? +} +static void bhForcePages() { + forcePageFile(); // XXX Correct?!? +} +static void bhBufDeinit() { + running = 0; + + pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit. + pthread_join(worker, 0); + + struct LH_ENTRY(list) iter; + const struct LH_ENTRY(pair_t) * next; + LH_ENTRY(openlist)(cachedPages, &iter); + + while((next = LH_ENTRY(readlist)(&iter))) { + pageWrite((next->value)); + } + + LH_ENTRY(closelist)(&iter); + LH_ENTRY(destroy)(cachedPages); + + LH_ENTRY(openlist(pendingPages, &iter)); + if((next = LH_ENTRY(readlist)(&iter))) { + abort(); // Pending loadPage during Tdeinit()! + } + LH_ENTRY(closelist)(&iter); + LH_ENTRY(destroy)(pendingPages); + + free(freeList); + + closePageFile(); + lru->deinit(lru); + bufferPoolDeInit(); +} +void bhBufInit() { + + assert(!running); + + loadPageImpl = bhLoadPageImpl; + releasePage = bhReleasePage; + writeBackPage = bhWriteBackPage; + forcePages = bhForcePages; + bufDeinit = bhBufDeinit; + simulateBufferManagerCrash = bhBufDeinit; + + bufferPoolInit(); + + openPageFile(); + lru = lruInit(); + + cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); + pendingPages = LH_ENTRY(create)(MAX_BUFFER_SIZE/10); + + freeListLength = MAX_BUFFER_SIZE / 10; + freeLowWater = freeListLength / 2; + freeCount = 0; + pageCount = 0; + + freeList = calloc(freeListLength, sizeof(Page*)); + + running = 1; + + pthread_create(&worker, 0, writeBackWorker, 0); + +} diff --git a/src/lladd/bufferManager/bufferHash.h b/src/lladd/bufferManager/bufferHash.h new file mode 100644 index 0000000..efb3bb8 --- /dev/null +++ b/src/lladd/bufferManager/bufferHash.h @@ -0,0 +1 @@ +void bhBufInit(); diff --git a/src/lladd/replacementPolicy.c b/src/lladd/replacementPolicy.c new file mode 100644 index 0000000..e2ba437 --- /dev/null +++ b/src/lladd/replacementPolicy.c @@ -0,0 +1,9 @@ +#include +#include +#include + +replacementPolicy * initReplacementPolicy(int impl) { + if(IMPL = REPLACEMENT_POLICY_LRU) { + + } +}