buffer manager refactoring; started work on a new dirty page table writeback interface (to avoid circular dependencies)

Sears Russell 2009-10-05 21:19:01 +00:00
parent d01e63e1e4
commit 24650fa190
15 changed files with 400 additions and 329 deletions
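
The heart of this change is in the stasis/bufferManager.h hunk further down: the extern function pointers that the buffer manager used to export (loadPageImpl, releasePageImpl, writeBackPage, forcePages, ...) become fields of a stasis_buffer_manager_t struct, each taking the instance as its first argument, with an opaque impl pointer for per-implementation state. A condensed sketch of the resulting dispatch pattern (signatures abridged from this commit; only a subset of the methods is shown):

    typedef struct stasis_buffer_manager_t stasis_buffer_manager_t;
    struct stasis_buffer_manager_t {
      Page * (*loadPageImpl)   (stasis_buffer_manager_t*, int xid, pageid_t pageid, pagetype_t type);
      void   (*releasePageImpl)(stasis_buffer_manager_t*, Page *p);
      int    (*writeBackPage)  (stasis_buffer_manager_t*, pageid_t p);
      void   (*forcePages)     (stasis_buffer_manager_t*);
      /* ... remaining methods elided ... */
      void * impl;   /* per-instance state, e.g. the stasis_buffer_hash_t below */
    };

    /* The public entry points keep their old signatures and dispatch
       through the runtime's buffer manager instance: */
    Page * loadPage(int xid, pageid_t pageid) {
      stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager();
      return bm->loadPageImpl(bm, xid, pageid, UNKNOWN_TYPE_PAGE);
    }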

View file

@ -57,16 +57,8 @@ terms specified in this license.
#include <assert.h> #include <assert.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/bufferManager/pageArray.h>
#include <stasis/bufferManager/bufferHash.h>
//#include <stasis/bufferManager/legacy/pageCache.h>
#include <stasis/bufferManager/legacy/legacyBufferManager.h>
#include <stasis/bufferPool.h>
#include <stasis/lockManager.h> #include <stasis/lockManager.h>
#include <stasis/pageHandle.h>
#undef loadPage #undef loadPage
#undef releasePage #undef releasePage
@ -130,7 +122,6 @@ compensated_function Page * __profile_loadPage(int xid, pageid_t pageid, char *
} }
compensated_function void __profile_releasePage(Page * p) { compensated_function void __profile_releasePage(Page * p) {
pthread_mutex_lock(&profile_load_mutex); pthread_mutex_lock(&profile_load_mutex);
@ -157,30 +148,23 @@ compensated_function void __profile_releasePage(Page * p) {
#endif #endif
Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type) = 0;
Page * (*loadUninitPageImpl)(int xid, pageid_t pageid) = 0;
Page * (*getCachedPageImpl)(int xid, pageid_t pageid) = 0;
void (*releasePageImpl)(Page * p) = 0;
int (*writeBackPage)(pageid_t page) = 0;
void (*forcePages)() = 0;
void (*forcePageRange)(pageid_t start, pageid_t stop) = 0;
void (*stasis_buffer_manager_close)() = 0;
void (*stasis_buffer_manager_simulate_crash)() = 0;
Page * loadPage(int xid, pageid_t pageid) { Page * loadPage(int xid, pageid_t pageid) {
// This lock is released at Tcommit() // This lock is released at Tcommit()
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
return loadPageImpl(xid, pageid, UNKNOWN_TYPE_PAGE); stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager();
return bm->loadPageImpl(bm, xid, pageid, UNKNOWN_TYPE_PAGE);
} }
Page * loadPageOfType(int xid, pageid_t pageid, pagetype_t type) { Page * loadPageOfType(int xid, pageid_t pageid, pagetype_t type) {
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
return loadPageImpl(xid, pageid, type); stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager();
return bm->loadPageImpl(bm, xid, pageid, type);
} }
Page * loadUninitializedPage(int xid, pageid_t pageid) { Page * loadUninitializedPage(int xid, pageid_t pageid) {
// This lock is released at Tcommit() // This lock is released at Tcommit()
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
return loadUninitPageImpl(xid, pageid); stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager();
return bm->loadUninitPageImpl(bm, xid, pageid);
} }
Page * loadPageForOperation(int xid, pageid_t pageid, int op) { Page * loadPageForOperation(int xid, pageid_t pageid, int op) {
@ -200,31 +184,10 @@ Page * loadPageForOperation(int xid, pageid_t pageid, int op) {
} }
Page * getCachedPage(int xid, pageid_t pageid) { Page * getCachedPage(int xid, pageid_t pageid) {
return getCachedPageImpl(xid, pageid); stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager();
return bm->getCachedPageImpl(bm, xid, pageid);
} }
void releasePage(Page * p) { void releasePage(Page * p) {
releasePageImpl(p); stasis_buffer_manager_t * bm = stasis_runtime_buffer_manager();
} bm->releasePageImpl(bm, p);
int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph) {
bufferManagerType = type;
static int lastType = 0;
if(type == BUFFER_MANAGER_REOPEN) {
type = lastType;
}
lastType = type;
if(type == BUFFER_MANAGER_DEPRECATED_HASH) {
stasis_buffer_manager_deprecated_open(ph);
return 0;
} else if (type == BUFFER_MANAGER_MEM_ARRAY) {
stasis_buffer_manager_mem_array_open();
ph->close(ph); // XXX should never have been opened in the first place!
return 0;
} else if (type == BUFFER_MANAGER_HASH) {
stasis_buffer_manager_hash_open(ph);
return 0;
} else {
// XXX error handling
abort();
}
} }
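
Because loadPage(), getCachedPage() and releasePage() keep their public signatures and resolve the buffer manager instance internally, existing callers are unaffected by the refactoring. A minimal caller-side sketch against the unchanged public API (the helper function and its arguments are illustrative, not part of this commit):

    #include <stasis/bufferManager.h>

    static void touch_page(int xid, pageid_t pid) {
      Page * p = loadPage(xid, pid);  /* pins the page; internally dispatches via
                                         stasis_runtime_buffer_manager() */
      /* ... read, or log an update against, the pinned page here ... */
      releasePage(p);                 /* drops the pin */
    }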

View file

@ -15,19 +15,17 @@
#include <stasis/bufferPool.h> #include <stasis/bufferPool.h>
#include <stasis/doubleLinkedList.h> #include <stasis/doubleLinkedList.h>
#include <stasis/pageHandle.h>
#include <stasis/dirtyPageTable.h> #include <stasis/dirtyPageTable.h>
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/replacementPolicy.h> #include <stasis/replacementPolicy.h>
#include <stasis/bufferManager.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
//#define LATCH_SANITY_CHECKING //#define LATCH_SANITY_CHECKING
struct stasis_buffer_hash_t { typedef struct {
struct LH_ENTRY(table) * cachedPages; struct LH_ENTRY(table) * cachedPages;
pthread_t worker; pthread_t worker;
pthread_mutex_t mut; pthread_mutex_t mut;
@ -36,30 +34,10 @@ struct stasis_buffer_hash_t {
pageid_t pageCount; pageid_t pageCount;
replacementPolicy *lru; replacementPolicy *lru;
stasis_buffer_pool_t *buffer_pool; stasis_buffer_pool_t *buffer_pool;
stasis_page_handle_t *page_handler; stasis_page_handle_t *page_handle;
int flushing; int flushing;
int running; int running;
}; } stasis_buffer_hash_t;
static struct LH_ENTRY(table) * cachedPages;
static pthread_t worker;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t readComplete = PTHREAD_COND_INITIALIZER;
static pthread_cond_t needFree = PTHREAD_COND_INITIALIZER;
static pageid_t pageCount;
// A page is in LRU iff !pending, !pinned
static replacementPolicy * lru;
static stasis_buffer_pool_t * stasis_buffer_pool;
static stasis_page_handle_t * page_handle;
static int flushing;
static int running;
typedef struct LL_ENTRY(node_t) node_t; typedef struct LL_ENTRY(node_t) node_t;
@ -79,6 +57,7 @@ static inline intptr_t* pagePinCountPtr(Page * p) {
return ((intptr_t*)(&((p)->queue))); return ((intptr_t*)(&((p)->queue)));
} }
static inline int needFlush() { static inline int needFlush() {
// XXX need to remove call to stasis_runtime*
pageid_t count = stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()); pageid_t count = stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table());
const pageid_t needed = 1000; //MAX_BUFFER_SIZE / 5; const pageid_t needed = 1000; //MAX_BUFFER_SIZE / 5;
if(count > needed) { if(count > needed) {
@ -121,9 +100,9 @@ inline static void checkPageState(Page * p) { }
#endif #endif
inline static int tryToWriteBackPage(pageid_t page) { inline static int tryToWriteBackPage(stasis_buffer_manager_t *bm, pageid_t page) {
stasis_buffer_hash_t * bh = bm->impl;
Page * p = LH_ENTRY(find)(cachedPages, &page, sizeof(page)); Page * p = LH_ENTRY(find)(bh->cachedPages, &page, sizeof(page));
if(!p) { return ENOENT; } if(!p) { return ENOENT; }
@ -133,29 +112,25 @@ inline static int tryToWriteBackPage(pageid_t page) {
return EBUSY; return EBUSY;
} }
DEBUG("Write(%ld)\n", (long)victim->id); DEBUG("Write(%ld)\n", (long)victim->id);
page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight. bh->page_handle->write(bh->page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight.
// int locked = trywritelock(p->rwlatch,0);
// assert(locked);
// assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p));
// unlock(p->rwlatch);
return 0; return 0;
} }
/** Returns a free page. The page will not be in freeList, /** Returns a free page. The page will not be in freeList,
cachedPages or lru. */ cachedPages or lru. */
inline static Page * getFreePage() { inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
stasis_buffer_hash_t * bh = bm->impl;
Page * ret; Page * ret;
if(pageCount < MAX_BUFFER_SIZE) { if(bh->pageCount < MAX_BUFFER_SIZE) {
ret = stasis_buffer_pool_malloc_page(stasis_buffer_pool); ret = stasis_buffer_pool_malloc_page(bh->buffer_pool);
stasis_buffer_pool_free_page(stasis_buffer_pool, ret,-1); stasis_buffer_pool_free_page(bh->buffer_pool, ret,-1);
(*pagePinCountPtr(ret)) = 0; (*pagePinCountPtr(ret)) = 0;
(*pagePendingPtr(ret)) = 0; (*pagePendingPtr(ret)) = 0;
pageSetNode(ret,0,0); pageSetNode(ret,0,0);
pageCount++; bh->pageCount++;
} else { } else {
while((ret = lru->getStale(lru))) { while((ret = bh->lru->getStale(bh->lru))) {
// Make sure we have an exclusive lock on victim. // Make sure we have an exclusive lock on victim.
if(!ret) { if(!ret) {
printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing."); printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing.");
@ -164,19 +139,19 @@ inline static Page * getFreePage() {
assert(!*pagePinCountPtr(ret)); assert(!*pagePinCountPtr(ret));
assert(!*pagePendingPtr(ret)); assert(!*pagePendingPtr(ret));
if(ret->dirty) { if(ret->dirty) {
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
DEBUG("Blocking app thread"); DEBUG("Blocking app thread");
// We don't really care if this flush happens, so long as *something* is being written back, so ignore the EAGAIN it could return. // We don't really care if this flush happens, so long as *something* is being written back, so ignore the EAGAIN it could return.
// (Besides, once this returns EAGAIN twice, we know that some other flush concurrently was initiated + returned, so we're good to go...) // (Besides, once this returns EAGAIN twice, we know that some other flush concurrently was initiated + returned, so we're good to go...)
stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
pthread_mutex_lock(&mut); pthread_mutex_lock(&bh->mut);
} else { } else {
break; break;
} }
} }
lru->remove(lru, ret); bh->lru->remove(bh->lru, ret);
Page * check = LH_ENTRY(remove)(cachedPages, &ret->id, sizeof(ret->id)); Page * check = LH_ENTRY(remove)(bh->cachedPages, &ret->id, sizeof(ret->id));
assert(check == ret); assert(check == ret);
} }
assert(!*pagePinCountPtr(ret)); assert(!*pagePinCountPtr(ret));
@ -186,31 +161,34 @@ inline static Page * getFreePage() {
return ret; return ret;
} }
static void * writeBackWorker(void * ignored) { static void * writeBackWorker(void * bmp) {
pthread_mutex_lock(&mut); stasis_buffer_manager_t* bm = bmp;
stasis_buffer_hash_t * bh = bm->impl;
pthread_mutex_lock(&bh->mut);
while(1) { while(1) {
while(running && (!needFlush())) { while(bh->running && (!needFlush())) {
flushing = 0; bh->flushing = 0;
DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())); DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()));
pthread_cond_wait(&needFree, &mut); pthread_cond_wait(&bh->needFree, &bh->mut);
DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())); DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()));
flushing = 1; bh->flushing = 1;
} }
if(!running) { break; } if(!bh->running) { break; }
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
DEBUG("Calling flush\n"); DEBUG("Calling flush\n");
// ignore ret val; this flush is for performance, not correctness. // ignore ret val; this flush is for performance, not correctness.
stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); // XXX no call to stasis_runtime_*
pthread_mutex_lock(&mut); pthread_mutex_lock(&bh->mut);
} }
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
return 0; return 0;
} }
static Page * bhGetCachedPage(int xid, const pageid_t pageid) { static Page * bhGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid_t pageid) {
pthread_mutex_lock(&mut); stasis_buffer_hash_t * bh = bm->impl;
pthread_mutex_lock(&bh->mut);
// Is the page in cache? // Is the page in cache?
Page * ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(pageid)); Page * ret = LH_ENTRY(find)(bh->cachedPages, &pageid, sizeof(pageid));
if(ret) { if(ret) {
checkPageState(ret); checkPageState(ret);
#ifdef LATCH_SANITY_CHECKING #ifdef LATCH_SANITY_CHECKING
@ -220,7 +198,7 @@ static Page * bhGetCachedPage(int xid, const pageid_t pageid) {
if(!*pagePendingPtr(ret)) { if(!*pagePendingPtr(ret)) {
if(!*pagePinCountPtr(ret) ) { if(!*pagePinCountPtr(ret) ) {
// Then ret is in lru (otherwise it would be pending, or not cached); remove it. // Then ret is in lru (otherwise it would be pending, or not cached); remove it.
lru->remove(lru, ret); bh->lru->remove(bh->lru, ret);
} }
(*pagePinCountPtr(ret))++; (*pagePinCountPtr(ret))++;
checkPageState(ret); checkPageState(ret);
@ -229,11 +207,12 @@ static Page * bhGetCachedPage(int xid, const pageid_t pageid) {
ret = 0; ret = 0;
} }
} }
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
return ret; return ret;
} }
static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitialized, pagetype_t type) { static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const pageid_t pageid, int uninitialized, pagetype_t type) {
stasis_buffer_hash_t * bh = bm->impl;
// Note: Calls to loadlatch in this function violate lock order, but // Note: Calls to loadlatch in this function violate lock order, but
// should be safe, since we make sure no one can have a writelock // should be safe, since we make sure no one can have a writelock
@ -241,10 +220,10 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
void* check; void* check;
pthread_mutex_lock(&mut); pthread_mutex_lock(&bh->mut);
// Is the page in cache? // Is the page in cache?
Page * ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(pageid)); Page * ret = LH_ENTRY(find)(bh->cachedPages, &pageid,sizeof(pageid));
do { do {
@ -253,9 +232,9 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
while(ret) { while(ret) {
checkPageState(ret); checkPageState(ret);
if(*pagePendingPtr(ret)) { if(*pagePendingPtr(ret)) {
pthread_cond_wait(&readComplete, &mut); pthread_cond_wait(&bh->readComplete, &bh->mut);
if(ret->id != pageid) { if(ret->id != pageid) {
ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(pageid)); ret = LH_ENTRY(find)(bh->cachedPages, &pageid, sizeof(pageid));
} }
} else { } else {
#ifdef LATCH_SANITY_CHECKING #ifdef LATCH_SANITY_CHECKING
@ -264,11 +243,11 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
#endif #endif
if(! *pagePinCountPtr(ret) ) { if(! *pagePinCountPtr(ret) ) {
// Then ret is in lru (otherwise it would be pending, or not cached); remove it. // Then ret is in lru (otherwise it would be pending, or not cached); remove it.
lru->remove(lru, ret); bh->lru->remove(bh->lru, ret);
} }
(*pagePinCountPtr(ret))++; (*pagePinCountPtr(ret))++;
checkPageState(ret); checkPageState(ret);
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
assert(ret->id == pageid); assert(ret->id == pageid);
return ret; return ret;
} }
@ -278,10 +257,10 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
assert(!ret); assert(!ret);
// Remove a page from the freelist. This may cause writeback, and release our latch. // Remove a page from the freelist. This may cause writeback, and release our latch.
Page * ret2 = getFreePage(); Page * ret2 = getFreePage(bm);
// Did some other thread put the page in cache for us? // Did some other thread put the page in cache for us?
ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(pageid)); ret = LH_ENTRY(find)(bh->cachedPages, &pageid,sizeof(pageid));
if(!ret) { if(!ret) {
@ -295,8 +274,8 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
break; break;
} else { } else {
// Put the page we were about to evict back in cached pages // Put the page we were about to evict back in cached pages
LH_ENTRY(insert)(cachedPages, &ret2->id, sizeof(ret2->id), ret2); LH_ENTRY(insert)(bh->cachedPages, &ret2->id, sizeof(ret2->id), ret2);
lru->insert(lru, ret2); bh->lru->insert(bh->lru, ret2);
// On the next loop iteration, we'll probably return the page the other thread inserted for us. // On the next loop iteration, we'll probably return the page the other thread inserted for us.
} }
// try again. // try again.
@ -305,7 +284,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
// Add a pending entry to cachedPages to block like-minded threads and writeback // Add a pending entry to cachedPages to block like-minded threads and writeback
(*pagePendingPtr(ret)) = (void*)1; (*pagePendingPtr(ret)) = (void*)1;
check = LH_ENTRY(insert)(cachedPages,&pageid,sizeof(pageid), ret); check = LH_ENTRY(insert)(bh->cachedPages,&pageid,sizeof(pageid), ret);
assert(!check); assert(!check);
ret->id = pageid; ret->id = pageid;
@ -314,11 +293,11 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
// Now, it is safe to release the mutex; other threads won't // Now, it is safe to release the mutex; other threads won't
// try to read this page from disk. // try to read this page from disk.
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
page_handle->read(page_handle, ret, type); bh->page_handle->read(bh->page_handle, ret, type);
pthread_mutex_lock(&mut); pthread_mutex_lock(&bh->mut);
} else { } else {
memset(ret->memAddr,0,PAGE_SIZE); memset(ret->memAddr,0,PAGE_SIZE);
@ -340,58 +319,63 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
assert(locked); assert(locked);
#endif #endif
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
pthread_cond_broadcast(&readComplete); pthread_cond_broadcast(&bh->readComplete);
// TODO Improve writeback policy // TODO Improve writeback policy
if((!flushing) && needFlush()) { if((!bh->flushing) && needFlush()) {
pthread_cond_signal(&needFree); pthread_cond_signal(&bh->needFree);
} }
assert(ret->id == pageid); assert(ret->id == pageid);
checkPageState (ret); checkPageState (ret);
return ret; return ret;
} }
static Page * bhLoadPageImpl(int xid, const pageid_t pageid, pagetype_t type) { static Page * bhLoadPageImpl(stasis_buffer_manager_t *bm, int xid, const pageid_t pageid, pagetype_t type) {
return bhLoadPageImpl_helper(xid,pageid,0, type); return bhLoadPageImpl_helper(bm, xid, pageid, 0, type);
} }
static Page * bhLoadUninitPageImpl(int xid, const pageid_t pageid) { static Page * bhLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const pageid_t pageid) {
return bhLoadPageImpl_helper(xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page. return bhLoadPageImpl_helper(bm, xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page.
} }
static void bhReleasePage(Page * p) { static void bhReleasePage(stasis_buffer_manager_t * bm, Page * p) {
pthread_mutex_lock(&mut); stasis_buffer_hash_t * bh = bm->impl;
pthread_mutex_lock(&bh->mut);
checkPageState(p); checkPageState(p);
(*pagePinCountPtr(p))--; (*pagePinCountPtr(p))--;
if(!(*pagePinCountPtr(p))) { if(!(*pagePinCountPtr(p))) {
assert(!pageGetNode(p, 0)); assert(!pageGetNode(p, 0));
lru->insert(lru,p); bh->lru->insert(bh->lru,p);
} }
#ifdef LATCH_SANITY_CHECKING #ifdef LATCH_SANITY_CHECKING
unlock(p->loadlatch); unlock(p->loadlatch);
#endif #endif
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&bh->mut);
} }
static int bhWriteBackPage(pageid_t pageid) { static int bhWriteBackPage(stasis_buffer_manager_t* bm, pageid_t pageid) {
pthread_mutex_lock(&mut); stasis_buffer_hash_t * bh = bm->impl;
int ret = tryToWriteBackPage(pageid); pthread_mutex_lock(&bh->mut);
pthread_mutex_unlock(&mut); int ret = tryToWriteBackPage(bm, pageid);
pthread_mutex_unlock(&bh->mut);
return ret; return ret;
} }
static void bhForcePages() { static void bhForcePages(stasis_buffer_manager_t* bm) {
page_handle->force_file(page_handle); stasis_buffer_hash_t * bh = bm->impl;
bh->page_handle->force_file(bh->page_handle);
} }
static void bhForcePageRange(pageid_t start, pageid_t stop) { static void bhForcePageRange(stasis_buffer_manager_t *bm, pageid_t start, pageid_t stop) {
page_handle->force_range(page_handle, start, stop); stasis_buffer_hash_t * bh = bm->impl;
bh->page_handle->force_range(bh->page_handle, start, stop);
} }
static void bhBufDeinit() { static void bhBufDeinit(stasis_buffer_manager_t * bm) {
pthread_mutex_lock(&mut); stasis_buffer_hash_t * bh = bm->impl;
running = 0; pthread_mutex_lock(&bh->mut);
pthread_mutex_unlock(&mut); bh->running = 0;
pthread_mutex_unlock(&bh->mut);
pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit. pthread_cond_signal(&bh->needFree); // Wake up the writeback thread so it will exit.
pthread_join(worker, 0); pthread_join(bh->worker, 0);
// XXX flush range should return an error number, which we would check. (Right now, it aborts...) // XXX flush range should return an error number, which we would check. (Right now, it aborts...)
int ret = stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); int ret = stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
@ -399,7 +383,7 @@ static void bhBufDeinit() {
struct LH_ENTRY(list) iter; struct LH_ENTRY(list) iter;
const struct LH_ENTRY(pair_t) * next; const struct LH_ENTRY(pair_t) * next;
LH_ENTRY(openlist)(cachedPages, &iter); LH_ENTRY(openlist)(bh->cachedPages, &iter);
while((next = LH_ENTRY(readlist)(&iter))) { while((next = LH_ENTRY(readlist)(&iter))) {
Page * p = next->value; Page * p = next->value;
assertunlocked(p->rwlatch); assertunlocked(p->rwlatch);
@ -410,23 +394,29 @@ static void bhBufDeinit() {
stasis_page_cleanup(p); // normally called by writeBackOnePage() stasis_page_cleanup(p); // normally called by writeBackOnePage()
} }
LH_ENTRY(closelist)(&iter); LH_ENTRY(closelist)(&iter);
LH_ENTRY(destroy)(cachedPages); LH_ENTRY(destroy)(bh->cachedPages);
lru->deinit(lru); bh->lru->deinit(bh->lru);
stasis_buffer_pool_deinit(stasis_buffer_pool); stasis_buffer_pool_deinit(bh->buffer_pool);
page_handle->close(page_handle); bh->page_handle->close(bh->page_handle);
pthread_mutex_destroy(&bh->mut);
pthread_cond_destroy(&bh->needFree);
pthread_cond_destroy(&bh->readComplete);
free(bh);
} }
static void bhSimulateBufferManagerCrash() { static void bhSimulateBufferManagerCrash(stasis_buffer_manager_t *bm) {
pthread_mutex_lock(&mut); stasis_buffer_hash_t * bh = bm->impl;
running = 0; pthread_mutex_lock(&bh->mut);
pthread_mutex_unlock(&mut); bh->running = 0;
pthread_mutex_unlock(&bh->mut);
pthread_cond_signal(&needFree); pthread_cond_signal(&bh->needFree);
pthread_join(worker, 0); pthread_join(bh->worker, 0);
struct LH_ENTRY(list) iter; struct LH_ENTRY(list) iter;
const struct LH_ENTRY(pair_t) * next; const struct LH_ENTRY(pair_t) * next;
LH_ENTRY(openlist)(cachedPages, &iter); LH_ENTRY(openlist)(bh->cachedPages, &iter);
while((next = LH_ENTRY(readlist)(&iter))) { while((next = LH_ENTRY(readlist)(&iter))) {
Page * p = next->value; Page * p = next->value;
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
@ -435,42 +425,60 @@ static void bhSimulateBufferManagerCrash() {
unlock(p->rwlatch); unlock(p->rwlatch);
} }
LH_ENTRY(closelist)(&iter); LH_ENTRY(closelist)(&iter);
LH_ENTRY(destroy)(cachedPages); LH_ENTRY(destroy)(bh->cachedPages);
lru->deinit(lru); bh->lru->deinit(bh->lru);
stasis_buffer_pool_deinit(stasis_buffer_pool); stasis_buffer_pool_deinit(bh->buffer_pool);
page_handle->close(page_handle); bh->page_handle->close(bh->page_handle);
pthread_mutex_destroy(&bh->mut);
pthread_cond_destroy(&bh->needFree);
pthread_cond_destroy(&bh->readComplete);
free(bh);
} }
void stasis_buffer_manager_hash_open(stasis_page_handle_t * h) { stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t * h) {
page_handle = h; stasis_buffer_manager_t *bm = malloc(sizeof(*bm));
assert(!running); stasis_buffer_hash_t *bh = malloc(sizeof(*bh));
bm->loadPageImpl = bhLoadPageImpl;
bm->loadUninitPageImpl = bhLoadUninitPageImpl;
bm->getCachedPageImpl = bhGetCachedPage;
bm->releasePageImpl = bhReleasePage;
bm->writeBackPage = bhWriteBackPage;
bm->forcePages = bhForcePages;
bm->forcePageRange = bhForcePageRange;
bm->stasis_buffer_manager_close = bhBufDeinit;
bm->stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash;
bm->impl = bh;
bh->page_handle = h;
bh->running = 0;
#ifdef LONG_RUN #ifdef LONG_RUN
printf("Using expensive bufferHash sanity checking.\n"); printf("Using expensive bufferHash sanity checking.\n");
#endif #endif
loadPageImpl = bhLoadPageImpl; bh->flushing = 0;
loadUninitPageImpl = bhLoadUninitPageImpl;
getCachedPageImpl = bhGetCachedPage;
releasePageImpl = bhReleasePage;
writeBackPage = bhWriteBackPage;
forcePages = bhForcePages;
forcePageRange = bhForcePageRange;
stasis_buffer_manager_close = bhBufDeinit;
stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash;
flushing = 0; bh->buffer_pool = stasis_buffer_pool_init();
stasis_buffer_pool = stasis_buffer_pool_init(); bh->lru = lruFastInit(pageGetNode, pageSetNode, 0);
lru = lruFastInit(pageGetNode, pageSetNode, 0); bh->cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE);
cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); bh->pageCount = 0;
pageCount = 0; bh->running = 1;
running = 1; pthread_mutex_init(&bh->mut,0);
pthread_create(&worker, 0, writeBackWorker, 0); pthread_cond_init(&bh->needFree,0);
pthread_cond_init(&bh->readComplete,0);
pthread_create(&bh->worker, 0, writeBackWorker, bm);
return bm;
} }
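
In bufferHash.c the file-level statics (cachedPages, mut, lru, the worker thread, ...) move into a heap-allocated stasis_buffer_hash_t that hangs off bm->impl, and the writeback worker now receives the stasis_buffer_manager_t through its void* thread argument instead of touching globals. The shape of that pattern, reduced to a sketch (field names follow the hunks above; bodies abridged):

    static void * writeBackWorker(void * bmp) {
      stasis_buffer_manager_t * bm = bmp;       /* thread argument carries the instance */
      stasis_buffer_hash_t    * bh = bm->impl;  /* per-instance state, no more statics  */
      pthread_mutex_lock(&bh->mut);
      /* ... sleep on bh->needFree, flush dirty pages, repeat until !bh->running ... */
      pthread_mutex_unlock(&bh->mut);
      return 0;
    }

    stasis_buffer_manager_t * stasis_buffer_manager_hash_open(stasis_page_handle_t * h) {
      stasis_buffer_manager_t * bm = malloc(sizeof(*bm));
      stasis_buffer_hash_t    * bh = malloc(sizeof(*bh));
      /* ... fill in bm->loadPageImpl et al., initialize bh's table, lru and latches ... */
      bm->impl = bh;                            /* methods recover bh via bm->impl */
      pthread_create(&bh->worker, 0, writeBackWorker, bm);
      return bm;
    }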

View file

@ -1,6 +1,5 @@
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/bufferManager.h>
#include <stasis/pageHandle.h> #include <stasis/pageHandle.h>
#include <stasis/bufferPool.h> #include <stasis/bufferPool.h>
#include <stasis/bufferManager/legacy/pageFile.h> #include <stasis/bufferManager/legacy/pageFile.h>
@ -21,17 +20,17 @@ static pthread_key_t lastPage;
#define RW 1 #define RW 1
static void bufManBufDeinit(); static void bufManBufDeinit();
static compensated_function Page *bufManLoadPage(int xid, pageid_t pageid, pagetype_t type); static compensated_function Page *bufManLoadPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid, pagetype_t type);
static compensated_function Page *bufManGetCachedPage(int xid, pageid_t pageid); static compensated_function Page *bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid);
static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid); static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid);
static void bufManReleasePage (Page * p); static void bufManReleasePage (stasis_buffer_manager_t *ignored, Page * p);
static void bufManSimulateBufferManagerCrash(); static void bufManSimulateBufferManagerCrash();
static stasis_page_handle_t * page_handle; static stasis_page_handle_t * page_handle;
static stasis_buffer_pool_t * stasis_buffer_pool; static stasis_buffer_pool_t * stasis_buffer_pool;
static int pageWrite_legacyWrapper(pageid_t pageid) { static int pageWrite_legacyWrapper(stasis_buffer_manager_t *ignored, pageid_t pageid) {
Page * p = loadPage(-1, pageid); Page * p = loadPage(-1, pageid);
// XXX this is unsafe; the page could be pinned! // XXX this is unsafe; the page could be pinned!
page_handle->write(page_handle,p); page_handle->write(page_handle,p);
@ -41,21 +40,22 @@ static int pageWrite_legacyWrapper(pageid_t pageid) {
static void forcePageFile_legacyWrapper() { static void forcePageFile_legacyWrapper() {
page_handle->force_file(page_handle); page_handle->force_file(page_handle);
} }
static void forceRangePageFile_legacyWrapper(lsn_t start, lsn_t stop) { static void forceRangePageFile_legacyWrapper(stasis_buffer_manager_t *ignored, lsn_t start, lsn_t stop) {
page_handle->force_range(page_handle, start, stop); page_handle->force_range(page_handle, start, stop);
} }
int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) { stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) {
page_handle = ph; page_handle = ph;
releasePageImpl = bufManReleasePage; stasis_buffer_manager_t * bm = malloc(sizeof(*bm));
loadPageImpl = bufManLoadPage; bm->releasePageImpl = bufManReleasePage;
loadUninitPageImpl = bufManLoadUninitPage; bm->loadPageImpl = bufManLoadPage;
getCachedPageImpl = bufManGetCachedPage; bm->loadUninitPageImpl = bufManLoadUninitPage;
writeBackPage = pageWrite_legacyWrapper; bm->getCachedPageImpl = bufManGetCachedPage;
forcePages = forcePageFile_legacyWrapper; bm->writeBackPage = pageWrite_legacyWrapper;
forcePageRange = forceRangePageFile_legacyWrapper; bm->forcePages = forcePageFile_legacyWrapper;
stasis_buffer_manager_close = bufManBufDeinit; bm->forcePageRange = forceRangePageFile_legacyWrapper;
stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash; bm->stasis_buffer_manager_close = bufManBufDeinit;
bm->stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash;
stasis_buffer_pool = stasis_buffer_pool_init(); stasis_buffer_pool = stasis_buffer_pool_init();
@ -80,7 +80,8 @@ int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) {
profile_load_hash = LH_ENTRY(create)(10); profile_load_hash = LH_ENTRY(create)(10);
profile_load_pins_hash = LH_ENTRY(create)(10); profile_load_pins_hash = LH_ENTRY(create)(10);
#endif #endif
return 0; bm->impl = 0; // XXX hack, but this module is deprecated
return bm;
} }
static void bufManBufDeinit() { static void bufManBufDeinit() {
@ -126,7 +127,7 @@ static void bufManSimulateBufferManagerCrash() {
#endif #endif
} }
static void bufManReleasePage (Page * p) { static void bufManReleasePage (stasis_buffer_manager_t *ignored, Page * p) {
unlock(p->loadlatch); unlock(p->loadlatch);
#ifdef PIN_COUNT #ifdef PIN_COUNT
pthread_mutex_lock(&pinCount_mutex); pthread_mutex_lock(&pinCount_mutex);
@ -340,7 +341,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized, pag
return ret; return ret;
} }
static compensated_function Page *bufManLoadPage(int xid, const pageid_t pageid, pagetype_t type) { static compensated_function Page *bufManLoadPage(stasis_buffer_manager_t *ignored, int xid, const pageid_t pageid, pagetype_t type) {
Page * ret = pthread_getspecific(lastPage); Page * ret = pthread_getspecific(lastPage);
@ -371,12 +372,12 @@ static compensated_function Page *bufManLoadPage(int xid, const pageid_t pageid,
return ret; return ret;
} }
static Page* bufManGetCachedPage(int xid, const pageid_t pageid) { static Page* bufManGetCachedPage(stasis_buffer_manager_t *ignored, int xid, const pageid_t pageid) {
// XXX hack; but this code is deprecated // XXX hack; but this code is deprecated
return bufManLoadPage(xid, pageid, UNKNOWN_TYPE_PAGE); return bufManLoadPage(ignored, xid, pageid, UNKNOWN_TYPE_PAGE);
} }
static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid) { static compensated_function Page *bufManLoadUninitPage(stasis_buffer_manager_t *ignored, int xid, pageid_t pageid) {
Page * ret = pthread_getspecific(lastPage); Page * ret = pthread_getspecific(lastPage);
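
The deprecated buffer manager, by contrast, keeps its module-level statics; its methods simply accept (and ignore) the new instance argument, and its constructor leaves impl as 0. A sketch of that thin adapter, condensed from the hunk above:

    static void bufManReleasePage(stasis_buffer_manager_t *ignored, Page * p) {
      /* state still lives in file-level statics, so the instance pointer is unused */
      unlock(p->loadlatch);
    }

    stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) {
      stasis_buffer_manager_t * bm = malloc(sizeof(*bm));
      bm->releasePageImpl = bufManReleasePage;
      /* ... remaining method pointers ... */
      bm->impl = 0;   /* marked as a hack in the diff; this module is deprecated */
      return bm;
    }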

View file

@ -1,79 +1,93 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferPool.h> #include <stasis/bufferPool.h>
#include <stasis/truncation.h> #include <stasis/truncation.h>
#include <stasis/latches.h> #include <stasis/latches.h>
#include <stasis/bufferManager/pageArray.h> #include <stasis/bufferManager/pageArray.h>
#include <stasis/page.h> #include <stasis/page.h>
static Page ** pageMap; typedef struct {
static pageid_t pageCount; Page ** pageMap;
static pthread_mutex_t pageArray_mut = PTHREAD_MUTEX_INITIALIZER; pageid_t pageCount;
pthread_mutex_t mut;
} stasis_buffer_manager_page_array_t;
static Page * paLoadPage(int xid, pageid_t pageid, pagetype_t type) { static Page * paLoadPage(stasis_buffer_manager_t *bm, int xid, pageid_t pageid, pagetype_t type) {
stasis_buffer_manager_page_array_t *pa = bm->impl;
pthread_mutex_lock(&pageArray_mut); pthread_mutex_lock(&pa->mut);
if(pageid >= pageCount) { if(pageid >= pa->pageCount) {
pageMap = realloc(pageMap, (1+pageid) * sizeof(Page*)); pa->pageMap = realloc(pa->pageMap, (1+pageid) * sizeof(Page*));
for(pageid_t i = pageCount; i <= pageid; i++) { for(pageid_t i = pa->pageCount; i <= pageid; i++) {
pageMap[i] = 0; pa->pageMap[i] = 0;
} }
pageCount = pageid + 1; pa->pageCount = pageid + 1;
} }
if(!pageMap[pageid]) { if(!pa->pageMap[pageid]) {
pageMap[pageid] = malloc(sizeof(Page)); pa->pageMap[pageid] = malloc(sizeof(Page));
pageMap[pageid]->id = pageid; pa->pageMap[pageid]->id = pageid;
pageMap[pageid]->pageType = type == UNKNOWN_TYPE_PAGE ? 0 : type; pa->pageMap[pageid]->pageType = type == UNKNOWN_TYPE_PAGE ? 0 : type;
pageMap[pageid]->LSN = 0; pa->pageMap[pageid]->LSN = 0;
pageMap[pageid]->dirty = 0; pa->pageMap[pageid]->dirty = 0;
pageMap[pageid]->next = 0; pa->pageMap[pageid]->next = 0;
pageMap[pageid]->prev = 0; pa->pageMap[pageid]->prev = 0;
pageMap[pageid]->queue = 0; pa->pageMap[pageid]->queue = 0;
pageMap[pageid]->inCache = 1; pa->pageMap[pageid]->inCache = 1;
pageMap[pageid]->rwlatch = initlock(); pa->pageMap[pageid]->rwlatch = initlock();
pageMap[pageid]->loadlatch = initlock(); pa->pageMap[pageid]->loadlatch = initlock();
pageMap[pageid]->memAddr= calloc(PAGE_SIZE, sizeof(byte)); pa->pageMap[pageid]->memAddr= calloc(PAGE_SIZE, sizeof(byte));
} else{ } else{
if(type != UNKNOWN_TYPE_PAGE) { assert(type == pageMap[pageid]->pageType); } if(type != UNKNOWN_TYPE_PAGE) { assert(type == pa->pageMap[pageid]->pageType); }
} }
pthread_mutex_unlock(&pageArray_mut); pthread_mutex_unlock(&pa->mut);
return pageMap[pageid]; return pa->pageMap[pageid];
} }
static Page* paGetCachedPage(int xid, pageid_t page) { static Page* paLoadUninitPage(stasis_buffer_manager_t *bm, int xid, pageid_t page) {
return paLoadPage(xid, page, UNKNOWN_TYPE_PAGE); return paLoadPage(bm, xid, page, UNKNOWN_TYPE_PAGE);
} }
static void paReleasePage(Page * p) { static Page* paGetCachedPage(stasis_buffer_manager_t *bm, int xid, pageid_t page) {
return paLoadPage(bm, xid, page, UNKNOWN_TYPE_PAGE);
}
static void paReleasePage(stasis_buffer_manager_t *bm, Page * p) {
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p); stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p);
unlock(p->rwlatch); unlock(p->rwlatch);
} }
static int paWriteBackPage(pageid_t p) { return 0; /* no-op */ } static int paWriteBackPage(stasis_buffer_manager_t *bm, pageid_t p) { return 0; /* no-op */ }
static void paForcePages() { /* no-op */ } static void paForcePages(stasis_buffer_manager_t * bm) { /* no-op */ }
static void paForcePageRange(pageid_t start, pageid_t stop) { /* no-op */ } static void paForcePageRange(stasis_buffer_manager_t *bm, pageid_t start, pageid_t stop) { /* no-op */ }
static void paBufDeinit() { static void paBufDeinit(stasis_buffer_manager_t * bm) {
for(pageid_t i =0; i < pageCount; i++) { stasis_buffer_manager_page_array_t *pa = bm->impl;
if(pageMap[i]) {
deletelock(pageMap[i]->rwlatch); for(pageid_t i =0; i < pa->pageCount; i++) {
deletelock(pageMap[i]->loadlatch); if(pa->pageMap[i]) {
free(pageMap[i]); deletelock(pa->pageMap[i]->rwlatch);
deletelock(pa->pageMap[i]->loadlatch);
free(pa->pageMap[i]);
} }
} }
pthread_mutex_destroy(&pa->mut);
free(pa);
} }
void stasis_buffer_manager_mem_array_open () { stasis_buffer_manager_t * stasis_buffer_manager_mem_array_open () {
releasePageImpl = paReleasePage; stasis_buffer_manager_t * bm = malloc(sizeof(*bm));
loadPageImpl = paLoadPage; stasis_buffer_manager_page_array_t * pa = malloc(sizeof(*pa));
getCachedPageImpl = paGetCachedPage;
writeBackPage = paWriteBackPage;
forcePages = paForcePages;
forcePageRange = paForcePageRange;
stasis_buffer_manager_close = paBufDeinit;
stasis_buffer_manager_simulate_crash = paBufDeinit;
pageCount = 0; bm->releasePageImpl = paReleasePage;
pageMap = 0; bm->loadPageImpl = paLoadPage;
bm->loadUninitPageImpl = paLoadUninitPage;
bm->getCachedPageImpl = paGetCachedPage;
bm->writeBackPage = paWriteBackPage;
bm->forcePages = paForcePages;
bm->forcePageRange = paForcePageRange;
bm->stasis_buffer_manager_close = paBufDeinit;
bm->stasis_buffer_manager_simulate_crash = paBufDeinit;
bm->impl = pa;
pa->pageCount = 0;
pa->pageMap = 0;
pthread_mutex_init(&pa->mut,0);
return bm;
} }

View file

@ -10,7 +10,6 @@
#include <stasis/flags.h> #include <stasis/flags.h>
#include <stasis/dirtyPageTable.h> #include <stasis/dirtyPageTable.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/bufferManager.h>
#include <stdio.h> #include <stdio.h>
@ -28,6 +27,7 @@ static int dpt_cmp(const void *ap, const void * bp, const void * ignored) {
struct stasis_dirty_page_table_t { struct stasis_dirty_page_table_t {
struct rbtree * table; struct rbtree * table;
stasis_buffer_manager_t * bufferManager;
pageid_t count; pageid_t count;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t flushDone; pthread_cond_t flushDone;
@ -131,7 +131,7 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
if(off == stride) { if(off == stride) {
pthread_mutex_unlock(&dirtyPages->mutex); pthread_mutex_unlock(&dirtyPages->mutex);
for(pageid_t i = 0; i < off; i++) { for(pageid_t i = 0; i < off; i++) {
writeBackPage(vals[i]); dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, vals[i]);
} }
off = 0; off = 0;
strides++; strides++;
@ -140,7 +140,7 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
} }
pthread_mutex_unlock(&dirtyPages->mutex); pthread_mutex_unlock(&dirtyPages->mutex);
for(int i = 0; i < off; i++) { for(int i = 0; i < off; i++) {
writeBackPage(vals[i]); dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, vals[i]);
} }
pthread_mutex_lock(&dirtyPages->mutex); pthread_mutex_lock(&dirtyPages->mutex);
dirtyPages->flushing = 0; dirtyPages->flushing = 0;
@ -151,6 +151,30 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
return 0; return 0;
} }
int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop, int count, pageid_t* range_starts, pageid_t* range_ends) {
pthread_mutex_lock(&dirtyPages->mutex);
int n = 0;
int b = -1;
dpt_entry dummy;
dummy.lsn = -1;
dummy.p = start;
for(const dpt_entry *e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table);
e && (stop == 0 || e->p < stop) && n < count;
e = rblookup(RB_LUGREAT, e, dirtyPages->table)) {
if(n == 0 || range_ends[b] != e->p) {
b++;
range_starts[b] = e->p;
range_ends[b] = e->p+1;
} else {
range_ends[b]++;
}
n++;
}
pthread_mutex_unlock(&dirtyPages->mutex);
return b+1;
}
void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) { void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) {
pthread_mutex_lock(&dirtyPages->mutex); pthread_mutex_lock(&dirtyPages->mutex);
@ -178,11 +202,14 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages,
pthread_mutex_unlock(&dirtyPages->mutex); pthread_mutex_unlock(&dirtyPages->mutex);
for(pageid_t i = 0; i < n; i++) { for(pageid_t i = 0; i < n; i++) {
int err = writeBackPage(staleDirtyPages[i]); int err = dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, staleDirtyPages[i]);
if(stop && (err == EBUSY)) { abort(); /*api violation!*/ } if(stop && (err == EBUSY)) { abort(); /*api violation!*/ }
} }
free(staleDirtyPages); free(staleDirtyPages);
// forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE); }
void stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t *bufferManager) {
dpt->bufferManager = bufferManager;
} }
stasis_dirty_page_table_t * stasis_dirty_page_table_init() { stasis_dirty_page_table_t * stasis_dirty_page_table_init() {
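
The new stasis_dirty_page_table_get_flush_candidates() walks the dirty-page tree from start and coalesces adjacent dirty pages into half-open [range_starts[i], range_ends[i]) intervals, so callers can turn scattered dirty pages into a few larger sequential writebacks (for example, with dirty pages {3,4,5,9} in range it would report [3,6) and [9,10)). A hedged usage sketch; the wrapper function, its name and the choice to force each range afterwards are illustrative, not part of this commit:

    static void flush_candidates_once(stasis_dirty_page_table_t * dpt,
                                      stasis_buffer_manager_t * bm,
                                      pageid_t start, pageid_t stop) {
      enum { N = 16 };
      pageid_t range_starts[N], range_ends[N];
      int blocks = stasis_dirty_page_table_get_flush_candidates(
                       dpt, start, stop, N, range_starts, range_ends);
      for(int i = 0; i < blocks; i++) {
        for(pageid_t p = range_starts[i]; p < range_ends[i]; p++) {
          bm->writeBackPage(bm, p);     /* may return ENOENT or EBUSY; ignored here */
        }
        bm->forcePageRange(bm, range_starts[i], range_ends[i]);  /* end is exclusive */
      }
    }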

View file

@ -59,7 +59,8 @@ void TlsmRegionForceRid(int xid, void *conf) {
pageid_t pid; pageid_t pid;
Tread(xid,a.regionList,&pid); Tread(xid,a.regionList,&pid);
stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), pid, pid+a.regionSize); stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), pid, pid+a.regionSize);
forcePageRange(pid, pid+a.regionSize); stasis_buffer_manager_t *bm = stasis_runtime_buffer_manager();
bm->forcePageRange(bm, pid, pid+a.regionSize);
// TregionDealloc(xid,pid); // TregionDealloc(xid,pid);
} }
} }

View file

@ -10,7 +10,9 @@
#include <stasis/bufferManager/legacy/pageFile.h> #include <stasis/bufferManager/legacy/pageFile.h>
#include <stasis/bufferManager/pageArray.h>
#include <stasis/bufferManager/bufferHash.h>
#include <stasis/bufferManager/legacy/legacyBufferManager.h>
#include <stasis/logger/logger2.h> #include <stasis/logger/logger2.h>
#include <stasis/logger/safeWrites.h> #include <stasis/logger/safeWrites.h>
@ -34,6 +36,11 @@ stasis_dirty_page_table_t * stasis_dirty_page_table = 0;
static stasis_truncation_t * stasis_truncation = 0; static stasis_truncation_t * stasis_truncation = 0;
static stasis_alloc_t * stasis_alloc = 0; static stasis_alloc_t * stasis_alloc = 0;
static stasis_allocation_policy_t * stasis_allocation_policy = 0; static stasis_allocation_policy_t * stasis_allocation_policy = 0;
static stasis_buffer_manager_t * stasis_buffer_manager = 0;
void * stasis_runtime_buffer_manager() {
return stasis_buffer_manager;
}
/** /**
This mutex protects stasis_transaction_table, numActiveXactions and This mutex protects stasis_transaction_table, numActiveXactions and
@ -61,6 +68,27 @@ void * stasis_runtime_alloc_state() {
return stasis_alloc; return stasis_alloc;
} }
static stasis_buffer_manager_t* stasis_runtime_buffer_manager_open(int type, stasis_page_handle_t * ph) {
bufferManagerType = type;
static int lastType = 0;
if(type == BUFFER_MANAGER_REOPEN) {
type = lastType;
}
lastType = type;
if(type == BUFFER_MANAGER_DEPRECATED_HASH) {
return stasis_buffer_manager_deprecated_open(ph);
} else if (type == BUFFER_MANAGER_MEM_ARRAY) {
stasis_buffer_manager_t *ret = stasis_buffer_manager_mem_array_open();
ph->close(ph); // XXX should never have been opened in the first place!
return ret;
} else if (type == BUFFER_MANAGER_HASH) {
return stasis_buffer_manager_hash_open(ph);
} else {
// XXX error handling
abort();
}
}
int Tinit() { int Tinit() {
pthread_mutex_init(&stasis_transaction_table_mutex, NULL); pthread_mutex_init(&stasis_transaction_table_mutex, NULL);
stasis_initted = 1; stasis_initted = 1;
@ -99,8 +127,9 @@ int Tinit() {
page_handle = stasis_page_handle_open(h, stasis_log_file, stasis_dirty_page_table); page_handle = stasis_page_handle_open(h, stasis_log_file, stasis_dirty_page_table);
} }
stasis_buffer_manager_open(bufferManagerType, page_handle); stasis_buffer_manager = stasis_runtime_buffer_manager_open(bufferManagerType, page_handle);
DEBUG("Buffer manager type = %d\n", bufferManagerType); DEBUG("Buffer manager type = %d\n", bufferManagerType);
stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table, stasis_buffer_manager); // xxx circular dependency.
pageOperationsInit(); pageOperationsInit();
stasis_allocation_policy = stasis_allocation_policy_init(); stasis_allocation_policy = stasis_allocation_policy_init();
stasis_alloc = stasis_alloc_init(stasis_allocation_policy); stasis_alloc = stasis_alloc_init(stasis_allocation_policy);
@ -114,7 +143,7 @@ int Tinit() {
//setupLockManagerCallbacksPage(); //setupLockManagerCallbacksPage();
stasis_recovery_initiate(stasis_log_file, stasis_alloc); stasis_recovery_initiate(stasis_log_file, stasis_alloc);
stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_log_file); stasis_truncation = stasis_truncation_init(stasis_dirty_page_table, stasis_buffer_manager, stasis_log_file);
if(stasis_truncation_automatic) { if(stasis_truncation_automatic) {
// should this be before InitiateRecovery? // should this be before InitiateRecovery?
stasis_truncation_thread_start(stasis_truncation); stasis_truncation_thread_start(stasis_truncation);
@ -365,7 +394,7 @@ int Tdeinit() {
TnaiveHashDeinit(); TnaiveHashDeinit();
stasis_alloc_deinit(stasis_alloc); stasis_alloc_deinit(stasis_alloc);
stasis_allocation_policy_deinit(stasis_allocation_policy); stasis_allocation_policy_deinit(stasis_allocation_policy);
stasis_buffer_manager_close(); stasis_buffer_manager->stasis_buffer_manager_close(stasis_buffer_manager);
DEBUG("Closing page file tdeinit\n"); DEBUG("Closing page file tdeinit\n");
stasis_page_deinit(); stasis_page_deinit();
stasis_log_group_force_t * group_force = stasis_log_file->group_force; stasis_log_group_force_t * group_force = stasis_log_file->group_force;
@ -387,7 +416,7 @@ int TuncleanShutdown() {
stasis_alloc_deinit(stasis_alloc); stasis_alloc_deinit(stasis_alloc);
stasis_allocation_policy_deinit(stasis_allocation_policy); stasis_allocation_policy_deinit(stasis_allocation_policy);
stasis_buffer_manager_simulate_crash(); stasis_buffer_manager->stasis_buffer_manager_simulate_crash(stasis_buffer_manager);
// XXX: close_file? // XXX: close_file?
stasis_page_deinit(); stasis_page_deinit();
stasis_log_file->close(stasis_log_file); stasis_log_file->close(stasis_log_file);
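
Tinit() now absorbs the dispatch that used to live in stasis_buffer_manager_open() and then hands the resulting instance to the components that need it, which is how the circular dependency mentioned in the commit message is broken for now: the dirty page table is created first, and the buffer manager is back-patched into it afterwards. A condensed sketch of the wiring order from the hunk above (surrounding initialization elided):

    stasis_dirty_page_table = stasis_dirty_page_table_init();
    /* ... open the log and the page handle ... */
    stasis_buffer_manager =
        stasis_runtime_buffer_manager_open(bufferManagerType, page_handle);
    /* Back-patch the buffer manager into the dirty page table
       (flagged "xxx circular dependency" in the diff): */
    stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table,
                                               stasis_buffer_manager);
    stasis_truncation = stasis_truncation_init(stasis_dirty_page_table,
                                               stasis_buffer_manager,
                                               stasis_log_file);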

View file

@ -8,6 +8,7 @@ struct stasis_truncation_t {
pthread_mutex_t shutdown_mutex; pthread_mutex_t shutdown_mutex;
pthread_cond_t shutdown_cond; pthread_cond_t shutdown_cond;
stasis_dirty_page_table_t * dirty_pages; stasis_dirty_page_table_t * dirty_pages;
stasis_buffer_manager_t * buffer_manager;
stasis_log_t * log; stasis_log_t * log;
}; };
@ -20,13 +21,14 @@ struct stasis_truncation_t {
#define TRUNCATE_INTERVAL 1 #define TRUNCATE_INTERVAL 1
#define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 25) #define MIN_INCREMENTAL_TRUNCATION (1024 * 1024 * 25)
#endif #endif
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_log_t * log) { stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t *buffer_manager, stasis_log_t *log) {
stasis_truncation_t * ret = malloc(sizeof(*ret)); stasis_truncation_t * ret = malloc(sizeof(*ret));
ret->initialized = 1; ret->initialized = 1;
ret->automaticallyTruncating = 0; ret->automaticallyTruncating = 0;
pthread_mutex_init(&ret->shutdown_mutex, 0); pthread_mutex_init(&ret->shutdown_mutex, 0);
pthread_cond_init(&ret->shutdown_cond, 0); pthread_cond_init(&ret->shutdown_cond, 0);
ret->dirty_pages = dpt; ret->dirty_pages = dpt;
ret->buffer_manager = buffer_manager;
ret->log = log; ret->log = log;
return ret; return ret;
} }
@ -99,7 +101,7 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) {
if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) { if((rec_lsn - log_trunc) > MIN_INCREMENTAL_TRUNCATION) {
// fprintf(stderr, "Truncating now. rec_lsn = %ld, log_trunc = %ld\n", rec_lsn, log_trunc); // fprintf(stderr, "Truncating now. rec_lsn = %ld, log_trunc = %ld\n", rec_lsn, log_trunc);
// fprintf(stderr, "Truncating to rec_lsn = %ld\n", rec_lsn); // fprintf(stderr, "Truncating to rec_lsn = %ld\n", rec_lsn);
forcePages(); trunc->buffer_manager->forcePages(trunc->buffer_manager);
trunc->log->truncate(trunc->log, rec_lsn); trunc->log->truncate(trunc->log, rec_lsn);
return 1; return 1;
} else { } else {
@ -116,7 +118,7 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) {
//fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn); //fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn);
forcePages(); trunc->buffer_manager->forcePages(trunc->buffer_manager);
trunc->log->truncate(trunc->log, rec_lsn); trunc->log->truncate(trunc->log, rec_lsn);
return 1; return 1;
} else { } else {
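
Truncation now reaches the buffer manager through the pointer stored at init time rather than the global forcePages(), but the ordering it depends on is unchanged: pages that have been written back must be forced to stable storage before the log is truncated up to their recovery LSN. Condensed from the hunk above:

    /* inside stasis_truncation_truncate(), once rec_lsn has been chosen: */
    trunc->buffer_manager->forcePages(trunc->buffer_manager);  /* make page writes durable */
    trunc->log->truncate(trunc->log, rec_lsn);                 /* only then discard old log */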

View file

@ -80,7 +80,6 @@ terms specified in this license.
#define __BUFFERMANAGER_H__ #define __BUFFERMANAGER_H__
#include <stasis/common.h> #include <stasis/common.h>
BEGIN_C_DECLS BEGIN_C_DECLS
#include <stasis/pageHandle.h>
/** /**
* Obtain a pointer to a page from the buffer manager. The page will * Obtain a pointer to a page from the buffer manager. The page will
* be pinned, and the pointer valid until releasePage is called. * be pinned, and the pointer valid until releasePage is called.
@ -99,37 +98,27 @@ Page * loadUninitializedPage(int xid, pageid_t pageid);
Page * loadPageForOperation(int xid, pageid_t pageid, int op); Page * loadPageForOperation(int xid, pageid_t pageid, int op);
Page * getCachedPage(int xid, const pageid_t pageid);
/**
This is the function pointer that stasis_buffer_manager_open sets in order to
override loadPage.
*/
extern Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type);
extern Page * (*loadUninitPageImpl)(int xid, pageid_t pageid);
/** /**
Get a page from cache. This function should never block on I/O. Get a page from cache. This function should never block on I/O.
@return a pointer to the page, or NULL if the page is not in cache, or is being read from disk. @return a pointer to the page, or NULL if the page is not in cache, or is being read from disk.
*/ */
extern Page * (*getCachedPageImpl)(int xid, const pageid_t pageid); Page * getCachedPage(int xid, const pageid_t pageid);
/** /**
loadPage aquires a lock when it is called, effectively pinning it loadPage acquires a lock when it is called, effectively pinning it
in memory. releasePage releases this lock. in memory. releasePage releases this lock.
*/ */
void releasePage(Page *p); void releasePage(Page *p);
/** typedef struct stasis_buffer_manager_t stasis_buffer_manager_t;
This is the function pointer that stasis_buffer_manager_open sets in order to
override releasePage. struct stasis_buffer_manager_t {
*/ Page * (*loadPageImpl)(stasis_buffer_manager_t*, int xid, pageid_t pageid, pagetype_t type);
extern void (*releasePageImpl)(Page * p); Page * (*loadUninitPageImpl)(stasis_buffer_manager_t*, int xid, pageid_t pageid);
/** Page * (*getCachedPageImpl)(stasis_buffer_manager_t*, int xid, const pageid_t pageid);
* initialize buffer manager void (*releasePageImpl)(stasis_buffer_manager_t*, Page * p);
* @return 0 on success /**
* @return error code on failure
*/
/**
This is used by truncation to move dirty pages from Stasis cache This is used by truncation to move dirty pages from Stasis cache
into the operating system cache. Once writeBackPage(p) returns, into the operating system cache. Once writeBackPage(p) returns,
calling forcePages() will synchronously force page number p to calling forcePages() will synchronously force page number p to
@ -140,34 +129,33 @@ extern void (*releasePageImpl)(Page * p);
this call.) this call.)
@return 0 on success, ENOENT if the page is not in cache, and EBUSY if the page is pinned. @return 0 on success, ENOENT if the page is not in cache, and EBUSY if the page is pinned.
*/ */
extern int (*writeBackPage)(pageid_t p); int (*writeBackPage)(stasis_buffer_manager_t*, pageid_t p);
/** /**
Force any written back pages to disk. Force any written back pages to disk.
@see writeBackPage for more information. @see writeBackPage for more information.
If the buffer manager doesn't support stable storage, this call is If the buffer manager doesn't support stable storage, this call is
a no-op. a no-op.
*/ */
extern void (*forcePages)(); void (*forcePages)(struct stasis_buffer_manager_t*);
/** /**
Force written back pages that fall within a particular range to disk. Force written back pages that fall within a particular range to disk.
This does not force page that have not been written to with pageWrite(). This does not force page that have not been written to with pageWrite().
@param start the first pageid to be forced to disk @param start the first pageid to be forced to disk
@param stop the page after the last page to be forced to disk. @param stop the page after the last page to be forced to disk.
*/
extern void (*forcePageRange)(pageid_t start, pageid_t stop);
extern void (*stasis_buffer_manager_simulate_crash)();
int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph);
/**
* will write out any dirty pages, assumes that there are no running
* transactions
*/ */
extern void (*stasis_buffer_manager_close)(); void (*forcePageRange)(struct stasis_buffer_manager_t*, pageid_t start, pageid_t stop);
void (*stasis_buffer_manager_simulate_crash)(struct stasis_buffer_manager_t*);
/**
* Write out any dirty pages. Assumes that there are no running transactions
*/
void (*stasis_buffer_manager_close)(struct stasis_buffer_manager_t*);
void * impl;
};
#ifdef PROFILE_LATCHES_WRITE_ONLY #ifdef PROFILE_LATCHES_WRITE_ONLY
#define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__) #define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__)

View file

@ -1,5 +1,6 @@
#ifndef STASIS_BUFFERMANAGER_BUFFERHASH_H #ifndef STASIS_BUFFERMANAGER_BUFFERHASH_H
#define STASIS_BUFFERMANAGER_BUFFERHASH_H #define STASIS_BUFFERMANAGER_BUFFERHASH_H
#include <stasis/bufferManager.h>
#include <stasis/pageHandle.h> #include <stasis/pageHandle.h>
void stasis_buffer_manager_hash_open(stasis_page_handle_t* ph); stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t* ph);
#endif //STASIS_BUFFERMANAGER_BUFFERHASH_H #endif //STASIS_BUFFERMANAGER_BUFFERHASH_H

View file

@ -1,5 +1,5 @@
#ifndef STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H #ifndef STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H
#define STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H #define STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H
#include <stasis/pageHandle.h> #include <stasis/pageHandle.h>
int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph); stasis_buffer_manager_t* stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph);
#endif//STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H #endif//STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H

View file

@ -1 +1,2 @@
void stasis_buffer_manager_mem_array_open(); #include <stasis/bufferManager.h>
stasis_buffer_manager_t* stasis_buffer_manager_mem_array_open();

View file

@ -12,7 +12,10 @@ BEGIN_C_DECLS
typedef struct stasis_dirty_page_table_t stasis_dirty_page_table_t; typedef struct stasis_dirty_page_table_t stasis_dirty_page_table_t;
stasis_dirty_page_table_t * stasis_dirty_page_table_init(); #include <stasis/bufferManager.h>
stasis_dirty_page_table_t * stasis_dirty_page_table_init(void);
// XXX circular dependency
void stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table_t* dpt, stasis_buffer_manager_t* bm);
void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages); void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages);
void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p); void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p);
@ -24,6 +27,36 @@ pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPa
int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages); int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages);
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages); lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages);
/**
This method returns a set of (mostly) contiguous ranges of dirty pages for writeback.
Usage (to non-atomically flush all pages in a range, except ones that were dirtied while we were running):
  int n = 100;
  pageid_t range_starts[n], range_ends[n];
  pageid_t next = start;
  int blocks;
  while((blocks = stasis_dirty_page_table_get_flush_candidates(dpt, next, stop, n, range_starts, range_ends))) {
    for(int i = 0; i < blocks; i++) {
      for(pageid_t p = range_starts[i]; p < range_ends[i]; p++) {
        // flush p
      }
    }
    next = range_ends[blocks-1];
  }
@param dirtyPages The dirty page table; this method will not change it.
@param start The first page to be considered for writeback
@param stop The page after the last page to be considered for writeback
@param count The maximum number of pages to be returned for writeback
@param range_starts An array of pageids of length count. Some number of these will be populated with the first page in a range to be written back.
@param range_ends An array of the same length as range_starts. This will be populated with the page after the last page in each range.
@return The number of entries in range_starts and range_ends that were populated. This can be less than count even if there are more dirty pages in the range. If this is zero, then the entire range is clean.
*/
int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop, int count, pageid_t* range_starts, pageid_t* range_ends);
/** /**
@todo flushRange's API sucks. It should be two functions, "startRangeFlush" and "waitRangeFlushes" or something. @todo flushRange's API sucks. It should be two functions, "startRangeFlush" and "waitRangeFlushes" or something.
*/ */

View file

@ -801,6 +801,8 @@ void * stasis_runtime_dirty_page_table();
void * stasis_runtime_alloc_state(); void * stasis_runtime_alloc_state();
void * stasis_runtime_buffer_manager();
#include "operations.h" #include "operations.h"
END_C_DECLS END_C_DECLS

View file

@ -64,8 +64,9 @@ typedef struct stasis_truncation_t stasis_truncation_t;
#include <stasis/logger/logger2.h> #include <stasis/logger/logger2.h>
#include <stasis/dirtyPageTable.h> #include <stasis/dirtyPageTable.h>
#include <stasis/bufferManager.h>
stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_log_t * log); stasis_truncation_t * stasis_truncation_init(stasis_dirty_page_table_t * dpt, stasis_buffer_manager_t * bufferManager, stasis_log_t * log);
void stasis_truncation_deinit(stasis_truncation_t * trunc); void stasis_truncation_deinit(stasis_truncation_t * trunc);
/** /**