From fbdc102d6ca42851f14705636c664bbecb74634b Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 20 Apr 2011 20:25:17 +0000 Subject: [PATCH] change replacementPolicy to avoid a malloc / free during page pins --- src/stasis/bufferManager/bufferHash.c | 56 +++------ .../bufferManager/concurrentBufferManager.c | 22 +--- src/stasis/bufferManager/legacy/pageCache.c | 12 +- src/stasis/bufferManager/pageArray.c | 2 +- .../replacementPolicy/concurrentWrapper.c | 14 +-- src/stasis/replacementPolicy/lru.c | 13 +- src/stasis/replacementPolicy/lruFast.c | 116 ++++++++++-------- .../replacementPolicy/threadsafeWrapper.c | 10 +- stasis/page.h | 6 +- stasis/replacementPolicy.h | 20 +-- test/stasis/check_replacementPolicy.c | 82 +++++-------- 11 files changed, 157 insertions(+), 196 deletions(-) diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index a8f1bfa..711aa10 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -12,7 +12,6 @@ #include #include -#include #include #include @@ -48,24 +47,6 @@ typedef struct { pageid_t prefetch_next_count; } stasis_buffer_hash_t; -typedef struct LL_ENTRY(node_t) node_t; - -static node_t * pageGetNode(void * page, void * ignore) { - Page * p = page; - return (node_t*)p->prev; -} -static void pageSetNode(void * page, node_t * n, void * ignore) { - Page * p = page; - p->prev = (Page *) n; -} - -static inline struct Page_s ** pagePendingPtr(Page * p) { - return ((struct Page_s **)(&((p)->next))); -} -static inline intptr_t* pagePinCountPtr(void * page) { - Page * p = page; - return ((intptr_t*)(&((p)->queue))); -} static inline int needFlush(stasis_buffer_manager_t * bm) { stasis_buffer_hash_t *bh = bm->impl; pageid_t count = stasis_dirty_page_table_dirty_count(bh->dpt); @@ -80,7 +61,7 @@ static inline int needFlush(stasis_buffer_manager_t * bm) { inline static void checkPageState(Page * p) { Page * check = LH_ENTRY(find)(cachedPages, &(p->id), sizeof(p->id)); if(check) { - int pending = *pagePendingPtr(p); + int pending = p->pending; int pinned = *pagePinCountPtr(p); if((!pinned) && (!pending)) { assert(pageGetNode(p, 0)); @@ -94,7 +75,7 @@ inline static void checkPageState(Page * p) { assert(notfound); } else { assert(!pageGetNode(p,0)); - assert(!*pagePendingPtr(p)); + assert(!p->pending); assert(!*pagePinCountPtr(p)); int found = 0; for(pageid_t i = 0; i < freeCount; i++) { @@ -122,7 +103,7 @@ static int bhTryToWriteBackPage(stasis_buffer_manager_t *bm, pageid_t page) { assert(p->id == page); - if(*pagePendingPtr(p) || *pagePinCountPtr(p)) { + if(p->pending || p->pinCount) { pthread_mutex_unlock(&bh->mut); return EBUSY; } @@ -141,9 +122,9 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) { if(bh->pageCount < stasis_buffer_manager_size) { ret = stasis_buffer_pool_malloc_page(bh->buffer_pool); stasis_buffer_pool_free_page(bh->buffer_pool, ret,-1); - (*pagePendingPtr(ret)) = 0; - pageSetNode(ret,0,0); - (*pagePinCountPtr(ret)) = 1; // to match what happens after the next block calls lru->remove() + ret->pending = 0; + ret->next = ret->prev = NULL; + ret->pinCount = 1; // to match what happens after the next block calls lru->remove() bh->pageCount++; } else { while((ret = bh->lru->getStale(bh->lru))) { @@ -152,8 +133,8 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) { printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing."); abort(); } - assert(!*pagePinCountPtr(ret)); - assert(!*pagePendingPtr(ret)); + assert(!ret->pinCount); + assert(!ret->pending); if(ret->dirty) { pthread_mutex_unlock(&bh->mut); DEBUG("Blocking app thread"); @@ -170,9 +151,9 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) { Page * check = LH_ENTRY(remove)(bh->cachedPages, &ret->id, sizeof(ret->id)); assert(check == ret); } - assert(!*pagePendingPtr(ret)); - assert(!pageGetNode(ret,0)); - assert(1 == *pagePinCountPtr(ret)); // was zero before this call... + assert(!ret->pending); + assert(ret->next == NULL); + assert(1 == ret->pinCount); // was zero before this call... assert(!ret->dirty); return ret; } @@ -211,7 +192,7 @@ static Page * bhGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid int locked = tryreadlock(ret->loadlatch,0); assert(locked); #endif - if(!*pagePendingPtr(ret)) { + if(!ret->pending) { bh->lru->remove(bh->lru, ret); checkPageState(ret); @@ -247,7 +228,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m // (If ret == 0, then no...) while(ret) { checkPageState(ret); - if(*pagePendingPtr(ret)) { + if(ret->pending) { pthread_cond_wait(&bh->readComplete, &bh->mut); if(ret->id != pageid) { ret = LH_ENTRY(find)(bh->cachedPages, &pageid, sizeof(pageid)); @@ -295,7 +276,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m } while(1); // Add a pending entry to cachedPages to block like-minded threads and writeback - (*pagePendingPtr(ret)) = (void*)1; + ret->pending = 1; check = LH_ENTRY(insert)(bh->cachedPages,&pageid,sizeof(pageid), ret); assert(!check); @@ -319,11 +300,11 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m assert(!ret->dirty); stasis_uninitialized_page_loaded(xid, ret); } - *pagePendingPtr(ret) = 0; + ret->pending = 0; // Would remove from lru, but getFreePage() guarantees that it isn't // there. - assert(!pageGetNode(ret, 0)); + assert(ret->next == 0); #ifdef LATCH_SANITY_CHECKING int locked = tryreadlock(ret->loadlatch, 0); @@ -449,7 +430,7 @@ static void bhBufDeinit(stasis_buffer_manager_t * bm) { while((next = LH_ENTRY(readlist)(&iter))) { Page * p = next->value; assertunlocked(p->rwlatch); - assert(0 == *pagePinCountPtr(p)); + assert(0 == p->pinCount); readlock(p->rwlatch,0); assert(!stasis_dirty_page_table_is_dirty(bh->dpt, p)); unlock(p->rwlatch); @@ -479,6 +460,7 @@ static void bhSimulateBufferManagerCrash(stasis_buffer_manager_t *bm) { pthread_cond_signal(&bh->needFree); pthread_join(bh->worker, 0); + pthread_cond_broadcast(&bh->prefetch_waiting); for(int i = 0; i < bh->prefetch_thread_count; i++) { pthread_join(bh->prefetch_workers[i], 0); } @@ -549,7 +531,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t * bh->buffer_pool = stasis_buffer_pool_init(); - bh->lru = lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0); + bh->lru = lruFastInit(); bh->cachedPages = LH_ENTRY(create)(stasis_buffer_manager_size); diff --git a/src/stasis/bufferManager/concurrentBufferManager.c b/src/stasis/bufferManager/concurrentBufferManager.c index b478a26..6cfdc39 100644 --- a/src/stasis/bufferManager/concurrentBufferManager.c +++ b/src/stasis/bufferManager/concurrentBufferManager.c @@ -5,7 +5,6 @@ * Author: sears */ #include -#include #include #include #include @@ -39,21 +38,6 @@ typedef struct { pthread_cond_t needFree; } stasis_buffer_concurrent_hash_t; -typedef struct LL_ENTRY(node_t) node_t; - -static node_t * pageGetNode(void * page, void * ignore) { - Page * p = page; - return (node_t*)p->prev; -} -static void pageSetNode(void * page, node_t * n, void * ignore) { - Page * p = page; - p->prev = (Page *) n; -} -static inline intptr_t* pagePinCountPtr(void * page) { - Page * p = page; - return ((intptr_t*)(&((p)->queue))); -} - static inline int needFlush(stasis_buffer_manager_t * bm) { stasis_buffer_concurrent_hash_t *bh = bm->impl; pageid_t count = stasis_dirty_page_table_dirty_count(bh->dpt); @@ -442,7 +426,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_ #ifdef CONCURRENT_LRU replacementPolicy ** lrus = malloc(sizeof(lrus[0]) * 37); for(int i = 0; i < 37; i++) { - lrus[i] = lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0); + lrus[i] = lruFastInit(); } ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37); free(lrus); @@ -454,8 +438,8 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_ for(pageid_t i = 0; i < stasis_buffer_manager_size; i++) { Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool); stasis_buffer_pool_free_page(ch->buffer_pool, p,(-1*i)-2); - pageSetNode(p,0,0); - (*pagePinCountPtr(p)) = 1; + p->prev = p->next = NULL; + p->pinCount = 1; ch->lru->insert(ch->lru, p); // decrements pin count ptr (setting it to zero) hashtable_insert(ch->ht, p->id, p); } diff --git a/src/stasis/bufferManager/legacy/pageCache.c b/src/stasis/bufferManager/legacy/pageCache.c index 3b3133d..3dcac90 100644 --- a/src/stasis/bufferManager/legacy/pageCache.c +++ b/src/stasis/bufferManager/legacy/pageCache.c @@ -68,7 +68,7 @@ static void middleInsert(Page *ret) { ret->next = repMiddle; repMiddle->prev = ret; ret->prev->next = ret; - ret->queue = 2; + ret->pinCount = 2; repMiddle = ret; assert(ret->next != ret && ret->prev != ret); @@ -121,12 +121,12 @@ void cacheInsertPage (Page * ret) { */ repMiddle = repHead; for(pageid_t i = 0; i < stasis_buffer_manager_size / 3; i++ ) { - repMiddle->queue = 1; + repMiddle->pinCount = 1; repMiddle = repMiddle->next; } for( iter = repMiddle; iter; iter = iter->next ) { - iter->queue = 2; + iter->pinCount = 2; } } else { /* Just insert it. */ headInsert(ret); @@ -152,12 +152,12 @@ void cacheHitOnPage(Page * ret) { headInsert(ret); assert(ret->next != ret && ret->prev != ret); - if( ret->queue == 2 ) { + if( ret->pinCount == 2 ) { /* keep first queue same size */ repMiddle = repMiddle->prev; - repMiddle->queue = 2; + repMiddle->pinCount = 2; - ret->queue = 1; + ret->pinCount = 1; } } } diff --git a/src/stasis/bufferManager/pageArray.c b/src/stasis/bufferManager/pageArray.c index 5cc9ad2..161379f 100644 --- a/src/stasis/bufferManager/pageArray.c +++ b/src/stasis/bufferManager/pageArray.c @@ -30,7 +30,7 @@ static Page * paLoadPage(stasis_buffer_manager_t *bm, stasis_buffer_manager_hand pa->pageMap[pageid]->dirty = 0; pa->pageMap[pageid]->next = 0; pa->pageMap[pageid]->prev = 0; - pa->pageMap[pageid]->queue = 0; + pa->pageMap[pageid]->pinCount = 0; pa->pageMap[pageid]->inCache = 1; pa->pageMap[pageid]->rwlatch = initlock(); pa->pageMap[pageid]->loadlatch = initlock(); diff --git a/src/stasis/replacementPolicy/concurrentWrapper.c b/src/stasis/replacementPolicy/concurrentWrapper.c index 85596b7..ff1ee66 100644 --- a/src/stasis/replacementPolicy/concurrentWrapper.c +++ b/src/stasis/replacementPolicy/concurrentWrapper.c @@ -46,18 +46,18 @@ static void cwDeinit (struct replacementPolicy* impl) { free(rp); free(impl); } -static void cwHit (struct replacementPolicy* impl, void * page) { +static void cwHit (struct replacementPolicy* impl, Page* page) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; unsigned int bucket = bucket_hash(rp, page); pthread_mutex_lock(&rp->mut[bucket]); rp->impl[bucket]->hit(rp->impl[bucket], page); pthread_mutex_unlock(&rp->mut[bucket]); } -static void* cwGetStaleHelper(struct replacementPolicy* impl, void*(*func)(struct replacementPolicy*)) { +static Page* cwGetStaleHelper(struct replacementPolicy* impl, Page*(*func)(struct replacementPolicy*)) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; intptr_t bucket = (intptr_t)pthread_getspecific(rp->next_bucket); intptr_t oldbucket = bucket; - void *ret = 0; + Page*ret = 0; int spin_count = 0; while(ret == 0 && spin_count < rp->num_buckets) { int err; @@ -119,15 +119,15 @@ static void* cwGetStaleHelper(struct replacementPolicy* impl, void*(*func)(struc } return ret; } -static void* cwGetStale(struct replacementPolicy* impl) { +static Page* cwGetStale(struct replacementPolicy* impl) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; return cwGetStaleHelper(impl, rp->impl[0]->getStale); } -static void* cwGetStaleAndRemove(struct replacementPolicy* impl) { +static Page* cwGetStaleAndRemove(struct replacementPolicy* impl) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; return cwGetStaleHelper(impl, rp->impl[0]->getStaleAndRemove); } -static void* cwRemove (struct replacementPolicy* impl, void * page) { +static Page* cwRemove (struct replacementPolicy* impl, Page* page) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; unsigned int bucket = bucket_hash(rp, page); pthread_mutex_lock(&rp->mut[bucket]); @@ -135,7 +135,7 @@ static void* cwRemove (struct replacementPolicy* impl, void * page) { pthread_mutex_unlock(&rp->mut[bucket]); return ret; } -static void cwInsert (struct replacementPolicy* impl, void * page) { +static void cwInsert (struct replacementPolicy* impl, Page* page) { stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl; unsigned int bucket = bucket_hash(rp, page); pthread_mutex_lock(&rp->mut[bucket]); diff --git a/src/stasis/replacementPolicy/lru.c b/src/stasis/replacementPolicy/lru.c index 808874b..65de6b9 100644 --- a/src/stasis/replacementPolicy/lru.c +++ b/src/stasis/replacementPolicy/lru.c @@ -31,7 +31,7 @@ static int stasis_replacement_policy_lru_entry_cmp(const void * ap, const void * : 1; } -static void stasis_replacement_policy_lru_deinit(replacementPolicy* r) { +static void stasis_replacement_policy_lru_deinit(replacementPolicy *r) { //XXX free other stuff stasis_replacement_policy_lru_t * l = r->impl; LH_ENTRY(destroy)(l->hash); @@ -43,7 +43,7 @@ static void stasis_replacement_policy_lru_deinit(replacementPolicy* r) { /** @todo handle clock wraps properly! */ -static void stasis_replacement_policy_lru_hit(replacementPolicy* r, void * p) { +static void stasis_replacement_policy_lru_hit(replacementPolicy *r, Page *p) { stasis_replacement_policy_lru_t * l = r->impl; stasis_replacement_policy_lru_entry * e = l->getNode(p, l->conf); assert(e); @@ -54,12 +54,12 @@ static void stasis_replacement_policy_lru_hit(replacementPolicy* r, void * p) { old = (stasis_replacement_policy_lru_entry *)RB_ENTRY(search)(e, l->lru); assert(e == old); } -static void * stasis_replacement_policy_lru_get_stale(replacementPolicy* r) { +static Page* stasis_replacement_policy_lru_get_stale(replacementPolicy* r) { stasis_replacement_policy_lru_t * l = r->impl; stasis_replacement_policy_lru_entry * e = (stasis_replacement_policy_lru_entry * ) rbmin(l->lru); return e ? e->value : 0; } -static void* stasis_replacement_policy_lru_remove(replacementPolicy* r, void * p) { +static Page* stasis_replacement_policy_lru_remove(replacementPolicy* r, Page *p) { stasis_replacement_policy_lru_t * l = r->impl; stasis_replacement_policy_lru_entry * e = l->getNode(p, l->conf); assert(e); @@ -69,13 +69,13 @@ static void* stasis_replacement_policy_lru_remove(replacementPolicy* r, void * p free(e); return ret; } -static void* stasis_replacement_policy_lru_get_stale_and_remove(replacementPolicy* r) { +static Page* stasis_replacement_policy_lru_get_stale_and_remove(replacementPolicy* r) { void* ret = stasis_replacement_policy_lru_get_stale(r); stasis_replacement_policy_lru_remove(r, ret); return ret; } -static void stasis_replacement_policy_lru_insert(replacementPolicy* r, void * p) { +static void stasis_replacement_policy_lru_insert(replacementPolicy* r, Page* p) { stasis_replacement_policy_lru_t * l = r->impl; stasis_replacement_policy_lru_entry * e = malloc(sizeof(stasis_replacement_policy_lru_entry)); e->value = p; @@ -91,7 +91,6 @@ replacementPolicy * stasis_replacement_policy_lru_init() { stasis_replacement_policy_lru_t * l = malloc(sizeof(stasis_replacement_policy_lru_t)); l->now = 0; l->hash = LH_ENTRY(create)(10); - // l->lru = RB_ENTRY(init)((int(*)(const void*,const void*,const void*))cmp, 0); l->lru = RB_ENTRY(init)(stasis_replacement_policy_lru_entry_cmp, 0); ret->init = stasis_replacement_policy_lru_init; ret->deinit = stasis_replacement_policy_lru_deinit; diff --git a/src/stasis/replacementPolicy/lruFast.c b/src/stasis/replacementPolicy/lruFast.c index c779a33..6690932 100644 --- a/src/stasis/replacementPolicy/lruFast.c +++ b/src/stasis/replacementPolicy/lruFast.c @@ -1,59 +1,89 @@ #include #include #include -#include #include #include -typedef LL_ENTRY(value_t) value_t; -typedef struct LL_ENTRY(node_t) node_t; -typedef struct LL_ENTRY(list) list; +typedef Page List; typedef struct lruFast { - struct LL_ENTRY(list) * lru; - node_t * (*getNode)(void * page, void * conf); - void (*setNode)(void * page, node_t * n, void * conf); - intptr_t*(*derefCount)(void* page); - void * conf; + List list; } lruFast; +static inline void llInit( List *list ) { + bzero( list, sizeof( *list ) ); + list->id = 0xdeadbeef; + list->next = list->prev = list; +} +static inline void llPush( List *list, Page *p ) { + p->next = list; + p->prev = list->prev; + p->next->prev = p; + p->prev->next = p; +} +static inline void llRemoveNoReinit( Page *p ) { + p->prev->next = p->next; + p->next->prev = p->prev; +} +static inline void llRemove( Page *p ) { + llRemoveNoReinit( p ); + p->prev = NULL; + p->next = NULL; +} +static inline int llIsEmpty( List *list ) { + return list->next == list; +} +static inline Page* llHead( List *list ) { + return ( llIsEmpty( list ) ) ? NULL : list->next; +} +static inline Page* llShift( List *list ) { + if(llIsEmpty(list)) return NULL; -static void stasis_lru_fast_hit(struct replacementPolicy * r, void * p) { - lruFast * l = r->impl; - if(ENOENT == LL_ENTRY(remove)(l->lru, p)) { + Page * ret = list->next; + llRemove(ret); + + return ret; +} +static void stasis_lru_fast_hit(struct replacementPolicy * r, Page *p) { + lruFast *l = r->impl; + if( p->prev == NULL ) { // ignore attempts to hit pages not in lru return; } - LL_ENTRY(push)(l->lru, p); + llRemoveNoReinit(p); + llPush(&l->list, p); } -static void* stasis_lru_fast_getStale(struct replacementPolicy * r) { - lruFast * l = r->impl; - return LL_ENTRY(head)(l->lru); +static Page* stasis_lru_fast_getStale(struct replacementPolicy *r) { + lruFast *l = r->impl; + return llHead(&l->list); } -static void* stasis_lru_fast_remove(struct replacementPolicy* r, void * p) { - lruFast * l = r->impl; - void *ret = NULL; +static Page* stasis_lru_fast_remove(struct replacementPolicy* r, Page *p) { + Page *ret = NULL; - if(!*l->derefCount(p)) { - int err = LL_ENTRY(remove)(l->lru, p); - assert(!err || ((Page*)p)->dirty); + if(!p->pinCount) { + if(p->next) { + llRemove(p); + } else { + assert(p->dirty); + } ret = p; } - (*l->derefCount(p))++; + p->pinCount++; + return ret; } -static void* stasis_lru_fast_getStaleAndRemove(struct replacementPolicy* r) { +static Page* stasis_lru_fast_getStaleAndRemove(struct replacementPolicy *r) { lruFast * l = r->impl; - void * ret = LL_ENTRY(shift)(l->lru); + Page *ret = llShift(&l->list); if(ret) { - assert(!(*l->derefCount(ret))); - (*l->derefCount(ret))++; + assert(!ret->pinCount); + ret->pinCount++; } return ret; } -static void stasis_lru_fast_insert(struct replacementPolicy* r, void * p) { +static void stasis_lru_fast_insert(struct replacementPolicy *r, Page *p) { lruFast * l = r->impl; - (*l->derefCount(p))--; - assert(*l->derefCount(p) >= 0); + p->pinCount--; + assert(p->pinCount >= 0); if(stasis_buffer_manager_hint_writes_are_sequential && !stasis_buffer_manager_debug_stress_latching) { // We are in sequential mode, and only want to evict pages from @@ -65,32 +95,21 @@ static void stasis_lru_fast_insert(struct replacementPolicy* r, void * p) { // insert the dirty page. This will cause the buffer manager to perform // all sorts of useless (and otherwise rare) latching operations. - if(!*l->derefCount(p) && !((Page*)p)->dirty) { - int err = LL_ENTRY(push)(l->lru, p); - assert(!err); + if(!p->pinCount && !p->dirty) { + llPush(&l->list, p); } } else { - if(!*l->derefCount(p)) { - int err = LL_ENTRY(push)(l->lru, p); - assert(!err); + if(!p->pinCount) { + llPush(&l->list, p); } } } static void stasis_lru_fast_deinit(struct replacementPolicy * r) { lruFast * l = r->impl; - // the node_t's get freed by LL_ENTRY. It's the caller's - // responsibility to free the void *'s passed into us. - LL_ENTRY(destroy)(l->lru); free(l); free(r); } -replacementPolicy * lruFastInit( - struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf), - void (*setNode)(void * page, - struct LL_ENTRY(node_t) * n, - void * conf), - intptr_t* (*derefCount)(void *page), - void * conf) { +replacementPolicy * lruFastInit() { struct replacementPolicy * ret = malloc(sizeof(struct replacementPolicy)); ret->deinit = stasis_lru_fast_deinit; ret->hit = stasis_lru_fast_hit; @@ -99,10 +118,7 @@ replacementPolicy * lruFastInit( ret->getStaleAndRemove = stasis_lru_fast_getStaleAndRemove; ret->insert = stasis_lru_fast_insert; lruFast * l = malloc(sizeof(lruFast)); - l->lru = LL_ENTRY(create)(getNode, setNode, conf); - l->getNode = getNode; - l->setNode = setNode; - l->derefCount = derefCount; + llInit(&l->list); ret->impl = l; return ret; } diff --git a/src/stasis/replacementPolicy/threadsafeWrapper.c b/src/stasis/replacementPolicy/threadsafeWrapper.c index dc4e85e..1a9825d 100644 --- a/src/stasis/replacementPolicy/threadsafeWrapper.c +++ b/src/stasis/replacementPolicy/threadsafeWrapper.c @@ -17,34 +17,34 @@ static void tsDeinit (struct replacementPolicy* impl) { rp->impl->deinit(rp->impl); free(impl); } -static void tsHit (struct replacementPolicy* impl, void * page) { +static void tsHit (struct replacementPolicy* impl, Page* page) { stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; pthread_mutex_lock(&rp->mut); rp->impl->hit(rp->impl, page); pthread_mutex_unlock(&rp->mut); } -static void* tsGetStale(struct replacementPolicy* impl) { +static Page* tsGetStale(struct replacementPolicy* impl) { stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; pthread_mutex_lock(&rp->mut); void *ret = rp->impl->getStale(rp->impl); pthread_mutex_unlock(&rp->mut); return ret; } -static void* tsRemove (struct replacementPolicy* impl, void * page) { +static Page* tsRemove (struct replacementPolicy* impl, Page* page) { stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; pthread_mutex_lock(&rp->mut); void *ret = rp->impl->remove(rp->impl, page); pthread_mutex_unlock(&rp->mut); return ret; } -static void* tsGetStaleAndRemove (struct replacementPolicy* impl) { +static Page* tsGetStaleAndRemove (struct replacementPolicy* impl) { stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; pthread_mutex_lock(&rp->mut); void *ret = rp->impl->getStaleAndRemove(rp->impl); pthread_mutex_unlock(&rp->mut); return ret; } -static void tsInsert (struct replacementPolicy* impl, void * page) { +static void tsInsert (struct replacementPolicy* impl, Page* page) { stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl; pthread_mutex_lock(&rp->mut); rp->impl->insert(rp->impl, page); diff --git a/stasis/page.h b/stasis/page.h index 0a8db5b..d1f85c5 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -141,8 +141,10 @@ struct Page_s { struct Page_s *next; /** The previous item in the replacement policy's queue. */ struct Page_s *prev; - /** Which queue is the page in? */ - int queue; + /** How many times has the page been pinned? */ + int pinCount; + /** Is the page pending I/O? (Used by some buffer managers) */ + int pending; /** Is the page in the cache at all? */ int inCache; diff --git a/stasis/replacementPolicy.h b/stasis/replacementPolicy.h index 8816a6e..5498943 100644 --- a/stasis/replacementPolicy.h +++ b/stasis/replacementPolicy.h @@ -20,26 +20,18 @@ */ -#include - typedef struct replacementPolicy { struct replacementPolicy* (*init)(); void (*deinit) (struct replacementPolicy* impl); - void (*hit) (struct replacementPolicy* impl, void * page); - void*(*getStale)(struct replacementPolicy* impl); - void*(*remove) (struct replacementPolicy* impl, void * page); - void*(*getStaleAndRemove)(struct replacementPolicy* impl); - void (*insert) (struct replacementPolicy* impl, void * page); + void (*hit) (struct replacementPolicy* impl, Page* page); + Page* (*getStale)(struct replacementPolicy* impl); + Page* (*remove) (struct replacementPolicy* impl, Page* page); + Page* (*getStaleAndRemove)(struct replacementPolicy* impl); + void (*insert) (struct replacementPolicy* impl, Page* page); void * impl; } replacementPolicy; replacementPolicy * stasis_replacement_policy_lru_init(); -replacementPolicy * lruFastInit( - struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf), - void (*setNode)(void * page, - struct LL_ENTRY(node_t) * n, - void * conf), - intptr_t* (*derefCount)(void *page), - void * conf); +replacementPolicy * lruFastInit(); replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp); replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count); diff --git a/test/stasis/check_replacementPolicy.c b/test/stasis/check_replacementPolicy.c index aa1666f..7e3774a 100644 --- a/test/stasis/check_replacementPolicy.c +++ b/test/stasis/check_replacementPolicy.c @@ -2,7 +2,6 @@ #include #include -#include #include @@ -16,33 +15,20 @@ #define THREAD_COUNT 10 -typedef struct LL_ENTRY(node_t) node_t; - typedef struct tracker { - pageid_t val; // Must be pageid_t. Otherwise, there will be mismatches inside of concurrentWrapper's hash. - intptr_t key; - intptr_t pinCount; + Page page; int inCache; pthread_mutex_t mut; } tracker; + +#define memberOffset(objType, memberName) ((char*)&((objType*)0)->memberName - (char*)0) +#define memberToObj(objType, memberName, memberPtr) (objType*)((char*)memberPtr - memberOffset(objType,memberName)) +#define pageToTracker(p) memberToObj(tracker,page,p); + static pthread_mutex_t cached_count_mutex = PTHREAD_MUTEX_INITIALIZER; static int threaded = 0; static int cachedCount = 0; -static node_t * getKey(void * page, void * ignore) { - tracker * p = page; - return (node_t*)p->key; -} -static void setKey(void * page, node_t * n, void * ignore) { - tracker * p = page; - p->key = (intptr_t) n; -} -static intptr_t * pinCount(void *page) { - tracker * p = page; - return &p->pinCount; -} - - tracker * t; void randomSetup() { time_t seed = time(0); @@ -53,8 +39,8 @@ void randomSetup() { t = calloc(OBJECT_COUNT, sizeof(tracker)); for(int i = 0; i < OBJECT_COUNT; i++) { - t[i].val = i; - t[i].pinCount = 1; + t[i].page.id = i; + t[i].page.pinCount = 1; } } @@ -75,14 +61,14 @@ void randomTest(replacementPolicy * lru, unsigned long count) { // TOGGLE IN CACHE pthread_mutex_lock(&t[i].mut); if(!t[i].inCache) { - lru->insert(lru, &t[i]); + lru->insert(lru, &t[i].page); t[i].inCache = 1; pthread_mutex_lock(&cached_count_mutex); cachedCount ++; pthread_mutex_unlock(&cached_count_mutex); } else { - void * v = lru->remove(lru, &t[i]); - assert(v == &t[i]); + tracker* tr = pageToTracker( lru->remove(lru, &t[i].page) ); + assert(tr == &t[i]); t[i].inCache = 0; pthread_mutex_lock(&cached_count_mutex); cachedCount --; @@ -91,20 +77,20 @@ void randomTest(replacementPolicy * lru, unsigned long count) { pthread_mutex_unlock(&t[i].mut); } else if(op < 30) { // Get stale + remove - tracker * tr = lru->getStale(lru); + tracker * tr = pageToTracker( lru->getStale(lru) ); if( tr ) { - pthread_mutex_lock(&t[tr->val].mut); + pthread_mutex_lock(&t[tr->page.id].mut); if(tr->inCache) { - assert(tr == &t[tr->val]); - tr = lru->remove(lru, tr); - assert(tr == &t[tr->val]); + assert(tr == &t[tr->page.id]); + tr = pageToTracker( lru->remove(lru, &tr->page) ); + assert(tr == &t[tr->page.id]); tr->inCache = 0; pthread_mutex_lock(&cached_count_mutex); if(!threaded) assert(cachedCount != 0); cachedCount --; pthread_mutex_unlock(&cached_count_mutex); } - pthread_mutex_unlock(&t[tr->val].mut); + pthread_mutex_unlock(&t[tr->page.id].mut); } else { if(!threaded) { assert(cachedCount == 0); @@ -112,20 +98,20 @@ void randomTest(replacementPolicy * lru, unsigned long count) { } } else if(op < 50) { // Get stale - tracker * tr = lru->getStale(lru); + tracker * tr = pageToTracker( lru->getStale(lru) ); if(tr) { - pthread_mutex_lock(&t[tr->val].mut); + pthread_mutex_lock(&t[tr->page.id].mut); if(!threaded) assert(tr->inCache); - assert(tr == &t[tr->val]); + assert(tr == &t[tr->page.id]); if(!threaded) assert(cachedCount != 0); - pthread_mutex_unlock(&t[tr->val].mut); + pthread_mutex_unlock(&t[tr->page.id].mut); } else { if(!threaded) assert(cachedCount == 0); } } else { // Hit pthread_mutex_lock(&t[i].mut); - if(t[i].inCache) lru->hit(lru, &t[i]); + if(t[i].inCache) lru->hit(lru, &t[i].page); pthread_mutex_unlock(&t[i].mut); } } @@ -133,7 +119,7 @@ void randomTest(replacementPolicy * lru, unsigned long count) { void fillThenEmptyTest(replacementPolicy *lru) { for(int i = 0; i < OBJECT_COUNT; i++) { - lru->insert(lru, &t[i]); + lru->insert(lru, &t[i].page); } int j = 0; while(lru->getStaleAndRemove(lru)) { @@ -143,8 +129,8 @@ void fillThenEmptyTest(replacementPolicy *lru) { assert(0 == lru->getStale(lru)); assert(j == OBJECT_COUNT); for(int i = 0; i < OBJECT_COUNT; i++) { - lru->insert(lru, &t[i]); - lru->remove(lru, &t[i]); + lru->insert(lru, &t[i].page); + lru->remove(lru, &t[i].page); } j = 0; while(lru->getStaleAndRemove(lru)) { @@ -154,7 +140,7 @@ void fillThenEmptyTest(replacementPolicy *lru) { } START_TEST(replacementPolicyLRURandomTest) { - replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0); + replacementPolicy * lru = lruFastInit(); threaded = 0; randomSetup(); randomTest(lru, LONG_COUNT); @@ -162,7 +148,7 @@ START_TEST(replacementPolicyLRURandomTest) { randomTeardown(); } END_TEST START_TEST(replacementPolicyLRUFastRandomTest) { - replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0); + replacementPolicy * lru = lruFastInit(); threaded = 0; randomSetup(); randomTest(lru, LONG_COUNT); @@ -170,7 +156,7 @@ START_TEST(replacementPolicyLRUFastRandomTest) { randomTeardown(); } END_TEST START_TEST(replacementPolicyThreadsafeRandomTest) { - replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0); + replacementPolicy * lru = lruFastInit(); replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru); threaded = 0; randomSetup(); @@ -183,7 +169,7 @@ START_TEST(replacementPolicyConcurrentRandomTest) { int LRU_COUNT = OBJECT_COUNT / 51; replacementPolicy * lru[LRU_COUNT]; for(int i = 0; i < LRU_COUNT; i++) { - lru[i] = lruFastInit(getKey, setKey, pinCount, 0); + lru[i] = lruFastInit(); } threaded = 0; replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, LRU_COUNT); @@ -201,7 +187,7 @@ void * randomTestWorker(void * arg) { } START_TEST(replacementPolicyThreadsafeThreadTest) { - replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0); + replacementPolicy * lru = lruFastInit(); replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru); threaded = 1; worker_lru = tsLru; @@ -221,7 +207,7 @@ START_TEST(replacementPolicyConcurrentThreadTest) { int LRU_COUNT = OBJECT_COUNT / 51; replacementPolicy * lru[LRU_COUNT]; for(int i = 0; i < LRU_COUNT; i++) { - lru[i] = lruFastInit(getKey, setKey, pinCount, 0); + lru[i] = lruFastInit(); } replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, THREAD_COUNT); threaded = 1; @@ -241,14 +227,14 @@ START_TEST(replacementPolicyConcurrentThreadTest) { } END_TEST START_TEST(replacementPolicyEmptyFastLRUTest) { randomSetup(); - replacementPolicy *rp = lruFastInit(getKey, setKey, pinCount, 0); + replacementPolicy *rp = lruFastInit(); fillThenEmptyTest(rp); rp->deinit(rp); randomTeardown(); } END_TEST START_TEST(replacementPolicyEmptyThreadsafeTest) { randomSetup(); - replacementPolicy *rpA = lruFastInit(getKey, setKey, pinCount, 0); + replacementPolicy *rpA = lruFastInit(); replacementPolicy *rp = replacementPolicyThreadsafeWrapperInit(rpA); fillThenEmptyTest(rp); rp->deinit(rp); @@ -258,7 +244,7 @@ START_TEST(replacementPolicyEmptyConcurrentTest) { randomSetup(); replacementPolicy *rpA[THREAD_COUNT]; for(int i = 0; i < THREAD_COUNT; i++) { - rpA[i] = lruFastInit(getKey, setKey, pinCount, 0); + rpA[i] = lruFastInit(); } replacementPolicy *rp = replacementPolicyConcurrentWrapperInit(rpA, THREAD_COUNT);