switch the concurrent hash to use lru->getStaleAndRemove; fix crashes due to partial leak of thread local pages on thread shutdown

Sears Russell 2009-11-24 01:43:36 +00:00
parent 38b999dfed
commit 2535f69fbc
6 changed files with 57 additions and 40 deletions

View file

@ -108,10 +108,13 @@ static Page * chGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid
 static void deinitTLS(void *tlsp) {
   stasis_buffer_concurrent_hash_tls_t * tls = tlsp;
   stasis_buffer_concurrent_hash_t *ch = tls->bm->impl;
   if(tls->p) {
-    ch->lru->insert(ch->lru, tls->p);
+    Page * p = tls->p;
+    p->id = -1;
+    while(hashtable_test_and_set(ch->ht, p->id, p)) {
+      p->id--;
+    }
+    // park the orphaned page where another thread's getStaleAndRemove can find it;
+    // reinsert via the local p, before free(tls) below, so nothing touches freed memory
+    ch->lru->insert(ch->lru, p);
   }
   free(tls);
 }
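The crash fixed here came from deinitTLS handing a dying thread's private page back to the LRU without putting it back into the hash table, leaving the two structures inconsistent (the "partial leak" in the commit message). The new code parks the page in the hash table under an unused negative id first; real pages have id >= 0, which is also why populateTLS skips write-back for parked pages. The whole mechanism rides on pthreads key destructors, which run at thread exit for every thread whose value under the key is non-NULL. A minimal standalone illustration of that hook (toy names, not stasis code):

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    static pthread_key_t tls_key;

    /* pthreads runs this at thread exit for every thread whose value under
     * tls_key is non-NULL; this is the hook deinitTLS() is installed into. */
    static void destructor(void *tlsp) {
      printf("reclaiming: %s\n", (char*)tlsp);
      free(tlsp);
    }

    static void *worker(void *arg) {
      (void)arg;
      pthread_setspecific(tls_key, strdup("this thread's private page"));
      return NULL;  /* destructor fires during this thread's exit, not main's */
    }

    int main(void) {
      pthread_t t;
      pthread_key_create(&tls_key, destructor);
      pthread_create(&t, NULL, worker, NULL);
      pthread_join(t, NULL);
      pthread_key_delete(tls_key);
      return 0;
    }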
@ -124,7 +127,7 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
   }
   int count = 0;
   while(tls->p == NULL) {
-    Page * tmp = ch->lru->getStale(ch->lru);
+    Page * tmp = ch->lru->getStaleAndRemove(ch->lru);
     hashtable_bucket_handle_t h;
     tls->p = hashtable_remove_begin(ch->ht, tmp->id, &h);
     if(tls->p) {
@ -132,9 +135,7 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
       // However, this would cause subtle problems; page->id could change while we're in LRU. LRU partitions its latches on page->id, and references state
       // stored with the page...
       writelock(tls->p->loadlatch, 0);
-      // note that the removes are atomic. This is guaranteed by the hashtable lock.
-      ch->lru->remove(ch->lru, tls->p);
-      // The getStale was not atomic, which is OK (but we can't trust tmp anymore...)
+      // The getStaleAndRemove was not atomic with the hashtable remove, which is OK (but we can't trust tmp anymore...)
       tmp = 0;
       if(tls->p->id >= 0) {
         ch->page_handle->write(ch->page_handle, tls->p);
@ -151,17 +152,10 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
 //      // Go around the loop again.
 //      tls->p = NULL;
     } else {
-      // page is not in hashtable, but it is in LRU. We can observe this because getStale and hashtable remove are not atomic.
-      // no need to hit the page; we will not spin on it for long (the other thread will be removing it from lru before it blocks on I/O)
-      // hit the page so we don't spin on it; readlock suffices, since we don't want to change the pageid.
-      // readlock(tmp->loadlatch,0);
-      // ch->lru->hit(ch->lru, tmp);
-      // unlock(tmp->loadlatch);
-      // remove failed; need to 'complete' it to release the latch (this call is a no-op)
-      hashtable_remove_finish(ch->ht, &h);
+      // page is not in hashtable, but it is in LRU. getStaleAndRemove and the hashtable remove are not atomic.
+      // However, we cannot observe this state: the lru remove happens before the hashtable remove,
+      // and the hashtable insert happens before the lru insert.
+      abort();
     }
     count++;
     if(count == 100) {
@ -185,7 +179,7 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
   int succ = trywritelock(p->loadlatch, 0);
   assert(succ);
+  // this has to happen after the hashtable insertion succeeds; otherwise, above, we could get a page from lru that isn't in the cache.
+  ch->lru->insert(ch->lru, p);
   hashtable_unlock(&h);
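Both sides of the page life cycle now follow one ordering rule: checkout (populateTLS) removes from the LRU before the hash table, and publish (chLoadPageImpl, and now deinitTLS) inserts into the hash table before the LRU. LRU membership is therefore always a subset of hash-table membership, so any victim popped by getStaleAndRemove must still be in the hash table, and the formerly tolerated miss became abort(). A toy, single-threaded sketch of why every intermediate state is still legal under this ordering (nothing here is stasis API):

    #include <assert.h>
    #include <stdbool.h>

    enum { NPAGES = 4 };
    static bool in_ht[NPAGES], in_lru[NPAGES];

    /* The invariant populateTLS relies on: in the LRU implies in the hash table. */
    static void check(void) {
      for(int i = 0; i < NPAGES; i++) assert(!in_lru[i] || in_ht[i]);
    }
    static void publish(int id) {   /* chLoadPageImpl: hashtable insert, then lru insert */
      in_ht[id] = true;  check();   /* intermediate state is still legal */
      in_lru[id] = true; check();
    }
    static void checkout(int id) {  /* populateTLS: lru remove, then hashtable remove */
      in_lru[id] = false; check();  /* intermediate state is still legal */
      in_ht[id] = false;  check();
    }
    int main(void) {
      publish(0); publish(1);
      checkout(0); checkout(1);
      return 0;  /* flip either ordering and an intermediate check() fails */
    }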

View file

@ -64,13 +64,24 @@ static void* cwRemove (struct replacementPolicy* impl, void * page) {
   pthread_mutex_unlock(&rp->mut[bucket]);
   return ret;
 }
-//static void* tsGetStaleAndRemove (struct replacementPolicy* impl) {
-//  stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
-//  pthread_mutex_lock(&rp->mut);
-//  void *ret = rp->impl->getStaleAndRemove(rp->impl);
-//  pthread_mutex_unlock(&rp->mut);
-//  return ret;
-//}
+static void* tsGetStaleAndRemove (struct replacementPolicy* impl) {
+  stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
+  intptr_t bucket = (intptr_t)pthread_getspecific(rp->next_bucket);
+  intptr_t oldbucket = bucket;
+  void *ret = 0;
+  while(ret == 0) {
+    // don't wait on a busy stripe; probe forward until a lock is free
+    while(pthread_mutex_trylock(&rp->mut[bucket])) {
+      bucket = (bucket + 1) % rp->num_buckets;
+    }
+    ret = rp->impl[bucket]->getStaleAndRemove(rp->impl[bucket]);
+    pthread_mutex_unlock(&rp->mut[bucket]);
+    bucket = (bucket + 1) % rp->num_buckets;
+  }
+  if(bucket != oldbucket) {
+    pthread_setspecific(rp->next_bucket, (void*) bucket);
+  }
+  return ret;
+}
 static void cwInsert (struct replacementPolicy* impl, void * page) {
   stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
   unsigned int bucket = bucket_hash(rp, page);
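The concurrent wrapper stripes the policy into per-bucket LRUs, each with its own mutex. The eviction loop above starts at a per-thread cursor (kept with pthread_getspecific), skips any bucket whose lock is busy instead of blocking, drains the first stripe it can lock, and leaves the cursor pointing past it so threads round-robin across stripes rather than convoying on one lock. Note that it spins if every stripe is empty, which callers are expected to make impossible. A standalone sketch of the same pattern (toy types, not the stasis API):

    #include <pthread.h>

    enum { NBUCKETS = 8 };

    typedef struct {
      pthread_mutex_t mut[NBUCKETS];
      int items[NBUCKETS];          /* stand-in for one small LRU per stripe */
    } striped_t;

    /* Mirror of the loop in tsGetStaleAndRemove: take the first stripe whose
     * lock is free; if it has nothing to evict, move on.  Spins forever if
     * every stripe is empty, the same caveat as the real code. */
    static int striped_take(striped_t *s, unsigned *cursor) {
      unsigned b = *cursor;
      int got = 0;
      while(!got) {
        while(pthread_mutex_trylock(&s->mut[b])) {
          b = (b + 1) % NBUCKETS;   /* busy: don't wait, probe the next stripe */
        }
        got = s->items[b];          /* 0 plays the role of "bucket empty" */
        if(got) s->items[b] = 0;
        pthread_mutex_unlock(&s->mut[b]);
        b = (b + 1) % NBUCKETS;     /* round-robin so stripes wear evenly */
      }
      *cursor = b;                  /* like pthread_setspecific(next_bucket, ...) */
      return got;
    }

    int main(void) {
      striped_t s;
      for(int i = 0; i < NBUCKETS; i++) {
        pthread_mutex_init(&s.mut[i], NULL);
        s.items[i] = 100 + i;
      }
      unsigned cursor = 3;
      return striped_take(&s, &cursor) == 103 ? 0 : 1;
    }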
@ -95,7 +106,7 @@ replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp
   ret->deinit = cwDeinit;
   ret->hit = cwHit;
   ret->getStale = cwGetStale;
-//  ret->getStaleAndRemove = tsGetStaleAndRemove;
+  ret->getStaleAndRemove = tsGetStaleAndRemove;
   ret->remove = cwRemove;
   ret->insert = cwInsert;
   ret->impl = rpw;

View file

@ -69,6 +69,12 @@ static void* stasis_replacement_policy_lru_remove(replacementPolicy* r, void * p
   free(e);
   return ret;
 }
+static void* stasis_replacement_policy_lru_get_stale_and_remove(replacementPolicy* r) {
+  void* ret = stasis_replacement_policy_lru_get_stale(r);
+  if(ret) {  // getStale returns NULL when the policy is empty; remove would crash on it
+    stasis_replacement_policy_lru_remove(r, ret);
+  }
+  return ret;
+}
 static void stasis_replacement_policy_lru_insert(replacementPolicy* r, void * p) {
   stasis_replacement_policy_lru_t * l = r->impl;
   stasis_replacement_policy_lru_entry * e = malloc(sizeof(stasis_replacement_policy_lru_entry));
@ -92,6 +98,7 @@ replacementPolicy * stasis_replacement_policy_lru_init() {
   ret->hit = stasis_replacement_policy_lru_hit;
   ret->getStale = stasis_replacement_policy_lru_get_stale;
   ret->remove = stasis_replacement_policy_lru_remove;
+  ret->getStaleAndRemove = stasis_replacement_policy_lru_get_stale_and_remove;
   ret->insert = stasis_replacement_policy_lru_insert;
   ret->impl = l;
   return ret;
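Unlike lruFast below, the hash-backed LRU builds getStaleAndRemove by composing its two existing primitives. That is only correct because this policy is always driven through a wrapper that holds one lock across the whole call (see tsGetStaleAndRemove in the threadsafe wrapper further down); with the lock held, no other thread can slip in between the peek and the detach. The shape, as a sketch against the replacementPolicy vtable from this commit:

    /* Derived operation: peek the coldest entry, then detach it.  Safe only
     * while the caller holds the policy's lock for both calls, as the
     * threadsafe wrapper below guarantees. */
    void *get_stale_and_remove(replacementPolicy *rp) {
      void *victim = rp->getStale(rp);
      if(victim) { rp->remove(rp, victim); }  /* getStale yields NULL when empty */
      return victim;
    }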

View file

@ -8,7 +8,6 @@ typedef struct LL_ENTRY(node_t) node_t;
 typedef struct LL_ENTRY(list) list;
 typedef struct lruFast {
-  // struct LH_ENTRY(table) * hash;
   struct LL_ENTRY(list) * lru;
   node_t * (*getNode)(void * page, void * conf);
   void (*setNode)(void * page, node_t * n,
@ -18,9 +17,9 @@ typedef struct lruFast {
 static void hit(struct replacementPolicy * r, void * p) {
   lruFast * l = r->impl;
-  // node_t * n = LH_ENTRY(find)(l->hash, &id, sizeof(int));
   node_t * n = l->getNode(p, l->conf);
-  if(!n) { return; } // ignore attempts to hit pages not in lru
+  assert(n);
   LL_ENTRY(removeNoFree)(l->lru, n);
   LL_ENTRY(pushNode)(l->lru, n);
 }
@ -30,25 +29,29 @@ static void* getStale(struct replacementPolicy * r) {
 }
 static void* remove(struct replacementPolicy* r, void * p) {
   lruFast * l = r->impl;
-  node_t * n = l->getNode(p, l->conf); //LH_ENTRY(remove)(l->hash, &id, sizeof(int));
+  node_t * n = l->getNode(p, l->conf);
   assert(n);
   value_t * v = n->v;
   LL_ENTRY(remove)(l->lru, n);
   l->setNode(p, 0, l->conf);
   return v;
 }
+static void* getStaleAndRemove(struct replacementPolicy* r) {
+  lruFast * l = r->impl;
+  void * ret = LL_ENTRY(shift)(l->lru);  // pop the cold end of the list in O(1)
+  if(ret) {  // shift returns NULL when this LRU is empty; the concurrent wrapper then probes the next bucket
+    l->setNode(ret, 0, l->conf);
+  }
+  return ret;
+}
 static void insert(struct replacementPolicy* r,
                    void * p) {
   lruFast * l = r->impl;
   node_t * n = LL_ENTRY(push)(l->lru, p);
-  // LH_ENTRY(insert)(l->hash, &id, sizeof(int), n);
   l->setNode(p, n, l->conf);
 }
 static void deinit(struct replacementPolicy * r) {
   lruFast * l = r->impl;
   // the node_t's get freed by LL_ENTRY. It's the caller's
   // responsibility to free the void *'s passed into us.
-  // LH_ENTRY(destroy)(l->hash);
   LL_ENTRY(destroy)(l->lru);
   free(l);
   free(r);
@ -64,6 +67,7 @@ replacementPolicy * lruFastInit(
   ret->hit = hit;
   ret->getStale = getStale;
   ret->remove = remove;
+  ret->getStaleAndRemove = getStaleAndRemove;
   ret->insert = insert;
   lruFast * l = malloc(sizeof(lruFast));
   l->lru = LL_ENTRY(create)();
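lruFast gets O(1) hit, remove, and getStaleAndRemove by storing the list node with the cached object itself, via the getNode/setNode callbacks; the deleted LH_ENTRY lines were relics of an older side hash table. shift pops the cold end of the doubly-linked list, and setNode(ret, 0, ...) clears the back-pointer so the evicted object is no longer considered resident. A self-contained toy with the same shape (illustrative names, not the stasis list API):

    #include <assert.h>
    #include <stddef.h>
    #include <stdlib.h>

    /* Intrusive LRU: the list node is reachable from the cached object, so
     * no hash lookup is needed on hit/remove.  Sentinel-headed list;
     * head.next is the coldest entry, head.prev the hottest. */
    typedef struct node { struct node *prev, *next; void *val; } node;
    typedef struct { node head; } list;

    typedef struct { int id; node *n; } page;  /* n plays the role of setNode/getNode */

    static void list_init(list *l) { l->head.prev = l->head.next = &l->head; }
    static void list_push(list *l, node *n) {  /* insert at the hot end */
      n->prev = l->head.prev; n->next = &l->head;
      n->prev->next = n; l->head.prev = n;
    }
    static void list_unlink(node *n) { n->prev->next = n->next; n->next->prev = n->prev; }
    static node *list_shift(list *l) {         /* pop the cold end, O(1) */
      node *n = l->head.next;
      if(n == &l->head) return NULL;           /* empty */
      list_unlink(n);
      return n;
    }

    int main(void) {
      list lru; list_init(&lru);
      page p1 = {1, malloc(sizeof(node))}, p2 = {2, malloc(sizeof(node))};
      p1.n->val = &p1; list_push(&lru, p1.n);
      p2.n->val = &p2; list_push(&lru, p2.n);
      list_unlink(p1.n); list_push(&lru, p1.n);       /* hit(p1): move to hot end */
      page *victim = (page*)list_shift(&lru)->val;    /* getStaleAndRemove */
      assert(victim->id == 2);                        /* p2 is now the coldest */
      free(p1.n); free(p2.n);
      return 0;
    }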

View file

@ -37,13 +37,13 @@ static void* tsRemove (struct replacementPolicy* impl, void * page) {
   pthread_mutex_unlock(&rp->mut);
   return ret;
 }
-//static void* tsGetStaleAndRemove (struct replacementPolicy* impl) {
-//  stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
-//  pthread_mutex_lock(&rp->mut);
-//  void *ret = rp->impl->getStaleAndRemove(rp->impl);
-//  pthread_mutex_unlock(&rp->mut);
-//  return ret;
-//}
+static void* tsGetStaleAndRemove (struct replacementPolicy* impl) {
+  stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
+  pthread_mutex_lock(&rp->mut);
+  void *ret = rp->impl->getStaleAndRemove(rp->impl);
+  pthread_mutex_unlock(&rp->mut);
+  return ret;
+}
 static void tsInsert (struct replacementPolicy* impl, void * page) {
   stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
   pthread_mutex_lock(&rp->mut);
static void tsInsert (struct replacementPolicy* impl, void * page) {
stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
pthread_mutex_lock(&rp->mut);
@ -60,7 +60,7 @@ replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp)
   ret->deinit = tsDeinit;
   ret->hit = tsHit;
   ret->getStale = tsGetStale;
-//  ret->getStaleAndRemove = tsGetStaleAndRemove;
+  ret->getStaleAndRemove = tsGetStaleAndRemove;
   ret->remove = tsRemove;
   ret->insert = tsInsert;
   ret->impl = rpw;
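This is the coarse-grained counterpart of the concurrent wrapper above: one mutex around the entire underlying policy, every operation a monitor call. Both wrappers export the same replacementPolicy vtable, so a buffer manager picks its locking strategy at init time. Hypothetical wiring for contrast (the concurrent wrapper's full argument list is not shown in this diff, so the count parameter is an assumption):

    extern replacementPolicy *bucketPolicies[];  /* one per-stripe policy, e.g. lruFastInit() each (hypothetical) */

    /* Coarse: one lock guarding a single LRU. */
    replacementPolicy *coarse  = replacementPolicyThreadsafeWrapperInit(
        stasis_replacement_policy_lru_init());

    /* Striped: an array of per-bucket policies, one mutex each.
     * (nBuckets is assumed; only "replacementPolicy** rp" is visible above.) */
    replacementPolicy *striped = replacementPolicyConcurrentWrapperInit(bucketPolicies, nBuckets);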

View file

@ -28,6 +28,7 @@ typedef struct replacementPolicy {
   void (*hit)     (struct replacementPolicy* impl, void * page);
   void*(*getStale)(struct replacementPolicy* impl);
   void*(*remove)  (struct replacementPolicy* impl, void * page);
+  void*(*getStaleAndRemove)(struct replacementPolicy* impl);
   void (*insert)  (struct replacementPolicy* impl, void * page);
   void * impl;
 } replacementPolicy;
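For callers, the new entry point replaces the getStale-then-remove pair with a single call, which is what lets the wrappers make victim selection and detachment atomic with respect to one lock acquisition. A hypothetical eviction path (flush_if_dirty and reuse are stand-in helpers, not stasis functions):

    /* rp is any wrapped policy; NULL means nothing was reclaimable,
     * which is possible when the underlying policy is empty. */
    Page *victim = rp->getStaleAndRemove(rp);
    if(victim) {
      flush_if_dirty(victim);  /* hypothetical write-back */
      reuse(victim);           /* hypothetical recycling */
    }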