removed static variables from bufferPool.c; renamed methods, moved Page typedef to common.h
This commit is contained in:
parent
4b07b538a6
commit
7db35ecd39
6 changed files with 167 additions and 162 deletions
|
@ -6,7 +6,6 @@
|
|||
#include <stasis/doubleLinkedList.h>
|
||||
#include <stasis/lhtable.h>
|
||||
|
||||
#include <stasis/bufferPool.h>
|
||||
#include <stasis/pageHandle.h>
|
||||
|
||||
#include <stasis/replacementPolicy.h>
|
||||
|
@ -33,18 +32,20 @@ static Page ** freeList;
|
|||
// A page is in LRU iff !pending, !pinned
|
||||
static replacementPolicy * lru;
|
||||
|
||||
static stasis_buffer_pool_t * stasis_buffer_pool;
|
||||
|
||||
static int running;
|
||||
|
||||
typedef struct LL_ENTRY(node_t) node_t;
|
||||
|
||||
static node_t * pageGetNode(void * page, void * ignore) {
|
||||
static node_t * pageGetNode(void * page, void * ignore) {
|
||||
Page * p = page;
|
||||
return (node_t*)p->prev;
|
||||
}
|
||||
static void pageSetNode(void * page, node_t * n, void * ignore) {
|
||||
static void pageSetNode(void * page, node_t * n, void * ignore) {
|
||||
Page * p = page;
|
||||
p->prev = (Page *) n;
|
||||
|
||||
|
||||
}
|
||||
|
||||
#define pagePendingPtr(p) ((struct Page_s **)(&((p)->next)))
|
||||
|
@ -52,45 +53,45 @@ static void pageSetNode(void * page, node_t * n, void * ignore) {
|
|||
|
||||
#ifdef LONG_RUN
|
||||
|
||||
inline static void checkPageState(Page * p) {
|
||||
inline static void checkPageState(Page * p) {
|
||||
Page * check = LH_ENTRY(find)(cachedPages, &(p->id), sizeof(p->id));
|
||||
if(check) {
|
||||
if(check) {
|
||||
int pending = *pagePendingPtr(p);
|
||||
int pinned = *pagePinCountPtr(p);
|
||||
if((!pinned) && (!pending)) {
|
||||
if((!pinned) && (!pending)) {
|
||||
assert(pageGetNode(p, 0));
|
||||
} else {
|
||||
assert(!pageGetNode(p,0));
|
||||
}
|
||||
int notfound = 1;
|
||||
for(pageid_t i = 0; i < freeCount; i++) {
|
||||
for(pageid_t i = 0; i < freeCount; i++) {
|
||||
if(freeList[i] == p) { notfound = 0; }
|
||||
}
|
||||
assert(notfound);
|
||||
} else {
|
||||
} else {
|
||||
assert(!pageGetNode(p,0));
|
||||
assert(!*pagePendingPtr(p));
|
||||
assert(!*pagePinCountPtr(p));
|
||||
int found = 0;
|
||||
for(pageid_t i = 0; i < freeCount; i++) {
|
||||
for(pageid_t i = 0; i < freeCount; i++) {
|
||||
if(freeList[i] == p) { found = 1; }
|
||||
}
|
||||
assert(found);
|
||||
}
|
||||
}
|
||||
|
||||
#else
|
||||
#else
|
||||
|
||||
inline static void checkPageState(Page * p) { }
|
||||
|
||||
#endif
|
||||
|
||||
/** You need to hold mut before calling this.
|
||||
/** You need to hold mut before calling this.
|
||||
|
||||
@return the page that was just written back. It will not be in
|
||||
lru or cachedPages after the call returns.
|
||||
*/
|
||||
inline static Page * writeBackOnePage() {
|
||||
inline static Page * writeBackOnePage() {
|
||||
Page * victim = lru->getStale(lru);
|
||||
// Make sure we have an exclusive lock on victim.
|
||||
if(!victim) return 0;
|
||||
|
@ -100,7 +101,7 @@ inline static Page * writeBackOnePage() {
|
|||
#ifdef LATCH_SANITY_CHECKING
|
||||
int latched = trywritelock(victim->loadlatch,0);
|
||||
assert(latched);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
checkPageState(victim);
|
||||
|
||||
|
@ -116,7 +117,7 @@ inline static Page * writeBackOnePage() {
|
|||
|
||||
#ifdef LATCH_SANITY_CHECKING
|
||||
// We can release the lock since we just grabbed it to see if
|
||||
// anyone else has pinned the page... the caller holds mut, so
|
||||
// anyone else has pinned the page... the caller holds mut, so
|
||||
// no-one will touch the page for now.
|
||||
unlock(victim->loadlatch);
|
||||
#endif
|
||||
|
@ -125,44 +126,44 @@ inline static Page * writeBackOnePage() {
|
|||
|
||||
/** Returns a free page. The page will not be in freeList,
|
||||
cachedPages or lru. */
|
||||
inline static Page * getFreePage() {
|
||||
inline static Page * getFreePage() {
|
||||
Page * ret;
|
||||
if(pageCount < MAX_BUFFER_SIZE) {
|
||||
ret = pageMalloc();
|
||||
pageFree(ret,-1);
|
||||
if(pageCount < MAX_BUFFER_SIZE) {
|
||||
ret = stasis_buffer_pool_malloc_page(stasis_buffer_pool);
|
||||
stasis_buffer_pool_free_page(stasis_buffer_pool, ret,-1);
|
||||
(*pagePinCountPtr(ret)) = 0;
|
||||
(*pagePendingPtr(ret)) = 0;
|
||||
pageSetNode(ret,0,0);
|
||||
pageCount++;
|
||||
} else {
|
||||
if(!freeCount) {
|
||||
pageCount++;
|
||||
} else {
|
||||
if(!freeCount) {
|
||||
ret = writeBackOnePage();
|
||||
if(!ret) {
|
||||
printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing.");
|
||||
abort();
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
ret = freeList[freeCount-1];
|
||||
freeList[freeCount-1] = 0;
|
||||
freeCount--;
|
||||
}
|
||||
if(freeCount < freeLowWater) {
|
||||
if(freeCount < freeLowWater) {
|
||||
pthread_cond_signal(&needFree);
|
||||
}
|
||||
}
|
||||
}
|
||||
assert(!*pagePinCountPtr(ret));
|
||||
assert(!*pagePendingPtr(ret));
|
||||
assert(!pageGetNode(ret,0));
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void * writeBackWorker(void * ignored) {
|
||||
static void * writeBackWorker(void * ignored) {
|
||||
pthread_mutex_lock(&mut);
|
||||
while(1) {
|
||||
while(running && (freeCount == freeListLength || pageCount < MAX_BUFFER_SIZE)) {
|
||||
while(1) {
|
||||
while(running && (freeCount == freeListLength || pageCount < MAX_BUFFER_SIZE)) {
|
||||
pthread_cond_wait(&needFree, &mut);
|
||||
}
|
||||
if(!running) { break; }
|
||||
if(!running) { break; }
|
||||
Page * victim = writeBackOnePage();
|
||||
|
||||
if(victim) {
|
||||
|
@ -184,13 +185,13 @@ static void * writeBackWorker(void * ignored) {
|
|||
}
|
||||
|
||||
static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitialized) {
|
||||
|
||||
|
||||
// Note: Calls to loadlatch in this function violate lock order, but
|
||||
// should be safe, since we make sure no one can have a writelock
|
||||
// before we grab the readlock.
|
||||
|
||||
void* check;
|
||||
|
||||
|
||||
pthread_mutex_lock(&mut);
|
||||
|
||||
// Is the page in cache?
|
||||
|
@ -264,7 +265,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
|
|||
|
||||
if(!uninitialized) {
|
||||
|
||||
// Now, it is safe to release the mutex; other threads won't
|
||||
// Now, it is safe to release the mutex; other threads won't
|
||||
// try to read this page from disk.
|
||||
pthread_mutex_unlock(&mut);
|
||||
|
||||
|
@ -307,11 +308,11 @@ static Page * bhLoadUninitPageImpl(int xid, const pageid_t pageid) {
|
|||
}
|
||||
|
||||
|
||||
static void bhReleasePage(Page * p) {
|
||||
static void bhReleasePage(Page * p) {
|
||||
pthread_mutex_lock(&mut);
|
||||
checkPageState(p);
|
||||
(*pagePinCountPtr(p))--;
|
||||
if(!(*pagePinCountPtr(p))) {
|
||||
if(!(*pagePinCountPtr(p))) {
|
||||
assert(!pageGetNode(p, 0));
|
||||
lru->insert(lru,p);
|
||||
}
|
||||
|
@ -321,15 +322,15 @@ static void bhReleasePage(Page * p) {
|
|||
pthread_mutex_unlock(&mut);
|
||||
}
|
||||
static void bhWriteBackPage(Page * p) {
|
||||
pageWrite(p);
|
||||
pageWrite(p);
|
||||
}
|
||||
static void bhForcePages() {
|
||||
forcePageFile();
|
||||
static void bhForcePages() {
|
||||
forcePageFile();
|
||||
}
|
||||
static void bhForcePageRange(pageid_t start, pageid_t stop) {
|
||||
forceRangePageFile(start, stop);
|
||||
}
|
||||
static void bhBufDeinit() {
|
||||
static void bhBufDeinit() {
|
||||
running = 0;
|
||||
|
||||
pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit.
|
||||
|
@ -338,17 +339,17 @@ static void bhBufDeinit() {
|
|||
struct LH_ENTRY(list) iter;
|
||||
const struct LH_ENTRY(pair_t) * next;
|
||||
LH_ENTRY(openlist)(cachedPages, &iter);
|
||||
while((next = LH_ENTRY(readlist)(&iter))) {
|
||||
while((next = LH_ENTRY(readlist)(&iter))) {
|
||||
pageWrite((next->value));
|
||||
stasis_page_cleanup((next->value)); // normally called by writeBackOnePage()
|
||||
}
|
||||
LH_ENTRY(closelist)(&iter);
|
||||
LH_ENTRY(destroy)(cachedPages);
|
||||
|
||||
|
||||
free(freeList);
|
||||
|
||||
lru->deinit(lru);
|
||||
bufferPoolDeInit();
|
||||
stasis_buffer_pool_deinit(stasis_buffer_pool);
|
||||
}
|
||||
static void bhSimulateBufferManagerCrash() {
|
||||
running = 0;
|
||||
|
@ -372,10 +373,10 @@ static void bhSimulateBufferManagerCrash() {
|
|||
free(freeList);
|
||||
|
||||
lru->deinit(lru);
|
||||
bufferPoolDeInit();
|
||||
stasis_buffer_pool_deinit(stasis_buffer_pool);
|
||||
}
|
||||
|
||||
void bhBufInit() {
|
||||
void bhBufInit() {
|
||||
|
||||
assert(!running);
|
||||
|
||||
|
@ -392,7 +393,7 @@ void bhBufInit() {
|
|||
bufDeinit = bhBufDeinit;
|
||||
simulateBufferManagerCrash = bhSimulateBufferManagerCrash;
|
||||
|
||||
bufferPoolInit();
|
||||
stasis_buffer_pool = stasis_buffer_pool_init();
|
||||
|
||||
lru = lruFastInit(pageGetNode, pageSetNode, 0);
|
||||
|
||||
|
@ -407,5 +408,5 @@ void bhBufInit() {
|
|||
|
||||
running = 1;
|
||||
|
||||
pthread_create(&worker, 0, writeBackWorker, 0);
|
||||
pthread_create(&worker, 0, writeBackWorker, 0);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid)
|
|||
static void bufManReleasePage (Page * p);
|
||||
static void bufManSimulateBufferManagerCrash();
|
||||
|
||||
static stasis_buffer_pool_t * stasis_buffer_pool;
|
||||
|
||||
int bufManBufInit() {
|
||||
|
||||
releasePageImpl = bufManReleasePage;
|
||||
|
@ -33,20 +35,20 @@ int bufManBufInit() {
|
|||
writeBackPage = pageWrite;
|
||||
forcePages = forcePageFile;
|
||||
forcePageRange = forceRangePageFile;
|
||||
bufDeinit = bufManBufDeinit;
|
||||
bufDeinit = bufManBufDeinit;
|
||||
simulateBufferManagerCrash = bufManSimulateBufferManagerCrash;
|
||||
|
||||
bufferPoolInit();
|
||||
stasis_buffer_pool = stasis_buffer_pool_init();
|
||||
|
||||
pthread_mutex_init(&loadPagePtr_mutex, NULL);
|
||||
|
||||
activePages = LH_ENTRY(create)(16);
|
||||
|
||||
dummy_page = pageMalloc();
|
||||
pageFree(dummy_page, -1);
|
||||
dummy_page = stasis_buffer_pool_malloc_page(stasis_buffer_pool);
|
||||
stasis_buffer_pool_free_page(stasis_buffer_pool, dummy_page, -1);
|
||||
Page *first;
|
||||
first = pageMalloc();
|
||||
pageFree(first, 0);
|
||||
first = stasis_buffer_pool_malloc_page(stasis_buffer_pool);
|
||||
stasis_buffer_pool_free_page(stasis_buffer_pool, first, 0);
|
||||
LH_ENTRY(insert)(activePages, &first->id, sizeof(first->id), first);
|
||||
pageRead(first);
|
||||
pageCacheInit(first);
|
||||
|
@ -70,7 +72,7 @@ static void bufManBufDeinit() {
|
|||
const struct LH_ENTRY(pair_t) * next;
|
||||
LH_ENTRY(openlist(activePages, &iter));
|
||||
|
||||
while((next = LH_ENTRY(readlist)(&iter))) {
|
||||
while((next = LH_ENTRY(readlist)(&iter))) {
|
||||
pageWrite((Page*)next->value);
|
||||
DEBUG("+");
|
||||
}
|
||||
|
@ -78,15 +80,15 @@ static void bufManBufDeinit() {
|
|||
LH_ENTRY(destroy)(activePages);
|
||||
|
||||
pthread_mutex_destroy(&loadPagePtr_mutex);
|
||||
|
||||
|
||||
pageCacheDeinit();
|
||||
|
||||
//closePageFile();
|
||||
|
||||
bufferPoolDeInit();
|
||||
|
||||
stasis_buffer_pool_deinit(stasis_buffer_pool);
|
||||
|
||||
#ifdef PIN_COUNT
|
||||
if(pinCount != 0) {
|
||||
if(pinCount != 0) {
|
||||
printf("WARNING: At exit, %d pages were still pinned!\n", pinCount);
|
||||
}
|
||||
#endif
|
||||
|
@ -128,14 +130,14 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
// called loadPage() on the pinned page since the last time it was
|
||||
// completely unpinned. One such site is responsible for the
|
||||
// leak.
|
||||
|
||||
|
||||
char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
|
||||
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
|
||||
char * holderD =0;
|
||||
int pinsD = 0;
|
||||
if(holder) {
|
||||
holderD = strdup(holder);
|
||||
pinsD = *pins;
|
||||
pinsD = *pins;
|
||||
}
|
||||
#endif
|
||||
if(locktype == RW) {
|
||||
|
@ -144,7 +146,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
readlock(ret->loadlatch, 217);
|
||||
}
|
||||
#ifdef PROFILE_LATCHES_WRITE_ONLY
|
||||
if(holderD)
|
||||
if(holderD)
|
||||
free(holderD);
|
||||
#endif
|
||||
}
|
||||
|
@ -162,13 +164,13 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
// called loadPage() on the pinned page since the last time it was
|
||||
// completely unpinned. One such site is responsible for the
|
||||
// leak.
|
||||
|
||||
|
||||
char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
|
||||
int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
|
||||
|
||||
|
||||
char * holderD = 0;
|
||||
int pinsD = 0;
|
||||
if(holder) {
|
||||
if(holder) {
|
||||
holderD = strdup(holder);
|
||||
pinsD = *pins;
|
||||
}
|
||||
|
@ -187,9 +189,9 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
if(spin > 10000 && !(spin % 10000)) {
|
||||
printf("GetPage is stuck!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(ret) {
|
||||
if(ret) {
|
||||
cacheHitOnPage(ret);
|
||||
assert(ret->id == pageid);
|
||||
pthread_mutex_unlock(&loadPagePtr_mutex);
|
||||
|
@ -200,7 +202,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
a) there is no cache entry for pageid
|
||||
b) this is the only thread that has gotten this far,
|
||||
and that will try to add an entry for pageid
|
||||
c) the most recent version of this page has been
|
||||
c) the most recent version of this page has been
|
||||
written to the OS's file cache. */
|
||||
pageid_t oldid = -1;
|
||||
|
||||
|
@ -213,12 +215,12 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
cacheRemovePage(ret);
|
||||
|
||||
oldid = ret->id;
|
||||
|
||||
|
||||
assert(oldid != pageid);
|
||||
|
||||
} else {
|
||||
|
||||
ret = pageMalloc();
|
||||
ret = stasis_buffer_pool_malloc_page(stasis_buffer_pool);
|
||||
ret->id = -1;
|
||||
ret->inCache = 0;
|
||||
}
|
||||
|
@ -235,14 +237,14 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
|
||||
char * holderD = 0;
|
||||
int pinsD = 0;
|
||||
if(holder) {
|
||||
if(holder) {
|
||||
holderD = strdup(holder);
|
||||
pinsD = *pins;
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
writelock(ret->loadlatch, 217);
|
||||
writelock(ret->loadlatch, 217);
|
||||
#ifdef PROFILE_LATCHES_WRITE_ONLY
|
||||
if(holderD)
|
||||
free(holderD);
|
||||
|
@ -251,16 +253,16 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
/* Inserting this into the cache before releasing the mutex
|
||||
ensures that constraint (b) above holds. */
|
||||
LH_ENTRY(insert)(activePages, &pageid, sizeof(pageid), ret);
|
||||
pthread_mutex_unlock(&loadPagePtr_mutex);
|
||||
pthread_mutex_unlock(&loadPagePtr_mutex);
|
||||
|
||||
/* Could writelock(ret) go here? */
|
||||
|
||||
assert(ret != dummy_page);
|
||||
if(ret->id != -1) {
|
||||
if(ret->id != -1) {
|
||||
pageWrite(ret);
|
||||
}
|
||||
|
||||
pageFree(ret, pageid);
|
||||
stasis_buffer_pool_free_page(stasis_buffer_pool, ret, pageid);
|
||||
if(!uninitialized) {
|
||||
pageRead(ret);
|
||||
} else {
|
||||
|
@ -273,7 +275,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
}
|
||||
|
||||
writeunlock(ret->loadlatch);
|
||||
|
||||
|
||||
pthread_mutex_lock(&loadPagePtr_mutex);
|
||||
|
||||
LH_ENTRY(remove)(activePages, &(oldid), sizeof(oldid));
|
||||
|
@ -290,10 +292,10 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
// called loadPage() on the pinned page since the last time it was
|
||||
// completely unpinned. One such site is responsible for the
|
||||
// leak.
|
||||
|
||||
|
||||
holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*));
|
||||
pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*));
|
||||
|
||||
|
||||
if(holder) {
|
||||
holderD = strdup(holder);
|
||||
pinsD = *pins;
|
||||
|
@ -310,7 +312,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) {
|
|||
#endif
|
||||
if(ret->id != pageid) {
|
||||
unlock(ret->loadlatch);
|
||||
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");
|
||||
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");
|
||||
fflush(NULL);
|
||||
return bufManGetPage(pageid, locktype, uninitialized);
|
||||
}
|
||||
|
@ -324,20 +326,20 @@ static compensated_function Page *bufManLoadPage(int xid, pageid_t pageid) {
|
|||
|
||||
Page * ret = pthread_getspecific(lastPage);
|
||||
|
||||
if(ret && ret->id == pageid) {
|
||||
if(ret && ret->id == pageid) {
|
||||
pthread_mutex_lock(&loadPagePtr_mutex);
|
||||
readlock(ret->loadlatch, 1);
|
||||
if(ret->id != pageid) {
|
||||
if(ret->id != pageid) {
|
||||
unlock(ret->loadlatch);
|
||||
ret = 0;
|
||||
} else {
|
||||
} else {
|
||||
cacheHitOnPage(ret);
|
||||
pthread_mutex_unlock(&loadPagePtr_mutex);
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
ret = 0;
|
||||
}
|
||||
if(!ret) {
|
||||
if(!ret) {
|
||||
ret = bufManGetPage(pageid, RO, 0);
|
||||
pthread_setspecific(lastPage, ret);
|
||||
}
|
||||
|
@ -355,20 +357,20 @@ static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid)
|
|||
|
||||
Page * ret = pthread_getspecific(lastPage);
|
||||
|
||||
if(ret && ret->id == pageid) {
|
||||
if(ret && ret->id == pageid) {
|
||||
pthread_mutex_lock(&loadPagePtr_mutex);
|
||||
readlock(ret->loadlatch, 1);
|
||||
if(ret->id != pageid) {
|
||||
if(ret->id != pageid) {
|
||||
unlock(ret->loadlatch);
|
||||
ret = 0;
|
||||
} else {
|
||||
} else {
|
||||
cacheHitOnPage(ret);
|
||||
pthread_mutex_unlock(&loadPagePtr_mutex);
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
ret = 0;
|
||||
}
|
||||
if(!ret) {
|
||||
if(!ret) {
|
||||
ret = bufManGetPage(pageid, RO, 1);
|
||||
pthread_setspecific(lastPage, ret);
|
||||
}
|
||||
|
|
|
@ -46,85 +46,79 @@ terms specified in this license.
|
|||
* Implementation of in memory buffer pool
|
||||
*
|
||||
* $Id$
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
/* _XOPEN_SOURCE is needed for posix_memalign */
|
||||
#define _XOPEN_SOURCE 600
|
||||
#include <stdlib.h>
|
||||
#include <assert.h>
|
||||
|
||||
#include <stasis/bufferPool.h>
|
||||
#include <assert.h>
|
||||
#include <stasis/truncation.h>
|
||||
#include <stasis/page.h>
|
||||
/* TODO: Combine with buffer size... */
|
||||
static pageid_t nextPage = 0;
|
||||
static pthread_mutex_t pageMallocMutex;
|
||||
|
||||
static void * addressFromMalloc = 0;
|
||||
struct stasis_buffer_pool_t {
|
||||
pageid_t nextPage;
|
||||
Page* pool;
|
||||
pthread_mutex_t mut;
|
||||
void * addr_to_free;
|
||||
};
|
||||
|
||||
/** We need one dummy page for locking purposes, so this array has one extra page in it. */
|
||||
Page pool[MAX_BUFFER_SIZE+1];
|
||||
stasis_buffer_pool_t* stasis_buffer_pool_init() {
|
||||
|
||||
stasis_buffer_pool_t * ret = malloc(sizeof(*ret));
|
||||
|
||||
void bufferPoolInit() {
|
||||
ret->nextPage = 0;
|
||||
|
||||
nextPage = 0;
|
||||
|
||||
pthread_mutex_init(&pageMallocMutex, NULL);
|
||||
pthread_mutex_init(&(ret->mut), NULL);
|
||||
|
||||
byte * bufferSpace ;
|
||||
|
||||
bufferSpace = calloc((MAX_BUFFER_SIZE + 2), PAGE_SIZE);
|
||||
byte * bufferSpace = calloc((MAX_BUFFER_SIZE + 2), PAGE_SIZE);
|
||||
assert(bufferSpace);
|
||||
addressFromMalloc = bufferSpace;
|
||||
bufferSpace = (byte*)(((long)bufferSpace) +
|
||||
PAGE_SIZE -
|
||||
ret->addr_to_free = bufferSpace;
|
||||
|
||||
bufferSpace = (byte*)(((long)bufferSpace) +
|
||||
PAGE_SIZE -
|
||||
(((long)bufferSpace) % PAGE_SIZE));
|
||||
|
||||
// We need one dummy page for locking purposes,
|
||||
// so this array has one extra page in it.
|
||||
ret->pool = malloc(sizeof(ret->pool[0])*(MAX_BUFFER_SIZE+1));
|
||||
|
||||
for(pageid_t i = 0; i < MAX_BUFFER_SIZE+1; i++) {
|
||||
pool[i].rwlatch = initlock();
|
||||
pool[i].loadlatch = initlock();
|
||||
pool[i].memAddr = &(bufferSpace[i*PAGE_SIZE]);
|
||||
pool[i].dirty = 0;
|
||||
ret->pool[i].rwlatch = initlock();
|
||||
ret->pool[i].loadlatch = initlock();
|
||||
ret->pool[i].memAddr = &(bufferSpace[i*PAGE_SIZE]);
|
||||
ret->pool[i].dirty = 0;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void bufferPoolDeInit() {
|
||||
void stasis_buffer_pool_deinit(stasis_buffer_pool_t * ret) {
|
||||
for(pageid_t i = 0; i < MAX_BUFFER_SIZE+1; i++) {
|
||||
deletelock(pool[i].rwlatch);
|
||||
deletelock(pool[i].loadlatch);
|
||||
deletelock(ret->pool[i].rwlatch);
|
||||
deletelock(ret->pool[i].loadlatch);
|
||||
}
|
||||
free(addressFromMalloc); // breaks efence
|
||||
pthread_mutex_destroy(&pageMallocMutex);
|
||||
free(ret->addr_to_free); // breaks efence
|
||||
pthread_mutex_destroy(&ret->mut);
|
||||
}
|
||||
|
||||
Page* pageMalloc() {
|
||||
Page* stasis_buffer_pool_malloc_page(stasis_buffer_pool_t * ret) {
|
||||
Page *page;
|
||||
|
||||
pthread_mutex_lock(&pageMallocMutex);
|
||||
|
||||
page = &(pool[nextPage]);
|
||||
|
||||
nextPage++;
|
||||
/* There's a dummy page that we need to keep around, thus the +1 */
|
||||
assert(nextPage <= MAX_BUFFER_SIZE + 1);
|
||||
pthread_mutex_lock(&ret->mut);
|
||||
|
||||
pthread_mutex_unlock(&pageMallocMutex);
|
||||
page = &(ret->pool[ret->nextPage]);
|
||||
|
||||
(ret->nextPage)++;
|
||||
/* There's a dummy page that we need to keep around, thus the +1 */
|
||||
assert(ret->nextPage <= MAX_BUFFER_SIZE + 1);
|
||||
|
||||
pthread_mutex_unlock(&ret->mut);
|
||||
|
||||
return page;
|
||||
|
||||
}
|
||||
|
||||
|
||||
static void pageFreeNoLock(Page *p, pageid_t id) {
|
||||
void stasis_buffer_pool_free_page(stasis_buffer_pool_t * ret, Page *p, pageid_t id) {
|
||||
writelock(p->rwlatch, 10);
|
||||
p->id = id;
|
||||
p->LSN = 0;
|
||||
p->dirty = 0;
|
||||
}
|
||||
|
||||
void pageFree(Page *p, pageid_t id) {
|
||||
writelock(p->rwlatch, 10);
|
||||
pageFreeNoLock(p,id);
|
||||
writeunlock(p->rwlatch);
|
||||
}
|
||||
|
|
|
@ -48,10 +48,10 @@ terms specified in this license.
|
|||
That is left to a cacheManager. (Multiple cacheManagers could be
|
||||
used with a single bufferManager.)
|
||||
|
||||
@todo Allow error checking!
|
||||
|
||||
@todo Allow error checking!
|
||||
|
||||
@todo Refactoring for lock manager
|
||||
|
||||
|
||||
Possible interface for lockManager:
|
||||
|
||||
Define three classes of objects that the lock manager is interested in:
|
||||
|
@ -63,7 +63,7 @@ terms specified in this license.
|
|||
Stasis already has operations and transactions, and these can be
|
||||
relatively unchanged. Predicates are read only operations that
|
||||
return a set of tuples. Tread() is the simplest predicate.
|
||||
Index scans provide a motivating example.
|
||||
Index scans provide a motivating example.
|
||||
|
||||
See http://research.microsoft.com/%7Eadya/pubs/icde00.pdf
|
||||
(Generalized Isolation Level Definitions, Adya, Liskov, O'Neil,
|
||||
|
@ -84,13 +84,6 @@ terms specified in this license.
|
|||
|
||||
BEGIN_C_DECLS
|
||||
|
||||
typedef struct Page_s Page_s;
|
||||
/**
|
||||
Page is defined in bufferManager.h as an incomplete type to enforce
|
||||
an abstraction barrier between page.h and the rest of the system.
|
||||
*/
|
||||
typedef struct Page_s Page;
|
||||
|
||||
/**
|
||||
* Obtain a pointer to a page from the buffer manager. The page will
|
||||
* be pinned, and the pointer valid until releasePage is called.
|
||||
|
@ -105,7 +98,7 @@ Page * loadPage(int xid, pageid_t pageid);
|
|||
|
||||
Page * loadUninitializedPage(int xid, pageid_t pageid);
|
||||
|
||||
/**
|
||||
/**
|
||||
This is the function pointer that bufInit sets in order to
|
||||
override loadPage.
|
||||
*/
|
||||
|
@ -117,7 +110,7 @@ extern Page * (*loadUninitPageImpl)(int xid, pageid_t pageid);
|
|||
*/
|
||||
void releasePage(Page *p);
|
||||
|
||||
/**
|
||||
/**
|
||||
This is the function pointer that bufInit sets in order to
|
||||
override releasePage.
|
||||
*/
|
||||
|
@ -127,7 +120,7 @@ extern void (*releasePageImpl)(Page * p);
|
|||
* @return 0 on success
|
||||
* @return error code on failure
|
||||
*/
|
||||
/**
|
||||
/**
|
||||
This is used by truncation to move dirty pages from Stasis cache
|
||||
into the operating system cache. Once writeBackPage(p) returns,
|
||||
calling forcePages() will synchronously force page number p to
|
||||
|
@ -140,7 +133,7 @@ extern void (*releasePageImpl)(Page * p);
|
|||
extern void (*writeBackPage)(Page * p);
|
||||
/**
|
||||
Force any written back pages to disk.
|
||||
|
||||
|
||||
@see writeBackPage for more information.
|
||||
|
||||
If the buffer manager doesn't support stable storage, this call is
|
||||
|
|
|
@ -40,6 +40,9 @@ permission to use and distribute the software in accordance with the
|
|||
terms specified in this license.
|
||||
---*/
|
||||
|
||||
#ifndef STASIS_BUFFER_POOL_H
|
||||
#define STASIS_BUFFER_POOL_H
|
||||
|
||||
/**
|
||||
* @file
|
||||
*
|
||||
|
@ -47,16 +50,19 @@ terms specified in this license.
|
|||
* $Id$
|
||||
*/
|
||||
|
||||
#include "bufferManager.h"
|
||||
#include <stasis/common.h>
|
||||
|
||||
void bufferPoolInit();
|
||||
typedef struct stasis_buffer_pool_t stasis_buffer_pool_t;
|
||||
|
||||
void bufferPoolDeInit();
|
||||
stasis_buffer_pool_t* stasis_buffer_pool_init();
|
||||
|
||||
Page* pageMalloc();
|
||||
/**
|
||||
void stasis_buffer_pool_deinit(stasis_buffer_pool_t* pool);
|
||||
|
||||
Page* stasis_buffer_pool_malloc_page(stasis_buffer_pool_t* pool);
|
||||
/**
|
||||
Return a page to the in memory pool of free pages.
|
||||
|
||||
@see pageMalloc()
|
||||
@see stasis_buffer_pool_malloc_page()
|
||||
*/
|
||||
void pageFree(Page * p, pageid_t id);
|
||||
void stasis_buffer_pool_free_page(stasis_buffer_pool_t* pool, Page * p, pageid_t id);
|
||||
#endif // STASIS_BUFFER_POOL_H
|
||||
|
|
|
@ -40,9 +40,9 @@ permission to use and distribute the software in accordance with the
|
|||
terms specified in this license.
|
||||
---*/
|
||||
/**
|
||||
* @file
|
||||
* @file
|
||||
*
|
||||
* A standard header file, adopted from Autobook.
|
||||
* A standard header file, adopted from Autobook.
|
||||
*
|
||||
* http://sources.redhat.com/autobook/
|
||||
*
|
||||
|
@ -114,11 +114,11 @@ typedef uint16_t pageoff_t;
|
|||
/*#define PROFILE_LATCHES*/
|
||||
/*#define NO_LATCHES */
|
||||
|
||||
#ifdef DEBUGGING
|
||||
#ifdef DEBUGGING
|
||||
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */
|
||||
#define DEBUG(...) \
|
||||
printf(__VA_ARGS__); fflush(NULL)
|
||||
#else
|
||||
printf(__VA_ARGS__); fflush(NULL)
|
||||
#else
|
||||
#define DEBUG(...)
|
||||
#endif /*DEBUGGING*/
|
||||
|
||||
|
@ -134,6 +134,15 @@ typedef struct {
|
|||
} recordid;
|
||||
#pragma pack(pop)
|
||||
|
||||
/*
|
||||
Define Page as an incomplete type to hide its implementation from clients.
|
||||
|
||||
Include stasis/page.h for the complete definition.
|
||||
*/
|
||||
typedef struct Page_s Page;
|
||||
|
||||
|
||||
|
||||
#include "compensations.h"
|
||||
|
||||
#endif /* __stasis_common_h */
|
||||
|
|
Loading…
Reference in a new issue