memoized loadPage() (per thread)

no longer requires pbl compatibility wrappers
This commit is contained in:
Sears Russell 2006-08-10 23:45:25 +00:00
parent 7847331771
commit 6f9c6af99c

View file

@ -68,10 +68,11 @@ terms specified in this license.
#include "blobManager.h" #include "blobManager.h"
#include <lladd/pageCache.h> #include <lladd/pageCache.h>
#include "pageFile.h" #include "pageFile.h"
#include <pbl/pbl.h>
#include <lladd/truncation.h> #include <lladd/truncation.h>
#include <lladd/lhtable.h>
#undef loadPage #undef loadPage
#undef releasePage #undef releasePage
#undef Page #undef Page
@ -98,7 +99,7 @@ pthread_mutex_t pinCount_mutex = PTHREAD_MUTEX_INITIALIZER;
int pinCount = 0; int pinCount = 0;
#endif #endif
static pblHashTable_t *activePages; /* page lookup */ static struct LH_ENTRY(table) *activePages; /* page lookup */
/*static Page * activePagePtrs[MAX_BUFFER_SIZE];*/ /*static Page * activePagePtrs[MAX_BUFFER_SIZE];*/
@ -106,6 +107,8 @@ static pthread_mutex_t loadPagePtr_mutex;
static Page * dummy_page; static Page * dummy_page;
pthread_key_t lastPage;
int bufInit() { int bufInit() {
pageInit(); pageInit();
@ -114,19 +117,22 @@ int bufInit() {
pthread_mutex_init(&loadPagePtr_mutex, NULL); pthread_mutex_init(&loadPagePtr_mutex, NULL);
activePages = pblHtCreate(); activePages = LH_ENTRY(create)(16);
dummy_page = pageMalloc(); dummy_page = pageMalloc();
pageFree(dummy_page, -1); pageFree(dummy_page, -1);
Page *first; Page *first;
first = pageMalloc(); first = pageMalloc();
pageFree(first, 0); pageFree(first, 0);
pblHtInsert(activePages, &first->id, sizeof(int), first); LH_ENTRY(insert)(activePages, &first->id, sizeof(int), first);
openBlobStore(); openBlobStore();
pageCacheInit(first); pageCacheInit(first);
int err = pthread_key_create(&lastPage, 0);
assert(!err);
assert(activePages); assert(activePages);
#ifdef PROFILE_LATCHES_WRITE_ONLY #ifdef PROFILE_LATCHES_WRITE_ONLY
profile_load_hash = LH_ENTRY(create)(10); profile_load_hash = LH_ENTRY(create)(10);
@ -139,20 +145,21 @@ void bufDeinit() {
closeBlobStore(); closeBlobStore();
Page *p;
DEBUG("pageCacheDeinit()"); DEBUG("pageCacheDeinit()");
for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtNext(activePages)) { struct LH_ENTRY(list) iter;
const struct LH_ENTRY(pair_t) * next;
LH_ENTRY(openlist(activePages, &iter));
pblHtRemove( activePages, 0, 0 ); while((next = LH_ENTRY(readlist)(&iter))) {
pageWrite((Page*)next->value);
DEBUG("+"); DEBUG("+");
pageWrite(p);
} }
LH_ENTRY(destroy)(activePages);
pthread_mutex_destroy(&loadPagePtr_mutex); pthread_mutex_destroy(&loadPagePtr_mutex);
pblHtDelete(activePages);
pageCacheDeinit(); pageCacheDeinit();
closePageFile(); closePageFile();
@ -191,7 +198,7 @@ static Page * getPage(int pageid, int locktype) {
int spin = 0; int spin = 0;
pthread_mutex_lock(&loadPagePtr_mutex); pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int)); ret = LH_ENTRY(find)(activePages, &pageid, sizeof(int));
if(ret) { if(ret) {
#ifdef PROFILE_LATCHES_WRITE_ONLY #ifdef PROFILE_LATCHES_WRITE_ONLY
@ -225,7 +232,7 @@ static Page * getPage(int pageid, int locktype) {
pthread_mutex_unlock(&loadPagePtr_mutex); pthread_mutex_unlock(&loadPagePtr_mutex);
sched_yield(); sched_yield();
pthread_mutex_lock(&loadPagePtr_mutex); pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int)); ret = LH_ENTRY(find)(activePages, &pageid, sizeof(int));
if(ret) { if(ret) {
#ifdef PROFILE_LATCHES_WRITE_ONLY #ifdef PROFILE_LATCHES_WRITE_ONLY
@ -321,8 +328,7 @@ static Page * getPage(int pageid, int locktype) {
/* Inserting this into the cache before releasing the mutex /* Inserting this into the cache before releasing the mutex
ensures that constraint (b) above holds. */ ensures that constraint (b) above holds. */
pblHtInsert(activePages, &pageid, sizeof(int), ret); LH_ENTRY(insert)(activePages, &pageid, sizeof(int), ret);
pthread_mutex_unlock(&loadPagePtr_mutex); pthread_mutex_unlock(&loadPagePtr_mutex);
/* Could writelock(ret) go here? */ /* Could writelock(ret) go here? */
@ -340,7 +346,7 @@ static Page * getPage(int pageid, int locktype) {
pthread_mutex_lock(&loadPagePtr_mutex); pthread_mutex_lock(&loadPagePtr_mutex);
pblHtRemove(activePages, &(oldid), sizeof(int)); LH_ENTRY(remove)(activePages, &(oldid), sizeof(int));
/* @todo Put off putting this back into cache until we're done with /* @todo Put off putting this back into cache until we're done with
it. -- This could cause the cache to empty out if the ratio of it. -- This could cause the cache to empty out if the ratio of
@ -454,7 +460,25 @@ compensated_function Page *loadPage(int xid, int pageid) {
} end_ret(NULL); } end_ret(NULL);
Page * ret = getPage(pageid, RO); Page * ret = pthread_getspecific(lastPage);
if(ret && ret->id == pageid) {
pthread_mutex_lock(&loadPagePtr_mutex);
readlock(ret->loadlatch, 1);
if(ret->id != pageid) {
unlock(ret->loadlatch);
ret = 0;
} else {
cacheHitOnPage(ret);
pthread_mutex_unlock(&loadPagePtr_mutex);
}
} else {
ret = 0;
}
if(!ret) {
ret = getPage(pageid, RO);
pthread_setspecific(lastPage, ret);
}
#ifdef PIN_COUNT #ifdef PIN_COUNT
pthread_mutex_lock(&pinCount_mutex); pthread_mutex_lock(&pinCount_mutex);