This fixes Stasis issue 22.

Replacement policies now track the pin count of each page that they interact with.  This behavior used to live in bufferHash, and had never been ported to concurrentBufferManager.

For various reasons, the pin counts have to be updated atomically with the LRU state.  This would be very difficult to do inside of concurrentBufferManager, which sometimes needs to add pages to and remove pages from replacement policies without holding any other latches.
This commit is contained in:
Sears Russell 2010-04-20 01:09:32 +00:00
parent 5138af6d1c
commit c7fa929e71
6 changed files with 66 additions and 51 deletions

View file

@ -62,7 +62,8 @@ static void pageSetNode(void * page, node_t * n, void * ignore) {
static inline struct Page_s ** pagePendingPtr(Page * p) { static inline struct Page_s ** pagePendingPtr(Page * p) {
return ((struct Page_s **)(&((p)->next))); return ((struct Page_s **)(&((p)->next)));
} }
static inline intptr_t* pagePinCountPtr(Page * p) { static inline intptr_t* pagePinCountPtr(void * page) {
Page * p = page;
return ((intptr_t*)(&((p)->queue))); return ((intptr_t*)(&((p)->queue)));
} }
static inline int needFlush(stasis_buffer_manager_t * bm) { static inline int needFlush(stasis_buffer_manager_t * bm) {
@ -134,9 +135,9 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
if(bh->pageCount < MAX_BUFFER_SIZE) { if(bh->pageCount < MAX_BUFFER_SIZE) {
ret = stasis_buffer_pool_malloc_page(bh->buffer_pool); ret = stasis_buffer_pool_malloc_page(bh->buffer_pool);
stasis_buffer_pool_free_page(bh->buffer_pool, ret,-1); stasis_buffer_pool_free_page(bh->buffer_pool, ret,-1);
(*pagePinCountPtr(ret)) = 0;
(*pagePendingPtr(ret)) = 0; (*pagePendingPtr(ret)) = 0;
pageSetNode(ret,0,0); pageSetNode(ret,0,0);
(*pagePinCountPtr(ret)) = 1; // to match what happens after the next block calls lru->remove()
bh->pageCount++; bh->pageCount++;
} else { } else {
while((ret = bh->lru->getStale(bh->lru))) { while((ret = bh->lru->getStale(bh->lru))) {
@ -163,9 +164,9 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
Page * check = LH_ENTRY(remove)(bh->cachedPages, &ret->id, sizeof(ret->id)); Page * check = LH_ENTRY(remove)(bh->cachedPages, &ret->id, sizeof(ret->id));
assert(check == ret); assert(check == ret);
} }
assert(!*pagePinCountPtr(ret));
assert(!*pagePendingPtr(ret)); assert(!*pagePendingPtr(ret));
assert(!pageGetNode(ret,0)); assert(!pageGetNode(ret,0));
assert(1 == *pagePinCountPtr(ret)); // was zero before this call...
assert(!ret->dirty); assert(!ret->dirty);
return ret; return ret;
} }
@ -205,11 +206,8 @@ static Page * bhGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid
assert(locked); assert(locked);
#endif #endif
if(!*pagePendingPtr(ret)) { if(!*pagePendingPtr(ret)) {
if(!*pagePinCountPtr(ret) ) { bh->lru->remove(bh->lru, ret);
// Then ret is in lru (otherwise it would be pending, or not cached); remove it.
bh->lru->remove(bh->lru, ret);
}
(*pagePinCountPtr(ret))++;
checkPageState(ret); checkPageState(ret);
assert(ret->id == pageid); assert(ret->id == pageid);
} else { } else {
@ -253,11 +251,8 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
int locked = tryreadlock(ret->loadlatch,0); int locked = tryreadlock(ret->loadlatch,0);
assert(locked); assert(locked);
#endif #endif
if(! *pagePinCountPtr(ret) ) { bh->lru->remove(bh->lru, ret);
// Then ret is in lru (otherwise it would be pending, or not cached); remove it.
bh->lru->remove(bh->lru, ret);
}
(*pagePinCountPtr(ret))++;
checkPageState(ret); checkPageState(ret);
pthread_mutex_unlock(&bh->mut); pthread_mutex_unlock(&bh->mut);
assert(ret->id == pageid); assert(ret->id == pageid);
@ -328,13 +323,11 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
ret->LSN = *stasis_page_lsn_ptr(ret) = xid_lsn == INVALID_LSN ? (log_lsn - 1) : xid_lsn; ret->LSN = *stasis_page_lsn_ptr(ret) = xid_lsn == INVALID_LSN ? (log_lsn - 1) : xid_lsn;
} }
*pagePendingPtr(ret) = 0; *pagePendingPtr(ret) = 0;
// Would remove from lru, but getFreePage() guarantees that it isn't // Would remove from lru, but getFreePage() guarantees that it isn't
// there. // there.
assert(!pageGetNode(ret, 0)); assert(!pageGetNode(ret, 0));
assert(!(*pagePinCountPtr(ret)));
(*pagePinCountPtr(ret))++;
#ifdef LATCH_SANITY_CHECKING #ifdef LATCH_SANITY_CHECKING
int locked = tryreadlock(ret->loadlatch, 0); int locked = tryreadlock(ret->loadlatch, 0);
assert(locked); assert(locked);
@ -407,11 +400,8 @@ static void bhReleasePage(stasis_buffer_manager_t * bm, Page * p) {
stasis_buffer_hash_t * bh = bm->impl; stasis_buffer_hash_t * bh = bm->impl;
pthread_mutex_lock(&bh->mut); pthread_mutex_lock(&bh->mut);
checkPageState(p); checkPageState(p);
(*pagePinCountPtr(p))--; bh->lru->insert(bh->lru,p);
if(!(*pagePinCountPtr(p))) {
assert(!pageGetNode(p, 0));
bh->lru->insert(bh->lru,p);
}
#ifdef LATCH_SANITY_CHECKING #ifdef LATCH_SANITY_CHECKING
unlock(p->loadlatch); unlock(p->loadlatch);
#endif #endif
@ -556,7 +546,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *
bh->buffer_pool = stasis_buffer_pool_init(); bh->buffer_pool = stasis_buffer_pool_init();
bh->lru = lruFastInit(pageGetNode, pageSetNode, 0); bh->lru = lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0);
bh->cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); bh->cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE);

View file

@ -40,6 +40,10 @@ static void pageSetNode(void * page, node_t * n, void * ignore) {
Page * p = page; Page * p = page;
p->prev = (Page *) n; p->prev = (Page *) n;
} }
static inline intptr_t* pagePinCountPtr(void * page) {
Page * p = page;
return ((intptr_t*)(&((p)->queue)));
}
static inline int needFlush(stasis_buffer_manager_t * bm) { static inline int needFlush(stasis_buffer_manager_t * bm) {
stasis_buffer_concurrent_hash_t *bh = bm->impl; stasis_buffer_concurrent_hash_t *bh = bm->impl;
@ -95,7 +99,7 @@ static Page * chGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid
hashtable_bucket_handle_t h; hashtable_bucket_handle_t h;
Page * p = hashtable_lookup_lock(ch->ht, pageid, &h); Page * p = hashtable_lookup_lock(ch->ht, pageid, &h);
readlock(p->loadlatch, 0); readlock(p->loadlatch, 0);
ch->lru->hit(ch->lru, p); ch->lru->remove(ch->lru, p);
hashtable_unlock(&h); hashtable_unlock(&h);
return p; return p;
} }
@ -187,7 +191,7 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
if(needFlush(bm)) { pthread_cond_signal(&ch->needFree); } if(needFlush(bm)) { pthread_cond_signal(&ch->needFree); }
} }
readlock(p->loadlatch, 0); readlock(p->loadlatch, 0);
if(first) { ch->lru->hit(ch->lru, p); } ch->lru->remove(ch->lru, p);
hashtable_unlock(&h); hashtable_unlock(&h);
assert(p->id == pageid); assert(p->id == pageid);
return p; return p;
@ -199,6 +203,8 @@ static Page * chLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const p
return chLoadPageImpl_helper(bm, xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page. return chLoadPageImpl_helper(bm, xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page.
} }
static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) { static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) {
stasis_buffer_concurrent_hash_t * ch = bm->impl;
ch->lru->insert(ch->lru, p);
unlock(p->loadlatch); unlock(p->loadlatch);
} }
static void chForcePages(stasis_buffer_manager_t* bm, stasis_buffer_manager_handle_t *h) { static void chForcePages(stasis_buffer_manager_t* bm, stasis_buffer_manager_handle_t *h) {
@ -274,7 +280,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37); ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37);
free(lrus); free(lrus);
#else #else
ch->lru = replacementPolicyThreadsafeWrapperInit(lruFastInit(pageGetNode, pageSetNode, 0)); ch->lru = replacementPolicyThreadsafeWrapperInit(lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0));
#endif #endif
ch->ht = hashtable_init(MAX_BUFFER_SIZE * 4); ch->ht = hashtable_init(MAX_BUFFER_SIZE * 4);
@ -282,7 +288,8 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool); Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool);
stasis_buffer_pool_free_page(ch->buffer_pool, p,-1*i); stasis_buffer_pool_free_page(ch->buffer_pool, p,-1*i);
pageSetNode(p,0,0); pageSetNode(p,0,0);
ch->lru->insert(ch->lru, p); (*pagePinCountPtr(p)) = 1;
ch->lru->insert(ch->lru, p); // decrements pin count ptr (setting it to zero)
hashtable_insert(ch->ht, p->id, p); hashtable_insert(ch->ht, p->id, p);
} }

View file

@ -10,8 +10,8 @@ typedef struct LL_ENTRY(list) list;
typedef struct lruFast { typedef struct lruFast {
struct LL_ENTRY(list) * lru; struct LL_ENTRY(list) * lru;
node_t * (*getNode)(void * page, void * conf); node_t * (*getNode)(void * page, void * conf);
void (*setNode)(void * page, node_t * n, void (*setNode)(void * page, node_t * n, void * conf);
void * conf); intptr_t*(*derefCount)(void* page);
void * conf; void * conf;
} lruFast; } lruFast;
@ -29,25 +29,36 @@ static void* getStale(struct replacementPolicy * r) {
} }
static void* remove(struct replacementPolicy* r, void * p) { static void* remove(struct replacementPolicy* r, void * p) {
lruFast * l = r->impl; lruFast * l = r->impl;
node_t * n = l->getNode(p, l->conf); void *ret = NULL;
assert(n);
value_t * v = n->v; if(!*l->derefCount(p)) {
LL_ENTRY(remove)(l->lru, n); node_t * n = l->getNode(p, l->conf);
l->setNode(p, 0, l->conf); assert(n);
return v; value_t * v = n->v;
LL_ENTRY(remove)(l->lru, n);
l->setNode(p, 0, l->conf);
ret = v;
}
(*l->derefCount(p))++;
return ret;
} }
static void* getStaleAndRemove(struct replacementPolicy* r) { static void* getStaleAndRemove(struct replacementPolicy* r) {
lruFast * l = r->impl; lruFast * l = r->impl;
void * ret = LL_ENTRY(shift)(l->lru); void * ret = LL_ENTRY(shift)(l->lru);
l->setNode(ret, 0, l->conf); l->setNode(ret, 0, l->conf);
assert(!(*l->derefCount(ret)));
(*l->derefCount(ret))++;
return ret; return ret;
} }
static void insert(struct replacementPolicy* r, static void insert(struct replacementPolicy* r, void * p) {
void * p) {
lruFast * l = r->impl; lruFast * l = r->impl;
assert(0 == l->getNode(p, l->conf)); (*l->derefCount(p))--;
node_t * n = LL_ENTRY(push)(l->lru, p); assert(*l->derefCount(p) >= 0);
l->setNode(p, n, l->conf); if(!*l->derefCount(p)) {
assert(0 == l->getNode(p, l->conf));
node_t * n = LL_ENTRY(push)(l->lru, p);
l->setNode(p, n, l->conf);
}
} }
static void deinit(struct replacementPolicy * r) { static void deinit(struct replacementPolicy * r) {
lruFast * l = r->impl; lruFast * l = r->impl;
@ -60,8 +71,9 @@ static void deinit(struct replacementPolicy * r) {
replacementPolicy * lruFastInit( replacementPolicy * lruFastInit(
struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf), struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf),
void (*setNode)(void * page, void (*setNode)(void * page,
struct LL_ENTRY(node_t) * n, struct LL_ENTRY(node_t) * n,
void * conf), void * conf),
intptr_t* (*derefCount)(void *page),
void * conf) { void * conf) {
struct replacementPolicy * ret = malloc(sizeof(struct replacementPolicy)); struct replacementPolicy * ret = malloc(sizeof(struct replacementPolicy));
ret->deinit = deinit; ret->deinit = deinit;
@ -74,6 +86,7 @@ replacementPolicy * lruFastInit(
l->lru = LL_ENTRY(create)(); l->lru = LL_ENTRY(create)();
l->getNode = getNode; l->getNode = getNode;
l->setNode = setNode; l->setNode = setNode;
l->derefCount = derefCount;
ret->impl = l; ret->impl = l;
return ret; return ret;
} }

View file

@ -37,8 +37,9 @@ replacementPolicy * stasis_replacement_policy_lru_init();
replacementPolicy * lruFastInit( replacementPolicy * lruFastInit(
struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf), struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf),
void (*setNode)(void * page, void (*setNode)(void * page,
struct LL_ENTRY(node_t) * n, struct LL_ENTRY(node_t) * n,
void * conf), void * conf),
intptr_t* (*derefCount)(void *page),
void * conf); void * conf);
replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp); replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp);
replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count); replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count);

View file

@ -356,8 +356,6 @@ START_TEST(stalePinTest) {
stalePinTestImpl(stasis_buffer_manager_hash_factory); stalePinTestImpl(stasis_buffer_manager_hash_factory);
} END_TEST } END_TEST
START_TEST(stalePinTestConcurrentBufferManager) { START_TEST(stalePinTestConcurrentBufferManager) {
printf("Fail: Bug 22");
abort();
stalePinTestImpl(stasis_buffer_manager_concurrent_hash_factory); stalePinTestImpl(stasis_buffer_manager_concurrent_hash_factory);
} END_TEST } END_TEST
//START_TEST(stalePinTestDeprecatedBufferManager) { //START_TEST(stalePinTestDeprecatedBufferManager) {

View file

@ -21,6 +21,7 @@ typedef struct LL_ENTRY(node_t) node_t;
typedef struct tracker { typedef struct tracker {
pageid_t val; // Must be pageid_t. Otherwise, there will be mismatches inside of concurrentWrapper's hash. pageid_t val; // Must be pageid_t. Otherwise, there will be mismatches inside of concurrentWrapper's hash.
intptr_t key; intptr_t key;
intptr_t pinCount;
int inCache; int inCache;
pthread_mutex_t mut; pthread_mutex_t mut;
} tracker; } tracker;
@ -36,6 +37,10 @@ static void setKey(void * page, node_t * n, void * ignore) {
tracker * p = page; tracker * p = page;
p->key = (intptr_t) n; p->key = (intptr_t) n;
} }
static intptr_t * pinCount(void *page) {
tracker * p = page;
return &p->pinCount;
}
tracker * t; tracker * t;
@ -49,6 +54,7 @@ void randomSetup() {
t = calloc(OBJECT_COUNT, sizeof(tracker)); t = calloc(OBJECT_COUNT, sizeof(tracker));
for(int i = 0; i < OBJECT_COUNT; i++) { for(int i = 0; i < OBJECT_COUNT; i++) {
t[i].val = i; t[i].val = i;
t[i].pinCount = 1;
} }
} }
@ -125,7 +131,7 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
} }
} }
START_TEST(replacementPolicyLRURandomTest) { START_TEST(replacementPolicyLRURandomTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, 0); replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
threaded = 0; threaded = 0;
randomSetup(); randomSetup();
randomTest(lru, LONG_COUNT); randomTest(lru, LONG_COUNT);
@ -133,7 +139,7 @@ START_TEST(replacementPolicyLRURandomTest) {
randomTeardown(); randomTeardown();
} END_TEST } END_TEST
START_TEST(replacementPolicyLRUFastRandomTest) { START_TEST(replacementPolicyLRUFastRandomTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, 0); replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
threaded = 0; threaded = 0;
randomSetup(); randomSetup();
randomTest(lru, LONG_COUNT); randomTest(lru, LONG_COUNT);
@ -141,7 +147,7 @@ START_TEST(replacementPolicyLRUFastRandomTest) {
randomTeardown(); randomTeardown();
} END_TEST } END_TEST
START_TEST(replacementPolicyThreadsafeRandomTest) { START_TEST(replacementPolicyThreadsafeRandomTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, 0); replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru); replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru);
threaded = 0; threaded = 0;
randomSetup(); randomSetup();
@ -154,7 +160,7 @@ START_TEST(replacementPolicyConcurrentRandomTest) {
int LRU_COUNT = OBJECT_COUNT / 51; int LRU_COUNT = OBJECT_COUNT / 51;
replacementPolicy * lru[LRU_COUNT]; replacementPolicy * lru[LRU_COUNT];
for(int i = 0; i < LRU_COUNT; i++) { for(int i = 0; i < LRU_COUNT; i++) {
lru[i] = lruFastInit(getKey, setKey, 0); lru[i] = lruFastInit(getKey, setKey, pinCount, 0);
} }
threaded = 0; threaded = 0;
replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, LRU_COUNT); replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, LRU_COUNT);
@ -172,7 +178,7 @@ void * randomTestWorker(void * arg) {
} }
START_TEST(replacementPolicyThreadsafeThreadTest) { START_TEST(replacementPolicyThreadsafeThreadTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, 0); replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru); replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru);
threaded = 1; threaded = 1;
worker_lru = tsLru; worker_lru = tsLru;
@ -192,7 +198,7 @@ START_TEST(replacementPolicyConcurrentThreadTest) {
int LRU_COUNT = OBJECT_COUNT / 51; int LRU_COUNT = OBJECT_COUNT / 51;
replacementPolicy * lru[LRU_COUNT]; replacementPolicy * lru[LRU_COUNT];
for(int i = 0; i < LRU_COUNT; i++) { for(int i = 0; i < LRU_COUNT; i++) {
lru[i] = lruFastInit(getKey, setKey, 0); lru[i] = lruFastInit(getKey, setKey, pinCount, 0);
} }
replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, THREAD_COUNT); replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, THREAD_COUNT);
threaded = 1; threaded = 1;