From b4a0f2436be18fd222dcec49dfa0011eb9e96e1c Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 9 Nov 2009 20:53:05 +0000 Subject: [PATCH] concurrent lru implementation for new buffer manager --- src/stasis/CMakeLists.txt | 2 +- src/stasis/Makefile.am | 2 +- .../bufferManager/concurrentBufferManager.c | 9 +- .../replacementPolicy/concurrentWrapper.c | 103 ++++++++++++++++++ src/stasis/replacementPolicy/lruFast.c | 2 +- stasis/replacementPolicy.h | 2 + 6 files changed, 114 insertions(+), 6 deletions(-) create mode 100644 src/stasis/replacementPolicy/concurrentWrapper.c diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index 89888ac..ffb7cc6 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -43,6 +43,6 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c concurrentHash.c rw.c doubleLink io/non_blocking.c io/debug.c io/handle.c bufferManager/pageArray.c bufferManager/bufferHash.c replacementPolicy/lru.c - replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c) + replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c) INSTALL(TARGETS stasis LIBRARY DESTINATION lib) diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index 6293709..d90a788 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -34,5 +34,5 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common bufferManager/legacy/pageFile.c \ bufferManager/legacy/pageCache.c \ bufferManager/legacy/legacyBufferManager.c \ - replacementPolicy/lru.c replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c + replacementPolicy/lru.c replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c AM_CFLAGS=${GLOBAL_CFLAGS} diff --git a/src/stasis/bufferManager/concurrentBufferManager.c b/src/stasis/bufferManager/concurrentBufferManager.c index 4346a53..70a2233 100644 --- a/src/stasis/bufferManager/concurrentBufferManager.c +++ b/src/stasis/bufferManager/concurrentBufferManager.c @@ -261,10 +261,13 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_ #endif ch->buffer_pool = stasis_buffer_pool_init(); - - ch->lru = lruFastInit(pageGetNode, pageSetNode, 0); - ch->lru = replacementPolicyThreadsafeWrapperInit(ch->lru); + replacementPolicy ** lrus = malloc(sizeof(lrus[0]) * 37); + for(int i = 0; i < 37; i++) { + lrus[i] = lruFastInit(pageGetNode, pageSetNode, 0); + } + ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37); ch->ht = hashtable_init(MAX_BUFFER_SIZE * 4); + free(lrus); for(int i = 0; i < MAX_BUFFER_SIZE; i++) { Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool); diff --git a/src/stasis/replacementPolicy/concurrentWrapper.c b/src/stasis/replacementPolicy/concurrentWrapper.c new file mode 100644 index 0000000..0380b01 --- /dev/null +++ b/src/stasis/replacementPolicy/concurrentWrapper.c @@ -0,0 +1,103 @@ +/* + * threadsafeWrapper.c + * + * Created on: Oct 20, 2009 + * Author: sears + */ +#include +#include +#include + +typedef struct { + replacementPolicy** impl; + pthread_mutex_t* mut; + int num_buckets; + pthread_key_t next_bucket; +} stasis_replacement_policy_concurrent_wrapper_t; + +static inline unsigned int bucket_hash(stasis_replacement_policy_concurrent_wrapper_t * rpw, void * page) { + return ((unsigned long long)((Page*)page)->id) % rpw->num_buckets; +} + +static void cwDeinit (struct replacementPolicy* impl) { + stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; + for(int i = 0; i < rp->num_buckets; i++) { + rp->impl[i]->deinit(rp->impl[i]); + pthread_mutex_destroy(&rp->mut[i]); + } + pthread_key_delete(rp->next_bucket); + free(rp->impl); + free(rp->mut); + free(rp); + free(impl); +} +static void cwHit (struct replacementPolicy* impl, void * page) { + stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; + unsigned int bucket = bucket_hash(rp, page); + pthread_mutex_lock(&rp->mut[bucket]); + rp->impl[bucket]->hit(rp->impl[bucket], page); + pthread_mutex_unlock(&rp->mut[bucket]); +} +static void* cwGetStale(struct replacementPolicy* impl) { + stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; + intptr_t bucket = (intptr_t)pthread_getspecific(rp->next_bucket); + intptr_t oldbucket = bucket; + void *ret = 0; + while(ret == 0) { + while(pthread_mutex_trylock(&rp->mut[bucket])) { + bucket = (bucket + 1) % rp->num_buckets; + } + ret = rp->impl[bucket]->getStale(rp->impl[bucket]); + pthread_mutex_unlock(&rp->mut[bucket]); + bucket = (bucket + 1) % rp->num_buckets; + } + if(bucket != oldbucket) { + pthread_setspecific(rp->next_bucket, (void*) bucket); + } + return ret; +} +static void* cwRemove (struct replacementPolicy* impl, void * page) { + stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; + unsigned int bucket = bucket_hash(rp, page); + pthread_mutex_lock(&rp->mut[bucket]); + void *ret = rp->impl[bucket]->remove(rp->impl[bucket], page); + pthread_mutex_unlock(&rp->mut[bucket]); + return ret; +} +//static void* tsGetStaleAndRemove (struct replacementPolicy* impl) { +// stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; +// pthread_mutex_lock(&rp->mut); +// void *ret = rp->impl->getStaleAndRemove(rp->impl); +// pthread_mutex_unlock(&rp->mut); +// return ret; +//} +static void cwInsert (struct replacementPolicy* impl, void * page) { + stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; + unsigned int bucket = bucket_hash(rp, page); + pthread_mutex_lock(&rp->mut[bucket]); + rp->impl[bucket]->insert(rp->impl[bucket], page); + pthread_mutex_unlock(&rp->mut[bucket]); +} + +replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count) { + replacementPolicy *ret = malloc(sizeof(*ret)); + stasis_replacement_policy_concurrent_wrapper_t * rpw = malloc(sizeof(*rpw)); + rpw->mut = malloc(sizeof(rpw->mut[0]) * count); + rpw->impl = malloc(sizeof(rpw->impl[0]) * count); + for(int i = 0; i < count; i++) { + pthread_mutex_init(&rpw->mut[i],0); + rpw->impl[i] = rp[i]; + } + rpw->num_buckets = count; + pthread_key_create(&rpw->next_bucket,0); + pthread_setspecific(rpw->next_bucket, (void*)0); + ret->init = NULL; + ret->deinit = cwDeinit; + ret->hit = cwHit; + ret->getStale = cwGetStale; +// ret->getStaleAndRemove = tsGetStaleAndRemove; + ret->remove = cwRemove; + ret->insert = cwInsert; + ret->impl = rpw; + return ret; +} diff --git a/src/stasis/replacementPolicy/lruFast.c b/src/stasis/replacementPolicy/lruFast.c index 3891fd1..948467a 100644 --- a/src/stasis/replacementPolicy/lruFast.c +++ b/src/stasis/replacementPolicy/lruFast.c @@ -20,7 +20,7 @@ static void hit(struct replacementPolicy * r, void * p) { lruFast * l = r->impl; // node_t * n = LH_ENTRY(find)(l->hash, &id, sizeof(int)); node_t * n = l->getNode(p, l->conf); - assert(n); + if(!n) { return; } // ignore attempts to hit pages not in lru LL_ENTRY(removeNoFree)(l->lru, n); LL_ENTRY(pushNode)(l->lru, n); } diff --git a/stasis/replacementPolicy.h b/stasis/replacementPolicy.h index 7c8dcbf..a5e2b9f 100644 --- a/stasis/replacementPolicy.h +++ b/stasis/replacementPolicy.h @@ -39,3 +39,5 @@ replacementPolicy * lruFastInit( struct LL_ENTRY(node_t) * n, void * conf), void * conf); +replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp); +replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count);