concurrent lru implementation for new buffer manager

This commit is contained in:
Sears Russell 2009-11-09 20:53:05 +00:00
parent 8bd0616f97
commit b4a0f2436b
6 changed files with 114 additions and 6 deletions

View file

@ -43,6 +43,6 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink
io/non_blocking.c io/debug.c io/handle.c
bufferManager/pageArray.c
bufferManager/bufferHash.c replacementPolicy/lru.c
replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c)
replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c)
INSTALL(TARGETS stasis LIBRARY DESTINATION lib)

View file

@ -34,5 +34,5 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
bufferManager/legacy/pageFile.c \
bufferManager/legacy/pageCache.c \
bufferManager/legacy/legacyBufferManager.c \
replacementPolicy/lru.c replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c
replacementPolicy/lru.c replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c
AM_CFLAGS=${GLOBAL_CFLAGS}

View file

@ -261,10 +261,13 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
#endif
ch->buffer_pool = stasis_buffer_pool_init();
ch->lru = lruFastInit(pageGetNode, pageSetNode, 0);
ch->lru = replacementPolicyThreadsafeWrapperInit(ch->lru);
replacementPolicy ** lrus = malloc(sizeof(lrus[0]) * 37);
for(int i = 0; i < 37; i++) {
lrus[i] = lruFastInit(pageGetNode, pageSetNode, 0);
}
ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37);
ch->ht = hashtable_init(MAX_BUFFER_SIZE * 4);
free(lrus);
for(int i = 0; i < MAX_BUFFER_SIZE; i++) {
Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool);

View file

@ -0,0 +1,103 @@
/*
* threadsafeWrapper.c
*
* Created on: Oct 20, 2009
* Author: sears
*/
#include <stasis/common.h>
#include <stasis/page.h>
#include <stasis/replacementPolicy.h>
typedef struct {
replacementPolicy** impl;
pthread_mutex_t* mut;
int num_buckets;
pthread_key_t next_bucket;
} stasis_replacement_policy_concurrent_wrapper_t;
static inline unsigned int bucket_hash(stasis_replacement_policy_concurrent_wrapper_t * rpw, void * page) {
return ((unsigned long long)((Page*)page)->id) % rpw->num_buckets;
}
static void cwDeinit (struct replacementPolicy* impl) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
for(int i = 0; i < rp->num_buckets; i++) {
rp->impl[i]->deinit(rp->impl[i]);
pthread_mutex_destroy(&rp->mut[i]);
}
pthread_key_delete(rp->next_bucket);
free(rp->impl);
free(rp->mut);
free(rp);
free(impl);
}
static void cwHit (struct replacementPolicy* impl, void * page) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
unsigned int bucket = bucket_hash(rp, page);
pthread_mutex_lock(&rp->mut[bucket]);
rp->impl[bucket]->hit(rp->impl[bucket], page);
pthread_mutex_unlock(&rp->mut[bucket]);
}
static void* cwGetStale(struct replacementPolicy* impl) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
intptr_t bucket = (intptr_t)pthread_getspecific(rp->next_bucket);
intptr_t oldbucket = bucket;
void *ret = 0;
while(ret == 0) {
while(pthread_mutex_trylock(&rp->mut[bucket])) {
bucket = (bucket + 1) % rp->num_buckets;
}
ret = rp->impl[bucket]->getStale(rp->impl[bucket]);
pthread_mutex_unlock(&rp->mut[bucket]);
bucket = (bucket + 1) % rp->num_buckets;
}
if(bucket != oldbucket) {
pthread_setspecific(rp->next_bucket, (void*) bucket);
}
return ret;
}
static void* cwRemove (struct replacementPolicy* impl, void * page) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
unsigned int bucket = bucket_hash(rp, page);
pthread_mutex_lock(&rp->mut[bucket]);
void *ret = rp->impl[bucket]->remove(rp->impl[bucket], page);
pthread_mutex_unlock(&rp->mut[bucket]);
return ret;
}
//static void* tsGetStaleAndRemove (struct replacementPolicy* impl) {
// stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
// pthread_mutex_lock(&rp->mut);
// void *ret = rp->impl->getStaleAndRemove(rp->impl);
// pthread_mutex_unlock(&rp->mut);
// return ret;
//}
static void cwInsert (struct replacementPolicy* impl, void * page) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
unsigned int bucket = bucket_hash(rp, page);
pthread_mutex_lock(&rp->mut[bucket]);
rp->impl[bucket]->insert(rp->impl[bucket], page);
pthread_mutex_unlock(&rp->mut[bucket]);
}
replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count) {
replacementPolicy *ret = malloc(sizeof(*ret));
stasis_replacement_policy_concurrent_wrapper_t * rpw = malloc(sizeof(*rpw));
rpw->mut = malloc(sizeof(rpw->mut[0]) * count);
rpw->impl = malloc(sizeof(rpw->impl[0]) * count);
for(int i = 0; i < count; i++) {
pthread_mutex_init(&rpw->mut[i],0);
rpw->impl[i] = rp[i];
}
rpw->num_buckets = count;
pthread_key_create(&rpw->next_bucket,0);
pthread_setspecific(rpw->next_bucket, (void*)0);
ret->init = NULL;
ret->deinit = cwDeinit;
ret->hit = cwHit;
ret->getStale = cwGetStale;
// ret->getStaleAndRemove = tsGetStaleAndRemove;
ret->remove = cwRemove;
ret->insert = cwInsert;
ret->impl = rpw;
return ret;
}

View file

@ -20,7 +20,7 @@ static void hit(struct replacementPolicy * r, void * p) {
lruFast * l = r->impl;
// node_t * n = LH_ENTRY(find)(l->hash, &id, sizeof(int));
node_t * n = l->getNode(p, l->conf);
assert(n);
if(!n) { return; } // ignore attempts to hit pages not in lru
LL_ENTRY(removeNoFree)(l->lru, n);
LL_ENTRY(pushNode)(l->lru, n);
}

View file

@ -39,3 +39,5 @@ replacementPolicy * lruFastInit(
struct LL_ENTRY(node_t) * n,
void * conf),
void * conf);
replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp);
replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count);