diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index 14fc59f..8aa50e0 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -57,16 +57,8 @@ terms specified in this license. #include #include -#include -#include - -//#include -#include - -#include #include -#include #undef loadPage #undef releasePage @@ -130,7 +122,6 @@ compensated_function Page * __profile_loadPage(int xid, pageid_t pageid, char * } - compensated_function void __profile_releasePage(Page * p) { pthread_mutex_lock(&profile_load_mutex); @@ -157,30 +148,23 @@ compensated_function void __profile_releasePage(Page * p) { #endif -Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type) = 0; -Page * (*loadUninitPageImpl)(int xid, pageid_t pageid) = 0; -Page * (*getCachedPageImpl)(int xid, pageid_t pageid) = 0; -void (*releasePageImpl)(Page * p) = 0; -int (*writeBackPage)(pageid_t page) = 0; -void (*forcePages)() = 0; -void (*forcePageRange)(pageid_t start, pageid_t stop) = 0; -void (*stasis_buffer_manager_close)() = 0; -void (*stasis_buffer_manager_simulate_crash)() = 0; - Page * loadPage(int xid, pageid_t pageid) { // This lock is released at Tcommit() if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } - return loadPageImpl(xid, pageid, UNKNOWN_TYPE_PAGE); + stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager(); + return bm->loadPageImpl(bm, xid, pageid, UNKNOWN_TYPE_PAGE); } Page * loadPageOfType(int xid, pageid_t pageid, pagetype_t type) { if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } - return loadPageImpl(xid, pageid, type); + stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager(); + return bm->loadPageImpl(bm, xid, pageid, type); } Page * loadUninitializedPage(int xid, pageid_t pageid) { // This lock is released at Tcommit() if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } - return loadUninitPageImpl(xid, pageid); + stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager(); + return bm->loadUninitPageImpl(bm, xid, pageid); } Page * loadPageForOperation(int xid, pageid_t pageid, int op) { @@ -200,31 +184,10 @@ Page * loadPageForOperation(int xid, pageid_t pageid, int op) { } Page * getCachedPage(int xid, pageid_t pageid) { - return getCachedPageImpl(xid, pageid); + stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager(); + return bm->getCachedPageImpl(bm, xid, pageid); } void releasePage(Page * p) { - releasePageImpl(p); -} - -int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph) { - bufferManagerType = type; - static int lastType = 0; - if(type == BUFFER_MANAGER_REOPEN) { - type = lastType; - } - lastType = type; - if(type == BUFFER_MANAGER_DEPRECATED_HASH) { - stasis_buffer_manager_deprecated_open(ph); - return 0; - } else if (type == BUFFER_MANAGER_MEM_ARRAY) { - stasis_buffer_manager_mem_array_open(); - ph->close(ph); // XXX should never have been opened in the first place! - return 0; - } else if (type == BUFFER_MANAGER_HASH) { - stasis_buffer_manager_hash_open(ph); - return 0; - } else { - // XXX error handling - abort(); - } + stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager(); + bm->releasePageImpl(bm, p); } diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index d86f1bb..6dd3095 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -15,19 +15,17 @@ #include #include -#include #include #include #include -#include #include #include #include //#define LATCH_SANITY_CHECKING -struct stasis_buffer_hash_t { +typedef struct { struct LH_ENTRY(table) * cachedPages; pthread_t worker; pthread_mutex_t mut; @@ -36,30 +34,10 @@ struct stasis_buffer_hash_t { pageid_t pageCount; replacementPolicy *lru; stasis_buffer_pool_t *buffer_pool; - stasis_page_handle_t *page_handler; + stasis_page_handle_t *page_handle; int flushing; int running; -}; - -static struct LH_ENTRY(table) * cachedPages; - -static pthread_t worker; -static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t readComplete = PTHREAD_COND_INITIALIZER; -static pthread_cond_t needFree = PTHREAD_COND_INITIALIZER; - -static pageid_t pageCount; - -// A page is in LRU iff !pending, !pinned -static replacementPolicy * lru; - -static stasis_buffer_pool_t * stasis_buffer_pool; - -static stasis_page_handle_t * page_handle; - -static int flushing; - -static int running; +} stasis_buffer_hash_t; typedef struct LL_ENTRY(node_t) node_t; @@ -79,6 +57,7 @@ static inline intptr_t* pagePinCountPtr(Page * p) { return ((intptr_t*)(&((p)->queue))); } static inline int needFlush() { + // XXX need to remove call to stasis_runtime* pageid_t count = stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()); const pageid_t needed = 1000; //MAX_BUFFER_SIZE / 5; if(count > needed) { @@ -121,9 +100,9 @@ inline static void checkPageState(Page * p) { } #endif -inline static int tryToWriteBackPage(pageid_t page) { - - Page * p = LH_ENTRY(find)(cachedPages, &page, sizeof(page)); +inline static int tryToWriteBackPage(stasis_buffer_manager_t *bm, pageid_t page) { + stasis_buffer_hash_t * bh = bm->impl; + Page * p = LH_ENTRY(find)(bh->cachedPages, &page, sizeof(page)); if(!p) { return ENOENT; } @@ -133,29 +112,25 @@ inline static int tryToWriteBackPage(pageid_t page) { return EBUSY; } DEBUG("Write(%ld)\n", (long)victim->id); - page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight. - -// int locked = trywritelock(p->rwlatch,0); -// assert(locked); -// assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p)); -// unlock(p->rwlatch); + bh->page_handle->write(bh->page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight. return 0; } /** Returns a free page. The page will not be in freeList, cachedPages or lru. */ -inline static Page * getFreePage() { +inline static Page * getFreePage(stasis_buffer_manager_t *bm) { + stasis_buffer_hash_t * bh = bm->impl; Page * ret; - if(pageCount < MAX_BUFFER_SIZE) { - ret = stasis_buffer_pool_malloc_page(stasis_buffer_pool); - stasis_buffer_pool_free_page(stasis_buffer_pool, ret,-1); + if(bh->pageCount < MAX_BUFFER_SIZE) { + ret = stasis_buffer_pool_malloc_page(bh->buffer_pool); + stasis_buffer_pool_free_page(bh->buffer_pool, ret,-1); (*pagePinCountPtr(ret)) = 0; (*pagePendingPtr(ret)) = 0; pageSetNode(ret,0,0); - pageCount++; + bh->pageCount++; } else { - while((ret = lru->getStale(lru))) { + while((ret = bh->lru->getStale(bh->lru))) { // Make sure we have an exclusive lock on victim. if(!ret) { printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing."); @@ -164,19 +139,19 @@ inline static Page * getFreePage() { assert(!*pagePinCountPtr(ret)); assert(!*pagePendingPtr(ret)); if(ret->dirty) { - pthread_mutex_unlock(&mut); + pthread_mutex_unlock(&bh->mut); DEBUG("Blocking app thread"); // We don't really care if this flush happens, so long as *something* is being written back, so ignore the EAGAIN it could return. // (Besides, once this returns EAGAIN twice, we know that some other flush concurrently was initiated + returned, so we're good to go...) stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); - pthread_mutex_lock(&mut); + pthread_mutex_lock(&bh->mut); } else { break; } } - lru->remove(lru, ret); - Page * check = LH_ENTRY(remove)(cachedPages, &ret->id, sizeof(ret->id)); + bh->lru->remove(bh->lru, ret); + Page * check = LH_ENTRY(remove)(bh->cachedPages, &ret->id, sizeof(ret->id)); assert(check == ret); } assert(!*pagePinCountPtr(ret)); @@ -186,31 +161,34 @@ inline static Page * getFreePage() { return ret; } -static void * writeBackWorker(void * ignored) { - pthread_mutex_lock(&mut); +static void * writeBackWorker(void * bmp) { + stasis_buffer_manager_t* bm = bmp; + stasis_buffer_hash_t * bh = bm->impl; + pthread_mutex_lock(&bh->mut); while(1) { - while(running && (!needFlush())) { - flushing = 0; + while(bh->running && (!needFlush())) { + bh->flushing = 0; DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())); - pthread_cond_wait(&needFree, &mut); + pthread_cond_wait(&bh->needFree, &bh->mut); DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())); - flushing = 1; + bh->flushing = 1; } - if(!running) { break; } - pthread_mutex_unlock(&mut); + if(!bh->running) { break; } + pthread_mutex_unlock(&bh->mut); DEBUG("Calling flush\n"); // ignore ret val; this flush is for performance, not correctness. - stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); - pthread_mutex_lock(&mut); + stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); // XXX no call to stasis_runtime_* + pthread_mutex_lock(&bh->mut); } - pthread_mutex_unlock(&mut); + pthread_mutex_unlock(&bh->mut); return 0; } -static Page * bhGetCachedPage(int xid, const pageid_t pageid) { - pthread_mutex_lock(&mut); +static Page * bhGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid_t pageid) { + stasis_buffer_hash_t * bh = bm->impl; + pthread_mutex_lock(&bh->mut); // Is the page in cache? - Page * ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(pageid)); + Page * ret = LH_ENTRY(find)(bh->cachedPages, &pageid, sizeof(pageid)); if(ret) { checkPageState(ret); #ifdef LATCH_SANITY_CHECKING @@ -220,7 +198,7 @@ static Page * bhGetCachedPage(int xid, const pageid_t pageid) { if(!*pagePendingPtr(ret)) { if(!*pagePinCountPtr(ret) ) { // Then ret is in lru (otherwise it would be pending, or not cached); remove it. - lru->remove(lru, ret); + bh->lru->remove(bh->lru, ret); } (*pagePinCountPtr(ret))++; checkPageState(ret); @@ -229,11 +207,12 @@ static Page * bhGetCachedPage(int xid, const pageid_t pageid) { ret = 0; } } - pthread_mutex_unlock(&mut); + pthread_mutex_unlock(&bh->mut); return ret; } -static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitialized, pagetype_t type) { +static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const pageid_t pageid, int uninitialized, pagetype_t type) { + stasis_buffer_hash_t * bh = bm->impl; // Note: Calls to loadlatch in this function violate lock order, but // should be safe, since we make sure no one can have a writelock @@ -241,10 +220,10 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia void* check; - pthread_mutex_lock(&mut); + pthread_mutex_lock(&bh->mut); // Is the page in cache? - Page * ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(pageid)); + Page * ret = LH_ENTRY(find)(bh->cachedPages, &pageid,sizeof(pageid)); do { @@ -253,9 +232,9 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia while(ret) { checkPageState(ret); if(*pagePendingPtr(ret)) { - pthread_cond_wait(&readComplete, &mut); + pthread_cond_wait(&bh->readComplete, &bh->mut); if(ret->id != pageid) { - ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(pageid)); + ret = LH_ENTRY(find)(bh->cachedPages, &pageid, sizeof(pageid)); } } else { #ifdef LATCH_SANITY_CHECKING @@ -264,11 +243,11 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia #endif if(! *pagePinCountPtr(ret) ) { // Then ret is in lru (otherwise it would be pending, or not cached); remove it. - lru->remove(lru, ret); + bh->lru->remove(bh->lru, ret); } (*pagePinCountPtr(ret))++; checkPageState(ret); - pthread_mutex_unlock(&mut); + pthread_mutex_unlock(&bh->mut); assert(ret->id == pageid); return ret; } @@ -278,10 +257,10 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia assert(!ret); // Remove a page from the freelist. This may cause writeback, and release our latch. - Page * ret2 = getFreePage(); + Page * ret2 = getFreePage(bm); // Did some other thread put the page in cache for us? - ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(pageid)); + ret = LH_ENTRY(find)(bh->cachedPages, &pageid,sizeof(pageid)); if(!ret) { @@ -295,8 +274,8 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia break; } else { // Put the page we were about to evict back in cached pages - LH_ENTRY(insert)(cachedPages, &ret2->id, sizeof(ret2->id), ret2); - lru->insert(lru, ret2); + LH_ENTRY(insert)(bh->cachedPages, &ret2->id, sizeof(ret2->id), ret2); + bh->lru->insert(bh->lru, ret2); // On the next loop iteration, we'll probably return the page the other thread inserted for us. } // try again. @@ -305,7 +284,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia // Add a pending entry to cachedPages to block like-minded threads and writeback (*pagePendingPtr(ret)) = (void*)1; - check = LH_ENTRY(insert)(cachedPages,&pageid,sizeof(pageid), ret); + check = LH_ENTRY(insert)(bh->cachedPages,&pageid,sizeof(pageid), ret); assert(!check); ret->id = pageid; @@ -314,11 +293,11 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia // Now, it is safe to release the mutex; other threads won't // try to read this page from disk. - pthread_mutex_unlock(&mut); + pthread_mutex_unlock(&bh->mut); - page_handle->read(page_handle, ret, type); + bh->page_handle->read(bh->page_handle, ret, type); - pthread_mutex_lock(&mut); + pthread_mutex_lock(&bh->mut); } else { memset(ret->memAddr,0,PAGE_SIZE); @@ -340,58 +319,63 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia assert(locked); #endif - pthread_mutex_unlock(&mut); - pthread_cond_broadcast(&readComplete); + pthread_mutex_unlock(&bh->mut); + pthread_cond_broadcast(&bh->readComplete); // TODO Improve writeback policy - if((!flushing) && needFlush()) { - pthread_cond_signal(&needFree); + if((!bh->flushing) && needFlush()) { + pthread_cond_signal(&bh->needFree); } assert(ret->id == pageid); checkPageState (ret); return ret; } -static Page * bhLoadPageImpl(int xid, const pageid_t pageid, pagetype_t type) { - return bhLoadPageImpl_helper(xid,pageid,0, type); +static Page * bhLoadPageImpl(stasis_buffer_manager_t *bm, int xid, const pageid_t pageid, pagetype_t type) { + return bhLoadPageImpl_helper(bm, xid, pageid, 0, type); } -static Page * bhLoadUninitPageImpl(int xid, const pageid_t pageid) { - return bhLoadPageImpl_helper(xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page. +static Page * bhLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const pageid_t pageid) { + return bhLoadPageImpl_helper(bm, xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page. } -static void bhReleasePage(Page * p) { - pthread_mutex_lock(&mut); +static void bhReleasePage(stasis_buffer_manager_t * bm, Page * p) { + stasis_buffer_hash_t * bh = bm->impl; + pthread_mutex_lock(&bh->mut); checkPageState(p); (*pagePinCountPtr(p))--; if(!(*pagePinCountPtr(p))) { assert(!pageGetNode(p, 0)); - lru->insert(lru,p); + bh->lru->insert(bh->lru,p); } #ifdef LATCH_SANITY_CHECKING unlock(p->loadlatch); #endif - pthread_mutex_unlock(&mut); + pthread_mutex_unlock(&bh->mut); } -static int bhWriteBackPage(pageid_t pageid) { - pthread_mutex_lock(&mut); - int ret = tryToWriteBackPage(pageid); - pthread_mutex_unlock(&mut); +static int bhWriteBackPage(stasis_buffer_manager_t* bm, pageid_t pageid) { + stasis_buffer_hash_t * bh = bm->impl; + pthread_mutex_lock(&bh->mut); + int ret = tryToWriteBackPage(bm, pageid); + pthread_mutex_unlock(&bh->mut); return ret; } -static void bhForcePages() { - page_handle->force_file(page_handle); +static void bhForcePages(stasis_buffer_manager_t* bm) { + stasis_buffer_hash_t * bh = bm->impl; + bh->page_handle->force_file(bh->page_handle); } -static void bhForcePageRange(pageid_t start, pageid_t stop) { - page_handle->force_range(page_handle, start, stop); +static void bhForcePageRange(stasis_buffer_manager_t *bm, pageid_t start, pageid_t stop) { + stasis_buffer_hash_t * bh = bm->impl; + bh->page_handle->force_range(bh->page_handle, start, stop); } -static void bhBufDeinit() { - pthread_mutex_lock(&mut); - running = 0; - pthread_mutex_unlock(&mut); +static void bhBufDeinit(stasis_buffer_manager_t * bm) { + stasis_buffer_hash_t * bh = bm->impl; + pthread_mutex_lock(&bh->mut); + bh->running = 0; + pthread_mutex_unlock(&bh->mut); - pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit. - pthread_join(worker, 0); + pthread_cond_signal(&bh->needFree); // Wake up the writeback thread so it will exit. + pthread_join(bh->worker, 0); // XXX flush range should return an error number, which we would check. (Right now, it aborts...) int ret = stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); @@ -399,7 +383,7 @@ static void bhBufDeinit() { struct LH_ENTRY(list) iter; const struct LH_ENTRY(pair_t) * next; - LH_ENTRY(openlist)(cachedPages, &iter); + LH_ENTRY(openlist)(bh->cachedPages, &iter); while((next = LH_ENTRY(readlist)(&iter))) { Page * p = next->value; assertunlocked(p->rwlatch); @@ -410,23 +394,29 @@ static void bhBufDeinit() { stasis_page_cleanup(p); // normally called by writeBackOnePage() } LH_ENTRY(closelist)(&iter); - LH_ENTRY(destroy)(cachedPages); + LH_ENTRY(destroy)(bh->cachedPages); - lru->deinit(lru); - stasis_buffer_pool_deinit(stasis_buffer_pool); - page_handle->close(page_handle); + bh->lru->deinit(bh->lru); + stasis_buffer_pool_deinit(bh->buffer_pool); + bh->page_handle->close(bh->page_handle); + + pthread_mutex_destroy(&bh->mut); + pthread_cond_destroy(&bh->needFree); + pthread_cond_destroy(&bh->readComplete); + free(bh); } -static void bhSimulateBufferManagerCrash() { - pthread_mutex_lock(&mut); - running = 0; - pthread_mutex_unlock(&mut); +static void bhSimulateBufferManagerCrash(stasis_buffer_manager_t *bm) { + stasis_buffer_hash_t * bh = bm->impl; + pthread_mutex_lock(&bh->mut); + bh->running = 0; + pthread_mutex_unlock(&bh->mut); - pthread_cond_signal(&needFree); - pthread_join(worker, 0); + pthread_cond_signal(&bh->needFree); + pthread_join(bh->worker, 0); struct LH_ENTRY(list) iter; const struct LH_ENTRY(pair_t) * next; - LH_ENTRY(openlist)(cachedPages, &iter); + LH_ENTRY(openlist)(bh->cachedPages, &iter); while((next = LH_ENTRY(readlist)(&iter))) { Page * p = next->value; writelock(p->rwlatch,0); @@ -435,42 +425,60 @@ static void bhSimulateBufferManagerCrash() { unlock(p->rwlatch); } LH_ENTRY(closelist)(&iter); - LH_ENTRY(destroy)(cachedPages); + LH_ENTRY(destroy)(bh->cachedPages); - lru->deinit(lru); - stasis_buffer_pool_deinit(stasis_buffer_pool); - page_handle->close(page_handle); + bh->lru->deinit(bh->lru); + stasis_buffer_pool_deinit(bh->buffer_pool); + bh->page_handle->close(bh->page_handle); + + pthread_mutex_destroy(&bh->mut); + pthread_cond_destroy(&bh->needFree); + pthread_cond_destroy(&bh->readComplete); + free(bh); } -void stasis_buffer_manager_hash_open(stasis_page_handle_t * h) { - page_handle = h; - assert(!running); +stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t * h) { + stasis_buffer_manager_t *bm = malloc(sizeof(*bm)); + stasis_buffer_hash_t *bh = malloc(sizeof(*bh)); + + bm->loadPageImpl = bhLoadPageImpl; + bm->loadUninitPageImpl = bhLoadUninitPageImpl; + bm->getCachedPageImpl = bhGetCachedPage; + bm->releasePageImpl = bhReleasePage; + bm->writeBackPage = bhWriteBackPage; + bm->forcePages = bhForcePages; + bm->forcePageRange = bhForcePageRange; + bm->stasis_buffer_manager_close = bhBufDeinit; + bm->stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash; + + bm->impl = bh; + + bh->page_handle = h; + bh->running = 0; #ifdef LONG_RUN printf("Using expensive bufferHash sanity checking.\n"); #endif - loadPageImpl = bhLoadPageImpl; - loadUninitPageImpl = bhLoadUninitPageImpl; - getCachedPageImpl = bhGetCachedPage; - releasePageImpl = bhReleasePage; - writeBackPage = bhWriteBackPage; - forcePages = bhForcePages; - forcePageRange = bhForcePageRange; - stasis_buffer_manager_close = bhBufDeinit; - stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash; + bh->flushing = 0; - flushing = 0; + bh->buffer_pool = stasis_buffer_pool_init(); - stasis_buffer_pool = stasis_buffer_pool_init(); + bh->lru = lruFastInit(pageGetNode, pageSetNode, 0); - lru = lruFastInit(pageGetNode, pageSetNode, 0); + bh->cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); - cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); + bh->pageCount = 0; - pageCount = 0; + bh->running = 1; - running = 1; + pthread_mutex_init(&bh->mut,0); - pthread_create(&worker, 0, writeBackWorker, 0); + pthread_cond_init(&bh->needFree,0); + + pthread_cond_init(&bh->readComplete,0); + + pthread_create(&bh->worker, 0, writeBackWorker, bm); + + return bm; } diff --git a/src/stasis/bufferManager/legacy/legacyBufferManager.c b/src/stasis/bufferManager/legacy/legacyBufferManager.c index 3ded684..5862d36 100644 --- a/src/stasis/bufferManager/legacy/legacyBufferManager.c +++ b/src/stasis/bufferManager/legacy/legacyBufferManager.c @@ -1,6 +1,5 @@ #include -#include #include #include #include @@ -21,17 +20,17 @@ static pthread_key_t lastPage; #define RW 1 static void bufManBufDeinit(); -static compensated_function Page *bufManLoadPage(int xid, pageid_t pageid, pagetype_t type); -static compensated_function Page *bufManGetCachedPage(int xid, pageid_t pageid); -static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid); -static void bufManReleasePage (Page * p); +static compensated_function Page *bufManLoadPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid, pagetype_t type); +static compensated_function Page *bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid); +static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid); +static void bufManReleasePage (stasis_buffer_manager_t *ignored, Page * p); static void bufManSimulateBufferManagerCrash(); static stasis_page_handle_t * page_handle; static stasis_buffer_pool_t * stasis_buffer_pool; -static int pageWrite_legacyWrapper(pageid_t pageid) { +static int pageWrite_legacyWrapper(stasis_buffer_manager_t *ignored, pageid_t pageid) { Page * p = loadPage(-1, pageid); // XXX this is unsafe; the page could be pinned! page_handle->write(page_handle,p); @@ -41,21 +40,22 @@ static int pageWrite_legacyWrapper(pageid_t pageid) { static void forcePageFile_legacyWrapper() { page_handle->force_file(page_handle); } -static void forceRangePageFile_legacyWrapper(lsn_t start, lsn_t stop) { +static void forceRangePageFile_legacyWrapper(stasis_buffer_manager_t *ignored, lsn_t start, lsn_t stop) { page_handle->force_range(page_handle, start, stop); } -int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) { +stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) { page_handle = ph; - releasePageImpl = bufManReleasePage; - loadPageImpl = bufManLoadPage; - loadUninitPageImpl = bufManLoadUninitPage; - getCachedPageImpl = bufManGetCachedPage; - writeBackPage = pageWrite_legacyWrapper; - forcePages = forcePageFile_legacyWrapper; - forcePageRange = forceRangePageFile_legacyWrapper; - stasis_buffer_manager_close = bufManBufDeinit; - stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash; + stasis_buffer_manager_t * bm = malloc(sizeof(*bm)); + bm->releasePageImpl = bufManReleasePage; + bm->loadPageImpl = bufManLoadPage; + bm->loadUninitPageImpl = bufManLoadUninitPage; + bm->getCachedPageImpl = bufManGetCachedPage; + bm->writeBackPage = pageWrite_legacyWrapper; + bm->forcePages = forcePageFile_legacyWrapper; + bm->forcePageRange = forceRangePageFile_legacyWrapper; + bm->stasis_buffer_manager_close = bufManBufDeinit; + bm->stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash; stasis_buffer_pool = stasis_buffer_pool_init(); @@ -80,7 +80,8 @@ int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) { profile_load_hash = LH_ENTRY(create)(10); profile_load_pins_hash = LH_ENTRY(create)(10); #endif - return 0; + bm->impl = 0; // XXX hack, but this module is deprecated + return bm; } static void bufManBufDeinit() { @@ -126,7 +127,7 @@ static void bufManSimulateBufferManagerCrash() { #endif } -static void bufManReleasePage (Page * p) { +static void bufManReleasePage (stasis_buffer_manager_t *ignored, Page * p) { unlock(p->loadlatch); #ifdef PIN_COUNT pthread_mutex_lock(&pinCount_mutex); @@ -340,7 +341,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized, pag return ret; } -static compensated_function Page *bufManLoadPage(int xid, const pageid_t pageid, pagetype_t type) { +static compensated_function Page *bufManLoadPage(stasis_buffer_manager_t *ignored, int xid, const pageid_t pageid, pagetype_t type) { Page * ret = pthread_getspecific(lastPage); @@ -371,12 +372,12 @@ static compensated_function Page *bufManLoadPage(int xid, const pageid_t pageid, return ret; } -static Page* bufManGetCachedPage(int xid, const pageid_t pageid) { +static Page* bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, const pageid_t pageid) { // XXX hack; but this code is deprecated - return bufManLoadPage(xid, pageid, UNKNOWN_TYPE_PAGE); + return bufManLoadPage(ignored, xid, pageid, UNKNOWN_TYPE_PAGE); } -static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid) { +static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid) { Page * ret = pthread_getspecific(lastPage); diff --git a/src/stasis/bufferManager/pageArray.c b/src/stasis/bufferManager/pageArray.c index c4bf9ee..2a239ce 100644 --- a/src/stasis/bufferManager/pageArray.c +++ b/src/stasis/bufferManager/pageArray.c @@ -1,79 +1,93 @@ #include -#include #include #include #include #include #include -static Page ** pageMap; -static pageid_t pageCount; -static pthread_mutex_t pageArray_mut = PTHREAD_MUTEX_INITIALIZER; +typedef struct { + Page ** pageMap; + pageid_t pageCount; + pthread_mutex_t mut; +} stasis_buffer_manager_page_array_t; -static Page * paLoadPage(int xid, pageid_t pageid, pagetype_t type) { - - pthread_mutex_lock(&pageArray_mut); - if(pageid >= pageCount) { - pageMap = realloc(pageMap, (1+pageid) * sizeof(Page*)); - for(pageid_t i = pageCount; i <= pageid; i++) { - pageMap[i] = 0; +static Page * paLoadPage(stasis_buffer_manager_t *bm, int xid, pageid_t pageid, pagetype_t type) { + stasis_buffer_manager_page_array_t *pa = bm->impl; + pthread_mutex_lock(&pa->mut); + if(pageid >= pa->pageCount) { + pa->pageMap = realloc(pa->pageMap, (1+pageid) * sizeof(Page*)); + for(pageid_t i = pa->pageCount; i <= pageid; i++) { + pa->pageMap[i] = 0; } - pageCount = pageid + 1; + pa->pageCount = pageid + 1; } - if(!pageMap[pageid]) { - pageMap[pageid] = malloc(sizeof(Page)); - pageMap[pageid]->id = pageid; - pageMap[pageid]->pageType = type == UNKNOWN_TYPE_PAGE ? 0 : type; - pageMap[pageid]->LSN = 0; - pageMap[pageid]->dirty = 0; - pageMap[pageid]->next = 0; - pageMap[pageid]->prev = 0; - pageMap[pageid]->queue = 0; - pageMap[pageid]->inCache = 1; - pageMap[pageid]->rwlatch = initlock(); - pageMap[pageid]->loadlatch = initlock(); - pageMap[pageid]->memAddr= calloc(PAGE_SIZE, sizeof(byte)); + if(!pa->pageMap[pageid]) { + pa->pageMap[pageid] = malloc(sizeof(Page)); + pa->pageMap[pageid]->id = pageid; + pa->pageMap[pageid]->pageType = type == UNKNOWN_TYPE_PAGE ? 0 : type; + pa->pageMap[pageid]->LSN = 0; + pa->pageMap[pageid]->dirty = 0; + pa->pageMap[pageid]->next = 0; + pa->pageMap[pageid]->prev = 0; + pa->pageMap[pageid]->queue = 0; + pa->pageMap[pageid]->inCache = 1; + pa->pageMap[pageid]->rwlatch = initlock(); + pa->pageMap[pageid]->loadlatch = initlock(); + pa->pageMap[pageid]->memAddr= calloc(PAGE_SIZE, sizeof(byte)); } else{ - if(type != UNKNOWN_TYPE_PAGE) { assert(type == pageMap[pageid]->pageType); } + if(type != UNKNOWN_TYPE_PAGE) { assert(type == pa->pageMap[pageid]->pageType); } } - pthread_mutex_unlock(&pageArray_mut); - return pageMap[pageid]; + pthread_mutex_unlock(&pa->mut); + return pa->pageMap[pageid]; } -static Page* paGetCachedPage(int xid, pageid_t page) { - return paLoadPage(xid, page, UNKNOWN_TYPE_PAGE); +static Page* paLoadUninitPage(stasis_buffer_manager_t *bm, int xid, pageid_t page) { + return paLoadPage(bm, xid, page, UNKNOWN_TYPE_PAGE); } -static void paReleasePage(Page * p) { +static Page* paGetCachedPage(stasis_buffer_manager_t *bm, int xid, pageid_t page) { + return paLoadPage(bm, xid, page, UNKNOWN_TYPE_PAGE); +} +static void paReleasePage(stasis_buffer_manager_t *bm, Page * p) { writelock(p->rwlatch,0); stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p); unlock(p->rwlatch); } -static int paWriteBackPage(pageid_t p) { return 0; /* no-op */ } -static void paForcePages() { /* no-op */ } -static void paForcePageRange(pageid_t start, pageid_t stop) { /* no-op */ } +static int paWriteBackPage(stasis_buffer_manager_t *bm, pageid_t p) { return 0; /* no-op */ } +static void paForcePages(stasis_buffer_manager_t * bm) { /* no-op */ } +static void paForcePageRange(stasis_buffer_manager_t *bm, pageid_t start, pageid_t stop) { /* no-op */ } -static void paBufDeinit() { - for(pageid_t i =0; i < pageCount; i++) { - if(pageMap[i]) { - deletelock(pageMap[i]->rwlatch); - deletelock(pageMap[i]->loadlatch); - free(pageMap[i]); +static void paBufDeinit(stasis_buffer_manager_t * bm) { + stasis_buffer_manager_page_array_t *pa = bm->impl; + + for(pageid_t i =0; i < pa->pageCount; i++) { + if(pa->pageMap[i]) { + deletelock(pa->pageMap[i]->rwlatch); + deletelock(pa->pageMap[i]->loadlatch); + free(pa->pageMap[i]); } } + pthread_mutex_destroy(&pa->mut); + free(pa); } -void stasis_buffer_manager_mem_array_open () { +stasis_buffer_manager_t * stasis_buffer_manager_mem_array_open () { - releasePageImpl = paReleasePage; - loadPageImpl = paLoadPage; - getCachedPageImpl = paGetCachedPage; - writeBackPage = paWriteBackPage; - forcePages = paForcePages; - forcePageRange = paForcePageRange; - stasis_buffer_manager_close = paBufDeinit; - stasis_buffer_manager_simulate_crash = paBufDeinit; + stasis_buffer_manager_t * bm = malloc(sizeof(*bm)); + stasis_buffer_manager_page_array_t * pa = malloc(sizeof(*pa)); - pageCount = 0; - pageMap = 0; + bm->releasePageImpl = paReleasePage; + bm->loadPageImpl = paLoadPage; + bm->loadUninitPageImpl = paLoadUninitPage; + bm->getCachedPageImpl = paGetCachedPage; + bm->writeBackPage = paWriteBackPage; + bm->forcePages = paForcePages; + bm->forcePageRange = paForcePageRange; + bm->stasis_buffer_manager_close = paBufDeinit; + bm->stasis_buffer_manager_simulate_crash = paBufDeinit; + bm->impl = pa; + pa->pageCount = 0; + pa->pageMap = 0; + pthread_mutex_init(&pa->mut,0); + return bm; } diff --git a/src/stasis/dirtyPageTable.c b/src/stasis/dirtyPageTable.c index de475b1..83c48b1 100644 --- a/src/stasis/dirtyPageTable.c +++ b/src/stasis/dirtyPageTable.c @@ -10,7 +10,6 @@ #include #include #include -#include #include @@ -28,6 +27,7 @@ static int dpt_cmp(const void *ap, const void * bp, const void * ignored) { struct stasis_dirty_page_table_t { struct rbtree * table; + stasis_buffer_manager_t * bufferManager; pageid_t count; pthread_mutex_t mutex; pthread_cond_t flushDone; @@ -131,7 +131,7 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { if(off == stride) { pthread_mutex_unlock(&dirtyPages->mutex); for(pageid_t i = 0; i < off; i++) { - writeBackPage(vals[i]); + dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, vals[i]); } off = 0; strides++; @@ -140,7 +140,7 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { } pthread_mutex_unlock(&dirtyPages->mutex); for(int i = 0; i < off; i++) { - writeBackPage(vals[i]); + dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, vals[i]); } pthread_mutex_lock(&dirtyPages->mutex); dirtyPages->flushing = 0; @@ -151,6 +151,30 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { return 0; } + +int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop, int count, pageid_t* range_starts, pageid_t* range_ends) { + pthread_mutex_lock(&dirtyPages->mutex); + int n = 0; + int b = -1; + dpt_entry dummy; + dummy.lsn = -1; + dummy.p = start; + + for(const dpt_entry *e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table); + e && (stop == 0 || e->p < stop) && n < count; + e = rblookup(RB_LUGREAT, e, dirtyPages->table)) { + if(n == 0 || range_ends[b] != e->p) { + b++; + range_starts[b] = e->p; + range_ends[b] = e->p+1; + } else { + range_ends[b]++; + } + n++; + } + pthread_mutex_unlock(&dirtyPages->mutex); + return b+1; +} void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) { pthread_mutex_lock(&dirtyPages->mutex); @@ -178,11 +202,14 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pthread_mutex_unlock(&dirtyPages->mutex); for(pageid_t i = 0; i < n; i++) { - int err = writeBackPage(staleDirtyPages[i]); + int err = dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, staleDirtyPages[i]); if(stop && (err == EBUSY)) { abort(); /*api violation!*/ } } free(staleDirtyPages); -// forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE); +} + +void stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t *bufferManager) { + dpt->bufferManager = bufferManager; } stasis_dirty_page_table_t * stasis_dirty_page_table_init() { diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 5c590ed..0d65884 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -59,7 +59,8 @@ void TlsmRegionForceRid(int xid, void *conf) { pageid_t pid; Tread(xid,a.regionList,&pid); stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), pid, pid+a.regionSize); - forcePageRange(pid, pid+a.regionSize); + stasis_buffer_manager_t *bm = stasis_runtime_buffer_manager(); + bm->forcePageRange(bm, pid, pid+a.regionSize); // TregionDealloc(xid,pid); } } diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 63ab946..ffa2195 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -10,7 +10,9 @@ #include - +#include +#include +#include #include #include @@ -34,6 +36,11 @@ stasis_dirty_page_table_t * stasis_dirty_page_table = 0; static stasis_truncation_t * stasis_truncation = 0; static stasis_alloc_t * stasis_alloc = 0; static stasis_allocation_policy_t * stasis_allocation_policy = 0; +static stasis_buffer_manager_t * stasis_buffer_manager = 0; + +void * stasis_runtime_buffer_manager() { + return stasis_buffer_manager; +} /** This mutex protects stasis_transaction_table, numActiveXactions and @@ -61,6 +68,27 @@ void * stasis_runtime_alloc_state() { return stasis_alloc; } +static stasis_buffer_manager_t* stasis_runtime_buffer_manager_open(int type, stasis_page_handle_t * ph) { + bufferManagerType = type; + static int lastType = 0; + if(type == BUFFER_MANAGER_REOPEN) { + type = lastType; + } + lastType = type; + if(type == BUFFER_MANAGER_DEPRECATED_HASH) { + return stasis_buffer_manager_deprecated_open(ph); + } else if (type == BUFFER_MANAGER_MEM_ARRAY) { + stasis_buffer_manager_t *ret = stasis_buffer_manager_mem_array_open(); + ph->close(ph); // XXX should never have been opened in the first place! + return ret; + } else if (type == BUFFER_MANAGER_HASH) { + return stasis_buffer_manager_hash_open(ph); + } else { + // XXX error handling + abort(); + } +} + int Tinit() { pthread_mutex_init(&stasis_transaction_table_mutex, NULL); stasis_initted = 1; @@ -99,8 +127,9 @@ int Tinit() { page_handle = stasis_page_handle_open(h, stasis_log_file, stasis_dirty_page_table); } - stasis_buffer_manager_open(bufferManagerType, page_handle); + stasis_buffer_manager = stasis_runtime_buffer_manager_open(bufferManagerType, page_handle); DEBUG("Buffer manager type = %d\n", bufferManagerType); + stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency. pageOperationsInit(); stasis_allocation_policy = stasis_allocation_policy_init(); stasis_alloc = stasis_alloc_init(stasis_allocation_policy); @@ -114,7 +143,7 @@ int Tinit() { //setupLockManagerCallbacksPage(); stasis_recovery_initiate(stasis_log_file, stasis_alloc); - stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_log_file); + stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_buffer_manager, stasis_log_file); if(stasis_truncation_automatic) { // should this be before InitiateRecovery? stasis_truncation_thread_start(stasis_truncation); @@ -365,7 +394,7 @@ int Tdeinit() { TnaiveHashDeinit(); stasis_alloc_deinit(stasis_alloc); stasis_allocation_policy_deinit(stasis_allocation_policy); - stasis_buffer_manager_close(); + stasis_buffer_manager->stasis_buffer_manager_close(stasis_buffer_manager); DEBUG("Closing page file tdeinit\n"); stasis_page_deinit(); stasis_log_group_force_t * group_force = stasis_log_file->group_force; @@ -387,7 +416,7 @@ int TuncleanShutdown() { stasis_alloc_deinit(stasis_alloc); stasis_allocation_policy_deinit(stasis_allocation_policy); - stasis_buffer_manager_simulate_crash(); + stasis_buffer_manager->stasis_buffer_manager_simulate_crash(stasis_buffer_manager); // XXX: close_file? stasis_page_deinit(); stasis_log_file->close(stasis_log_file); diff --git a/src/stasis/truncation.c b/src/stasis/truncation.c index 3e483dc..1d1c60b 100644 --- a/src/stasis/truncation.c +++ b/src/stasis/truncation.c @@ -8,6 +8,7 @@ struct stasis_truncation_t { pthread_mutex_t shutdown_mutex; pthread_cond_t shutdown_cond; stasis_dirty_page_table_t * dirty_pages; + stasis_buffer_manager_t * buffer_manager; stasis_log_t * log; }; @@ -20,13 +21,14 @@ struct stasis_truncation_t { #define TRUNCATE_INTERVAL 1 #define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 25) #endif -stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_log_t * log) { +stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t *buffer_manager, stasis_log_t *log) { stasis_truncation_t * ret = malloc(sizeof(*ret)); ret->initialized = 1; ret->automaticallyTruncating = 0; pthread_mutex_init(&ret->shutdown_mutex, 0); pthread_cond_init(&ret->shutdown_cond, 0); ret->dirty_pages = dpt; + ret->buffer_manager = buffer_manager; ret->log = log; return ret; } @@ -99,7 +101,7 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) { if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) { // fprintf(stderr, "Truncating now. rec_lsn = %ld, log_trunc = %ld\n", rec_lsn, log_trunc); // fprintf(stderr, "Truncating to rec_lsn = %ld\n", rec_lsn); - forcePages(); + trunc->buffer_manager->forcePages(trunc->buffer_manager); trunc->log->truncate(trunc->log, rec_lsn); return 1; } else { @@ -116,7 +118,7 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) { //fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn); - forcePages(); + trunc->buffer_manager->forcePages(trunc->buffer_manager); trunc->log->truncate(trunc->log, rec_lsn); return 1; } else { diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index 95377c2..58340ca 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -80,7 +80,6 @@ terms specified in this license. #define __BUFFERMANAGER_H__ #include BEGIN_C_DECLS -#include /** * Obtain a pointer to a page from the buffer manager. The page will * be pinned, and the pointer valid until releasePage is called. @@ -99,75 +98,64 @@ Page * loadUninitializedPage(int xid, pageid_t pageid); Page * loadPageForOperation(int xid, pageid_t pageid, int op); -Page * getCachedPage(int xid, const pageid_t pageid); - -/** - This is the function pointer that stasis_buffer_manager_open sets in order to - override loadPage. -*/ -extern Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type); -extern Page * (*loadUninitPageImpl)(int xid, pageid_t pageid); /** Get a page from cache. This function should never block on I/O. @return a pointer to the page, or NULL if the page is not in cache, or is being read from disk. */ -extern Page * (*getCachedPageImpl)(int xid, const pageid_t pageid); +Page * getCachedPage(int xid, const pageid_t pageid); + /** - loadPage aquires a lock when it is called, effectively pinning it + loadPage acquires a lock when it is called, effectively pinning it in memory. releasePage releases this lock. */ void releasePage(Page *p); -/** - This is the function pointer that stasis_buffer_manager_open sets in order to - override releasePage. -*/ -extern void (*releasePageImpl)(Page * p); -/** - * initialize buffer manager - * @return 0 on success - * @return error code on failure - */ -/** - This is used by truncation to move dirty pages from Stasis cache - into the operating system cache. Once writeBackPage(p) returns, - calling forcePages() will synchronously force page number p to - disk. +typedef struct stasis_buffer_manager_t stasis_buffer_manager_t; - (Not all buffer managers support synchronous writes to stable - storage. For compatibility, such buffer managers should ignore - this call.) +struct stasis_buffer_manager_t { + Page * (*loadPageImpl)(stasis_buffer_manager_t*, int xid, pageid_t pageid, pagetype_t type); + Page * (*loadUninitPageImpl)(stasis_buffer_manager_t*, int xid, pageid_t pageid); + Page * (*getCachedPageImpl)(stasis_buffer_manager_t*, int xid, const pageid_t pageid); + void (*releasePageImpl)(stasis_buffer_manager_t*, Page * p); + /** + This is used by truncation to move dirty pages from Stasis cache + into the operating system cache. Once writeBackPage(p) returns, + calling forcePages() will synchronously force page number p to + disk. - @return 0 on success, ENOENT if the page is not in cache, and EBUSY if the page is pinned. -*/ -extern int (*writeBackPage)(pageid_t p); -/** - Force any written back pages to disk. + (Not all buffer managers support synchronous writes to stable + storage. For compatibility, such buffer managers should ignore + this call.) - @see writeBackPage for more information. + @return 0 on success, ENOENT if the page is not in cache, and EBUSY if the page is pinned. + */ + int (*writeBackPage)(stasis_buffer_manager_t*, pageid_t p); + /** + Force any written back pages to disk. - If the buffer manager doesn't support stable storage, this call is - a no-op. -*/ -extern void (*forcePages)(); -/** - Force written back pages that fall within a particular range to disk. + @see writeBackPage for more information. - This does not force page that have not been written to with pageWrite(). + If the buffer manager doesn't support stable storage, this call is + a no-op. + */ + void (*forcePages)(struct stasis_buffer_manager_t*); + /** + Force written back pages that fall within a particular range to disk. - @param start the first pageid to be forced to disk - @param stop the page after the last page to be forced to disk. -*/ -extern void (*forcePageRange)(pageid_t start, pageid_t stop); -extern void (*stasis_buffer_manager_simulate_crash)(); + This does not force page that have not been written to with pageWrite(). -int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph); -/** - * will write out any dirty pages, assumes that there are no running - * transactions - */ -extern void (*stasis_buffer_manager_close)(); + @param start the first pageid to be forced to disk + @param stop the page after the last page to be forced to disk. + */ + void (*forcePageRange)(struct stasis_buffer_manager_t*, pageid_t start, pageid_t stop); + void (*stasis_buffer_manager_simulate_crash)(struct stasis_buffer_manager_t*); + /** + * Write out any dirty pages. Assumes that there are no running transactions + */ + void (*stasis_buffer_manager_close)(struct stasis_buffer_manager_t*); + void * impl; +}; #ifdef PROFILE_LATCHES_WRITE_ONLY #define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__) diff --git a/stasis/bufferManager/bufferHash.h b/stasis/bufferManager/bufferHash.h index 6249b38..a5c2021 100644 --- a/stasis/bufferManager/bufferHash.h +++ b/stasis/bufferManager/bufferHash.h @@ -1,5 +1,6 @@ #ifndef STASIS_BUFFERMANAGER_BUFFERHASH_H #define STASIS_BUFFERMANAGER_BUFFERHASH_H +#include #include -void stasis_buffer_manager_hash_open(stasis_page_handle_t* ph); +stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t* ph); #endif //STASIS_BUFFERMANAGER_BUFFERHASH_H diff --git a/stasis/bufferManager/legacy/legacyBufferManager.h b/stasis/bufferManager/legacy/legacyBufferManager.h index 407631b..0584dbe 100644 --- a/stasis/bufferManager/legacy/legacyBufferManager.h +++ b/stasis/bufferManager/legacy/legacyBufferManager.h @@ -1,5 +1,5 @@ #ifndef STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H #define STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H #include -int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph); +stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph); #endif//STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H diff --git a/stasis/bufferManager/pageArray.h b/stasis/bufferManager/pageArray.h index 601c05c..5b365c4 100644 --- a/stasis/bufferManager/pageArray.h +++ b/stasis/bufferManager/pageArray.h @@ -1 +1,2 @@ -void stasis_buffer_manager_mem_array_open(); +#include +stasis_buffer_manager_t* stasis_buffer_manager_mem_array_open(); diff --git a/stasis/dirtyPageTable.h b/stasis/dirtyPageTable.h index 7954e88..ade0916 100644 --- a/stasis/dirtyPageTable.h +++ b/stasis/dirtyPageTable.h @@ -12,7 +12,10 @@ BEGIN_C_DECLS typedef struct stasis_dirty_page_table_t stasis_dirty_page_table_t; -stasis_dirty_page_table_t * stasis_dirty_page_table_init(); +#include +stasis_dirty_page_table_t * stasis_dirty_page_table_init(void); +// XXX circular dependency +void stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table_t* dpt, stasis_buffer_manager_t* bm); void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages); void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p); @@ -24,6 +27,36 @@ pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPa int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages); lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages); +/** + This method returns a (mostly) contiguous range of the dirty page table for writeback. + + Usage (to non-atomically flush all pages in a range, except ones that were dirtied while we were running) + + int n = 100; + pageid_t range_starts[n], pageid_t range_ends[n]; + + pageid_t next = start; + + while((blocks = stasis_dirty_page_table_get_flush_candidates(dpt, next, stop, n, range_starts, range_ends) { + for(int i = 0; i < blocks; i++) { + for(pageid_t p = range_starts[i]; p < range_ends[i]; p++) { + // flush p + } + } + next = range_ends[blocks-1];; + } + + @param dirtyPages The dirty page table; this method will not change it. + @param start The first page to be considered for writeback + @param stop The page after the last page to be considered for writeback + @param count The maximum number of pages to be returned for writeback + @param range_starts An array of pageids of length count. Some number of these will be populated with the first page in a range to be written back. + @param range_ends An array of the same length as range_starts. This will be populated with the page after the last page in each range. + + @return The number of entries in range_starts and range_ends that were populated. This can be less than count even if there are more dirty pages in the range. If this is zero, then the entire range is clean. +*/ +int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop, int count, pageid_t* range_starts, pageid_t* range_ends); + /** @todo flushRange's API sucks. It should be two functions, "startRangeFlush" and "waitRangeFlushes" or something. */ diff --git a/stasis/transactional.h b/stasis/transactional.h index b16c403..c8d64be 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -801,6 +801,8 @@ void * stasis_runtime_dirty_page_table(); void * stasis_runtime_alloc_state(); +void * stasis_runtime_buffer_manager(); + #include "operations.h" END_C_DECLS diff --git a/stasis/truncation.h b/stasis/truncation.h index e33762d..a07522c 100644 --- a/stasis/truncation.h +++ b/stasis/truncation.h @@ -64,8 +64,9 @@ typedef struct stasis_truncation_t stasis_truncation_t; #include #include +#include -stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_log_t * log); +stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t * bufferManager, stasis_log_t * log); void stasis_truncation_deinit(stasis_truncation_t * trunc); /**