From 2535f69fbc9dbc811d3ee1f4fa08d28993d4a657 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Tue, 24 Nov 2009 01:43:36 +0000 Subject: [PATCH] switch the concurrent hash to use lru->getStaleAndRemove; fix crashes due to partial leak of thread local pages on thread shutdown --- .../bufferManager/concurrentBufferManager.c | 32 ++++++++----------- .../replacementPolicy/concurrentWrapper.c | 27 +++++++++++----- src/stasis/replacementPolicy/lru.c | 7 ++++ src/stasis/replacementPolicy/lruFast.c | 14 +++++--- .../replacementPolicy/threadsafeWrapper.c | 16 +++++----- stasis/replacementPolicy.h | 1 + 6 files changed, 57 insertions(+), 40 deletions(-) diff --git a/src/stasis/bufferManager/concurrentBufferManager.c b/src/stasis/bufferManager/concurrentBufferManager.c index 048e170..d4bd561 100644 --- a/src/stasis/bufferManager/concurrentBufferManager.c +++ b/src/stasis/bufferManager/concurrentBufferManager.c @@ -108,10 +108,13 @@ static Page * chGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid static void deinitTLS(void *tlsp) { stasis_buffer_concurrent_hash_tls_t * tls = tlsp; stasis_buffer_concurrent_hash_t *ch = tls->bm->impl; - if(tls->p) { - ch->lru->insert(ch->lru, tls->p); + + Page * p = tls->p; + p->id = -1; + while(hashtable_test_and_set(ch->ht,p->id, p)) { + p->id --; } - free(tls); + ch->lru->insert(ch->lru, tls->p); } static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_manager_t* bm) { stasis_buffer_concurrent_hash_t *ch = bm->impl; @@ -124,7 +127,7 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma } int count = 0; while(tls->p == NULL) { - Page * tmp = ch->lru->getStale(ch->lru); + Page * tmp = ch->lru->getStaleAndRemove(ch->lru); hashtable_bucket_handle_t h; tls->p = hashtable_remove_begin(ch->ht, tmp->id, &h); if(tls->p) { @@ -132,9 +135,7 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma // However, this would cause subtle problems; page->id could change while we're in LRU. LRU partitions its latches on page->id, and references state // stored with the page... writelock(tls->p->loadlatch,0); - // note that the removes are atomic. This is guaranteed by the hashtable lock. - ch->lru->remove(ch->lru, tls->p); - // The getStale was not atomic, which is OK (but we can't trust tmp anymore...) + // The getStaleAndRemove was not atomic with the hashtable remove, which is OK (but we can't trust tmp anymore...) tmp = 0; if(tls->p->id >= 0) { ch->page_handle->write(ch->page_handle, tls->p); @@ -151,17 +152,10 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma // // Go around the loop again. // tls->p = NULL; } else { - // page is not in hashtable, but it is in LRU. We can observe this because getStale and hashtable remove are not atomic. - - // no need to hit the page; we will not spin on it for long (the other thread will be removing it from lru before it blocks on I/O) - - // hit the page so we don't spin on it; readlock suffices, since we don't want to change the pageid. -// readlock(tmp->loadlatch,0); -// ch->lru->hit(ch->lru, tmp); -// unlock(tmp->loadlatch); - - // remove failed; need to 'complete' it to release latch (this call is a no-op) - hashtable_remove_finish(ch->ht, &h); + // page is not in hashtable, but it is in LRU. getStale and hashtable remove are not atomic. + // However, we cannot observe this; the lru remove happens before the hashtable remove, + // and the hashtable insert happens before the lru insert. + abort(); } count ++; if(count == 100) { @@ -185,7 +179,7 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const int succ = trywritelock(p->loadlatch, 0); assert(succ); - + // this has to happen after the hashtable insertion succeeds, otherwise above, we could get a page from lru that isn't in the cache. ch->lru->insert(ch->lru, p); hashtable_unlock(&h); diff --git a/src/stasis/replacementPolicy/concurrentWrapper.c b/src/stasis/replacementPolicy/concurrentWrapper.c index 0380b01..8f0b6f1 100644 --- a/src/stasis/replacementPolicy/concurrentWrapper.c +++ b/src/stasis/replacementPolicy/concurrentWrapper.c @@ -64,13 +64,24 @@ static void* cwRemove (struct replacementPolicy* impl, void * page) { pthread_mutex_unlock(&rp->mut[bucket]); return ret; } -//static void* tsGetStaleAndRemove (struct replacementPolicy* impl) { -// stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; -// pthread_mutex_lock(&rp->mut); -// void *ret = rp->impl->getStaleAndRemove(rp->impl); -// pthread_mutex_unlock(&rp->mut); -// return ret; -//} +static void* tsGetStaleAndRemove (struct replacementPolicy* impl) { + stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; + intptr_t bucket = (intptr_t)pthread_getspecific(rp->next_bucket); + intptr_t oldbucket = bucket; + void *ret = 0; + while(ret == 0) { + while(pthread_mutex_trylock(&rp->mut[bucket])) { + bucket = (bucket + 1) % rp->num_buckets; + } + ret = rp->impl[bucket]->getStaleAndRemove(rp->impl[bucket]); + pthread_mutex_unlock(&rp->mut[bucket]); + bucket = (bucket + 1) % rp->num_buckets; + } + if(bucket != oldbucket) { + pthread_setspecific(rp->next_bucket, (void*) bucket); + } + return ret; +} static void cwInsert (struct replacementPolicy* impl, void * page) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; unsigned int bucket = bucket_hash(rp, page); @@ -95,7 +106,7 @@ replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp ret->deinit = cwDeinit; ret->hit = cwHit; ret->getStale = cwGetStale; -// ret->getStaleAndRemove = tsGetStaleAndRemove; + ret->getStaleAndRemove = tsGetStaleAndRemove; ret->remove = cwRemove; ret->insert = cwInsert; ret->impl = rpw; diff --git a/src/stasis/replacementPolicy/lru.c b/src/stasis/replacementPolicy/lru.c index 8654732..ff7c3f4 100644 --- a/src/stasis/replacementPolicy/lru.c +++ b/src/stasis/replacementPolicy/lru.c @@ -69,6 +69,12 @@ static void* stasis_replacement_policy_lru_remove(replacementPolicy* r, void * p free(e); return ret; } +static void* stasis_replacement_policy_lru_get_stale_and_remove(replacementPolicy* r) { + void* ret = stasis_replacement_policy_lru_get_stale(r); + stasis_replacement_policy_lru_remove(r, ret); + return ret; +} + static void stasis_replacement_policy_lru_insert(replacementPolicy* r, void * p) { stasis_replacement_policy_lru_t * l = r->impl; stasis_replacement_policy_lru_entry * e = malloc(sizeof(stasis_replacement_policy_lru_entry)); @@ -92,6 +98,7 @@ replacementPolicy * stasis_replacement_policy_lru_init() { ret->hit = stasis_replacement_policy_lru_hit; ret->getStale = stasis_replacement_policy_lru_get_stale; ret->remove = stasis_replacement_policy_lru_remove; + ret->getStaleAndRemove = stasis_replacement_policy_lru_get_stale_and_remove; ret->insert = stasis_replacement_policy_lru_insert; ret->impl = l; return ret; diff --git a/src/stasis/replacementPolicy/lruFast.c b/src/stasis/replacementPolicy/lruFast.c index 55a2623..aba6f7d 100644 --- a/src/stasis/replacementPolicy/lruFast.c +++ b/src/stasis/replacementPolicy/lruFast.c @@ -8,7 +8,6 @@ typedef struct LL_ENTRY(node_t) node_t; typedef struct LL_ENTRY(list) list; typedef struct lruFast { - // struct LH_ENTRY(table) * hash; struct LL_ENTRY(list) * lru; node_t * (*getNode)(void * page, void * conf); void (*setNode)(void * page, node_t * n, @@ -18,9 +17,9 @@ typedef struct lruFast { static void hit(struct replacementPolicy * r, void * p) { lruFast * l = r->impl; - // node_t * n = LH_ENTRY(find)(l->hash, &id, sizeof(int)); node_t * n = l->getNode(p, l->conf); if(!n) { return; } // ignore attempts to hit pages not in lru + assert(n); LL_ENTRY(removeNoFree)(l->lru, n); LL_ENTRY(pushNode)(l->lru, n); } @@ -30,25 +29,29 @@ static void* getStale(struct replacementPolicy * r) { } static void* remove(struct replacementPolicy* r, void * p) { lruFast * l = r->impl; - node_t * n = l->getNode(p, l->conf); //LH_ENTRY(remove)(l->hash, &id, sizeof(int)); + node_t * n = l->getNode(p, l->conf); assert(n); value_t * v = n->v; LL_ENTRY(remove)(l->lru, n); l->setNode(p, 0, l->conf); return v; } +static void* getStaleAndRemove(struct replacementPolicy* r) { + lruFast * l = r->impl; + void * ret = LL_ENTRY(shift)(l->lru); + l->setNode(ret, 0, l->conf); + return ret; +} static void insert(struct replacementPolicy* r, void * p) { lruFast * l = r->impl; node_t * n = LL_ENTRY(push)(l->lru, p); - // LH_ENTRY(insert)(l->hash, &id, sizeof(int), n); l->setNode(p, n, l->conf); } static void deinit(struct replacementPolicy * r) { lruFast * l = r->impl; // the node_t's get freed by LL_ENTRY. It's the caller's // responsibility to free the void *'s passed into us. - //LH_ENTRY(destroy)(l->hash); LL_ENTRY(destroy)(l->lru); free(l); free(r); @@ -64,6 +67,7 @@ replacementPolicy * lruFastInit( ret->hit = hit; ret->getStale = getStale; ret->remove = remove; + ret->getStaleAndRemove = getStaleAndRemove; ret->insert = insert; lruFast * l = malloc(sizeof(lruFast)); l->lru = LL_ENTRY(create)(); diff --git a/src/stasis/replacementPolicy/threadsafeWrapper.c b/src/stasis/replacementPolicy/threadsafeWrapper.c index 4c8525a..dc4e85e 100644 --- a/src/stasis/replacementPolicy/threadsafeWrapper.c +++ b/src/stasis/replacementPolicy/threadsafeWrapper.c @@ -37,13 +37,13 @@ static void* tsRemove (struct replacementPolicy* impl, void * page) { pthread_mutex_unlock(&rp->mut); return ret; } -//static void* tsGetStaleAndRemove (struct replacementPolicy* impl) { -// stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; -// pthread_mutex_lock(&rp->mut); -// void *ret = rp->impl->getStaleAndRemove(rp->impl); -// pthread_mutex_unlock(&rp->mut); -// return ret; -//} +static void* tsGetStaleAndRemove (struct replacementPolicy* impl) { + stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; + pthread_mutex_lock(&rp->mut); + void *ret = rp->impl->getStaleAndRemove(rp->impl); + pthread_mutex_unlock(&rp->mut); + return ret; +} static void tsInsert (struct replacementPolicy* impl, void * page) { stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; pthread_mutex_lock(&rp->mut); @@ -60,7 +60,7 @@ replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp) ret->deinit = tsDeinit; ret->hit = tsHit; ret->getStale = tsGetStale; -// ret->getStaleAndRemove = tsGetStaleAndRemove; + ret->getStaleAndRemove = tsGetStaleAndRemove; ret->remove = tsRemove; ret->insert = tsInsert; ret->impl = rpw; diff --git a/stasis/replacementPolicy.h b/stasis/replacementPolicy.h index a5e2b9f..49de4e4 100644 --- a/stasis/replacementPolicy.h +++ b/stasis/replacementPolicy.h @@ -28,6 +28,7 @@ typedef struct replacementPolicy { void (*hit) (struct replacementPolicy* impl, void * page); void*(*getStale)(struct replacementPolicy* impl); void*(*remove) (struct replacementPolicy* impl, void * page); + void*(*getStaleAndRemove)(struct replacementPolicy* impl); void (*insert) (struct replacementPolicy* impl, void * page); void * impl; } replacementPolicy;