change replacementPolicy to avoid a malloc / free during page pins

This commit is contained in:
Sears Russell 2011-04-20 20:25:17 +00:00
parent 43c0d7ac09
commit fbdc102d6c
11 changed files with 157 additions and 196 deletions

View file

@ -12,7 +12,6 @@
#include <stasis/bufferManager/bufferHash.h>
#include <stasis/bufferPool.h>
#include <stasis/doubleLinkedList.h>
#include <stasis/dirtyPageTable.h>
#include <stasis/transactional.h>
@ -48,24 +47,6 @@ typedef struct {
pageid_t prefetch_next_count;
} stasis_buffer_hash_t;
typedef struct LL_ENTRY(node_t) node_t;
static node_t * pageGetNode(void * page, void * ignore) {
Page * p = page;
return (node_t*)p->prev;
}
static void pageSetNode(void * page, node_t * n, void * ignore) {
Page * p = page;
p->prev = (Page *) n;
}
static inline struct Page_s ** pagePendingPtr(Page * p) {
return ((struct Page_s **)(&((p)->next)));
}
static inline intptr_t* pagePinCountPtr(void * page) {
Page * p = page;
return ((intptr_t*)(&((p)->queue)));
}
static inline int needFlush(stasis_buffer_manager_t * bm) {
stasis_buffer_hash_t *bh = bm->impl;
pageid_t count = stasis_dirty_page_table_dirty_count(bh->dpt);
@ -80,7 +61,7 @@ static inline int needFlush(stasis_buffer_manager_t * bm) {
inline static void checkPageState(Page * p) {
Page * check = LH_ENTRY(find)(cachedPages, &(p->id), sizeof(p->id));
if(check) {
int pending = *pagePendingPtr(p);
int pending = p->pending;
int pinned = *pagePinCountPtr(p);
if((!pinned) && (!pending)) {
assert(pageGetNode(p, 0));
@ -94,7 +75,7 @@ inline static void checkPageState(Page * p) {
assert(notfound);
} else {
assert(!pageGetNode(p,0));
assert(!*pagePendingPtr(p));
assert(!p->pending);
assert(!*pagePinCountPtr(p));
int found = 0;
for(pageid_t i = 0; i < freeCount; i++) {
@ -122,7 +103,7 @@ static int bhTryToWriteBackPage(stasis_buffer_manager_t *bm, pageid_t page) {
assert(p->id == page);
if(*pagePendingPtr(p) || *pagePinCountPtr(p)) {
if(p->pending || p->pinCount) {
pthread_mutex_unlock(&bh->mut);
return EBUSY;
}
@ -141,9 +122,9 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
if(bh->pageCount < stasis_buffer_manager_size) {
ret = stasis_buffer_pool_malloc_page(bh->buffer_pool);
stasis_buffer_pool_free_page(bh->buffer_pool, ret,-1);
(*pagePendingPtr(ret)) = 0;
pageSetNode(ret,0,0);
(*pagePinCountPtr(ret)) = 1; // to match what happens after the next block calls lru->remove()
ret->pending = 0;
ret->next = ret->prev = NULL;
ret->pinCount = 1; // to match what happens after the next block calls lru->remove()
bh->pageCount++;
} else {
while((ret = bh->lru->getStale(bh->lru))) {
@ -152,8 +133,8 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing.");
abort();
}
assert(!*pagePinCountPtr(ret));
assert(!*pagePendingPtr(ret));
assert(!ret->pinCount);
assert(!ret->pending);
if(ret->dirty) {
pthread_mutex_unlock(&bh->mut);
DEBUG("Blocking app thread");
@ -170,9 +151,9 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
Page * check = LH_ENTRY(remove)(bh->cachedPages, &ret->id, sizeof(ret->id));
assert(check == ret);
}
assert(!*pagePendingPtr(ret));
assert(!pageGetNode(ret,0));
assert(1 == *pagePinCountPtr(ret)); // was zero before this call...
assert(!ret->pending);
assert(ret->next == NULL);
assert(1 == ret->pinCount); // was zero before this call...
assert(!ret->dirty);
return ret;
}
@ -211,7 +192,7 @@ static Page * bhGetCachedPage(stasis_buffer_manager_t* bm, int xid, const pageid
int locked = tryreadlock(ret->loadlatch,0);
assert(locked);
#endif
if(!*pagePendingPtr(ret)) {
if(!ret->pending) {
bh->lru->remove(bh->lru, ret);
checkPageState(ret);
@ -247,7 +228,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
// (If ret == 0, then no...)
while(ret) {
checkPageState(ret);
if(*pagePendingPtr(ret)) {
if(ret->pending) {
pthread_cond_wait(&bh->readComplete, &bh->mut);
if(ret->id != pageid) {
ret = LH_ENTRY(find)(bh->cachedPages, &pageid, sizeof(pageid));
@ -295,7 +276,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
} while(1);
// Add a pending entry to cachedPages to block like-minded threads and writeback
(*pagePendingPtr(ret)) = (void*)1;
ret->pending = 1;
check = LH_ENTRY(insert)(bh->cachedPages,&pageid,sizeof(pageid), ret);
assert(!check);
@ -319,11 +300,11 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
assert(!ret->dirty);
stasis_uninitialized_page_loaded(xid, ret);
}
*pagePendingPtr(ret) = 0;
ret->pending = 0;
// Would remove from lru, but getFreePage() guarantees that it isn't
// there.
assert(!pageGetNode(ret, 0));
assert(ret->next == 0);
#ifdef LATCH_SANITY_CHECKING
int locked = tryreadlock(ret->loadlatch, 0);
@ -449,7 +430,7 @@ static void bhBufDeinit(stasis_buffer_manager_t * bm) {
while((next = LH_ENTRY(readlist)(&iter))) {
Page * p = next->value;
assertunlocked(p->rwlatch);
assert(0 == *pagePinCountPtr(p));
assert(0 == p->pinCount);
readlock(p->rwlatch,0);
assert(!stasis_dirty_page_table_is_dirty(bh->dpt, p));
unlock(p->rwlatch);
@ -479,6 +460,7 @@ static void bhSimulateBufferManagerCrash(stasis_buffer_manager_t *bm) {
pthread_cond_signal(&bh->needFree);
pthread_join(bh->worker, 0);
pthread_cond_broadcast(&bh->prefetch_waiting);
for(int i = 0; i < bh->prefetch_thread_count; i++) {
pthread_join(bh->prefetch_workers[i], 0);
}
@ -549,7 +531,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *
bh->buffer_pool = stasis_buffer_pool_init();
bh->lru = lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0);
bh->lru = lruFastInit();
bh->cachedPages = LH_ENTRY(create)(stasis_buffer_manager_size);

View file

@ -5,7 +5,6 @@
* Author: sears
*/
#include <stasis/concurrentHash.h>
#include <stasis/doubleLinkedList.h>
#include <stasis/replacementPolicy.h>
#include <stasis/bufferPool.h>
#include <stasis/pageHandle.h>
@ -39,21 +38,6 @@ typedef struct {
pthread_cond_t needFree;
} stasis_buffer_concurrent_hash_t;
typedef struct LL_ENTRY(node_t) node_t;
static node_t * pageGetNode(void * page, void * ignore) {
Page * p = page;
return (node_t*)p->prev;
}
static void pageSetNode(void * page, node_t * n, void * ignore) {
Page * p = page;
p->prev = (Page *) n;
}
static inline intptr_t* pagePinCountPtr(void * page) {
Page * p = page;
return ((intptr_t*)(&((p)->queue)));
}
static inline int needFlush(stasis_buffer_manager_t * bm) {
stasis_buffer_concurrent_hash_t *bh = bm->impl;
pageid_t count = stasis_dirty_page_table_dirty_count(bh->dpt);
@ -442,7 +426,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
#ifdef CONCURRENT_LRU
replacementPolicy ** lrus = malloc(sizeof(lrus[0]) * 37);
for(int i = 0; i < 37; i++) {
lrus[i] = lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0);
lrus[i] = lruFastInit();
}
ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37);
free(lrus);
@ -454,8 +438,8 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
for(pageid_t i = 0; i < stasis_buffer_manager_size; i++) {
Page *p = stasis_buffer_pool_malloc_page(ch->buffer_pool);
stasis_buffer_pool_free_page(ch->buffer_pool, p,(-1*i)-2);
pageSetNode(p,0,0);
(*pagePinCountPtr(p)) = 1;
p->prev = p->next = NULL;
p->pinCount = 1;
ch->lru->insert(ch->lru, p); // decrements pin count ptr (setting it to zero)
hashtable_insert(ch->ht, p->id, p);
}

View file

@ -68,7 +68,7 @@ static void middleInsert(Page *ret) {
ret->next = repMiddle;
repMiddle->prev = ret;
ret->prev->next = ret;
ret->queue = 2;
ret->pinCount = 2;
repMiddle = ret;
assert(ret->next != ret && ret->prev != ret);
@ -121,12 +121,12 @@ void cacheInsertPage (Page * ret) {
*/
repMiddle = repHead;
for(pageid_t i = 0; i < stasis_buffer_manager_size / 3; i++ ) {
repMiddle->queue = 1;
repMiddle->pinCount = 1;
repMiddle = repMiddle->next;
}
for( iter = repMiddle; iter; iter = iter->next ) {
iter->queue = 2;
iter->pinCount = 2;
}
} else { /* Just insert it. */
headInsert(ret);
@ -152,12 +152,12 @@ void cacheHitOnPage(Page * ret) {
headInsert(ret);
assert(ret->next != ret && ret->prev != ret);
if( ret->queue == 2 ) {
if( ret->pinCount == 2 ) {
/* keep first queue same size */
repMiddle = repMiddle->prev;
repMiddle->queue = 2;
repMiddle->pinCount = 2;
ret->queue = 1;
ret->pinCount = 1;
}
}
}

View file

@ -30,7 +30,7 @@ static Page * paLoadPage(stasis_buffer_manager_t *bm, stasis_buffer_manager_hand
pa->pageMap[pageid]->dirty = 0;
pa->pageMap[pageid]->next = 0;
pa->pageMap[pageid]->prev = 0;
pa->pageMap[pageid]->queue = 0;
pa->pageMap[pageid]->pinCount = 0;
pa->pageMap[pageid]->inCache = 1;
pa->pageMap[pageid]->rwlatch = initlock();
pa->pageMap[pageid]->loadlatch = initlock();

View file

@ -46,18 +46,18 @@ static void cwDeinit (struct replacementPolicy* impl) {
free(rp);
free(impl);
}
static void cwHit (struct replacementPolicy* impl, void * page) {
static void cwHit (struct replacementPolicy* impl, Page* page) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
unsigned int bucket = bucket_hash(rp, page);
pthread_mutex_lock(&rp->mut[bucket]);
rp->impl[bucket]->hit(rp->impl[bucket], page);
pthread_mutex_unlock(&rp->mut[bucket]);
}
static void* cwGetStaleHelper(struct replacementPolicy* impl, void*(*func)(struct replacementPolicy*)) {
static Page* cwGetStaleHelper(struct replacementPolicy* impl, Page*(*func)(struct replacementPolicy*)) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
intptr_t bucket = (intptr_t)pthread_getspecific(rp->next_bucket);
intptr_t oldbucket = bucket;
void *ret = 0;
Page*ret = 0;
int spin_count = 0;
while(ret == 0 && spin_count < rp->num_buckets) {
int err;
@ -119,15 +119,15 @@ static void* cwGetStaleHelper(struct replacementPolicy* impl, void*(*func)(struc
}
return ret;
}
static void* cwGetStale(struct replacementPolicy* impl) {
static Page* cwGetStale(struct replacementPolicy* impl) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
return cwGetStaleHelper(impl, rp->impl[0]->getStale);
}
static void* cwGetStaleAndRemove(struct replacementPolicy* impl) {
static Page* cwGetStaleAndRemove(struct replacementPolicy* impl) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
return cwGetStaleHelper(impl, rp->impl[0]->getStaleAndRemove);
}
static void* cwRemove (struct replacementPolicy* impl, void * page) {
static Page* cwRemove (struct replacementPolicy* impl, Page* page) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
unsigned int bucket = bucket_hash(rp, page);
pthread_mutex_lock(&rp->mut[bucket]);
@ -135,7 +135,7 @@ static void* cwRemove (struct replacementPolicy* impl, void * page) {
pthread_mutex_unlock(&rp->mut[bucket]);
return ret;
}
static void cwInsert (struct replacementPolicy* impl, void * page) {
static void cwInsert (struct replacementPolicy* impl, Page* page) {
stasis_replacement_policy_concurrent_wrapper_t * rp = impl->impl;
unsigned int bucket = bucket_hash(rp, page);
pthread_mutex_lock(&rp->mut[bucket]);

View file

@ -31,7 +31,7 @@ static int stasis_replacement_policy_lru_entry_cmp(const void * ap, const void *
: 1;
}
static void stasis_replacement_policy_lru_deinit(replacementPolicy* r) {
static void stasis_replacement_policy_lru_deinit(replacementPolicy *r) {
//XXX free other stuff
stasis_replacement_policy_lru_t * l = r->impl;
LH_ENTRY(destroy)(l->hash);
@ -43,7 +43,7 @@ static void stasis_replacement_policy_lru_deinit(replacementPolicy* r) {
/** @todo handle clock wraps properly! */
static void stasis_replacement_policy_lru_hit(replacementPolicy* r, void * p) {
static void stasis_replacement_policy_lru_hit(replacementPolicy *r, Page *p) {
stasis_replacement_policy_lru_t * l = r->impl;
stasis_replacement_policy_lru_entry * e = l->getNode(p, l->conf);
assert(e);
@ -54,12 +54,12 @@ static void stasis_replacement_policy_lru_hit(replacementPolicy* r, void * p) {
old = (stasis_replacement_policy_lru_entry *)RB_ENTRY(search)(e, l->lru);
assert(e == old);
}
static void * stasis_replacement_policy_lru_get_stale(replacementPolicy* r) {
static Page* stasis_replacement_policy_lru_get_stale(replacementPolicy* r) {
stasis_replacement_policy_lru_t * l = r->impl;
stasis_replacement_policy_lru_entry * e = (stasis_replacement_policy_lru_entry * ) rbmin(l->lru);
return e ? e->value : 0;
}
static void* stasis_replacement_policy_lru_remove(replacementPolicy* r, void * p) {
static Page* stasis_replacement_policy_lru_remove(replacementPolicy* r, Page *p) {
stasis_replacement_policy_lru_t * l = r->impl;
stasis_replacement_policy_lru_entry * e = l->getNode(p, l->conf);
assert(e);
@ -69,13 +69,13 @@ static void* stasis_replacement_policy_lru_remove(replacementPolicy* r, void * p
free(e);
return ret;
}
static void* stasis_replacement_policy_lru_get_stale_and_remove(replacementPolicy* r) {
static Page* stasis_replacement_policy_lru_get_stale_and_remove(replacementPolicy* r) {
void* ret = stasis_replacement_policy_lru_get_stale(r);
stasis_replacement_policy_lru_remove(r, ret);
return ret;
}
static void stasis_replacement_policy_lru_insert(replacementPolicy* r, void * p) {
static void stasis_replacement_policy_lru_insert(replacementPolicy* r, Page* p) {
stasis_replacement_policy_lru_t * l = r->impl;
stasis_replacement_policy_lru_entry * e = malloc(sizeof(stasis_replacement_policy_lru_entry));
e->value = p;
@ -91,7 +91,6 @@ replacementPolicy * stasis_replacement_policy_lru_init() {
stasis_replacement_policy_lru_t * l = malloc(sizeof(stasis_replacement_policy_lru_t));
l->now = 0;
l->hash = LH_ENTRY(create)(10);
// l->lru = RB_ENTRY(init)((int(*)(const void*,const void*,const void*))cmp, 0);
l->lru = RB_ENTRY(init)(stasis_replacement_policy_lru_entry_cmp, 0);
ret->init = stasis_replacement_policy_lru_init;
ret->deinit = stasis_replacement_policy_lru_deinit;

View file

@ -1,59 +1,89 @@
#include <stasis/common.h>
#include <stasis/flags.h>
#include <stasis/replacementPolicy.h>
#include <stasis/doubleLinkedList.h>
#include <assert.h>
#include <stasis/page.h>
typedef LL_ENTRY(value_t) value_t;
typedef struct LL_ENTRY(node_t) node_t;
typedef struct LL_ENTRY(list) list;
typedef Page List;
typedef struct lruFast {
struct LL_ENTRY(list) * lru;
node_t * (*getNode)(void * page, void * conf);
void (*setNode)(void * page, node_t * n, void * conf);
intptr_t*(*derefCount)(void* page);
void * conf;
List list;
} lruFast;
static inline void llInit( List *list ) {
bzero( list, sizeof( *list ) );
list->id = 0xdeadbeef;
list->next = list->prev = list;
}
static inline void llPush( List *list, Page *p ) {
p->next = list;
p->prev = list->prev;
p->next->prev = p;
p->prev->next = p;
}
static inline void llRemoveNoReinit( Page *p ) {
p->prev->next = p->next;
p->next->prev = p->prev;
}
static inline void llRemove( Page *p ) {
llRemoveNoReinit( p );
p->prev = NULL;
p->next = NULL;
}
static inline int llIsEmpty( List *list ) {
return list->next == list;
}
static inline Page* llHead( List *list ) {
return ( llIsEmpty( list ) ) ? NULL : list->next;
}
static inline Page* llShift( List *list ) {
if(llIsEmpty(list)) return NULL;
static void stasis_lru_fast_hit(struct replacementPolicy * r, void * p) {
lruFast * l = r->impl;
if(ENOENT == LL_ENTRY(remove)(l->lru, p)) {
Page * ret = list->next;
llRemove(ret);
return ret;
}
static void stasis_lru_fast_hit(struct replacementPolicy * r, Page *p) {
lruFast *l = r->impl;
if( p->prev == NULL ) {
// ignore attempts to hit pages not in lru
return;
}
LL_ENTRY(push)(l->lru, p);
llRemoveNoReinit(p);
llPush(&l->list, p);
}
static void* stasis_lru_fast_getStale(struct replacementPolicy * r) {
lruFast * l = r->impl;
return LL_ENTRY(head)(l->lru);
static Page* stasis_lru_fast_getStale(struct replacementPolicy *r) {
lruFast *l = r->impl;
return llHead(&l->list);
}
static void* stasis_lru_fast_remove(struct replacementPolicy* r, void * p) {
lruFast * l = r->impl;
void *ret = NULL;
static Page* stasis_lru_fast_remove(struct replacementPolicy* r, Page *p) {
Page *ret = NULL;
if(!*l->derefCount(p)) {
int err = LL_ENTRY(remove)(l->lru, p);
assert(!err || ((Page*)p)->dirty);
if(!p->pinCount) {
if(p->next) {
llRemove(p);
} else {
assert(p->dirty);
}
ret = p;
}
(*l->derefCount(p))++;
p->pinCount++;
return ret;
}
static void* stasis_lru_fast_getStaleAndRemove(struct replacementPolicy* r) {
static Page* stasis_lru_fast_getStaleAndRemove(struct replacementPolicy *r) {
lruFast * l = r->impl;
void * ret = LL_ENTRY(shift)(l->lru);
Page *ret = llShift(&l->list);
if(ret) {
assert(!(*l->derefCount(ret)));
(*l->derefCount(ret))++;
assert(!ret->pinCount);
ret->pinCount++;
}
return ret;
}
static void stasis_lru_fast_insert(struct replacementPolicy* r, void * p) {
static void stasis_lru_fast_insert(struct replacementPolicy *r, Page *p) {
lruFast * l = r->impl;
(*l->derefCount(p))--;
assert(*l->derefCount(p) >= 0);
p->pinCount--;
assert(p->pinCount >= 0);
if(stasis_buffer_manager_hint_writes_are_sequential &&
!stasis_buffer_manager_debug_stress_latching) {
// We are in sequential mode, and only want to evict pages from
@ -65,32 +95,21 @@ static void stasis_lru_fast_insert(struct replacementPolicy* r, void * p) {
// insert the dirty page. This will cause the buffer manager to perform
// all sorts of useless (and otherwise rare) latching operations.
if(!*l->derefCount(p) && !((Page*)p)->dirty) {
int err = LL_ENTRY(push)(l->lru, p);
assert(!err);
if(!p->pinCount && !p->dirty) {
llPush(&l->list, p);
}
} else {
if(!*l->derefCount(p)) {
int err = LL_ENTRY(push)(l->lru, p);
assert(!err);
if(!p->pinCount) {
llPush(&l->list, p);
}
}
}
static void stasis_lru_fast_deinit(struct replacementPolicy * r) {
lruFast * l = r->impl;
// the node_t's get freed by LL_ENTRY. It's the caller's
// responsibility to free the void *'s passed into us.
LL_ENTRY(destroy)(l->lru);
free(l);
free(r);
}
replacementPolicy * lruFastInit(
struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf),
void (*setNode)(void * page,
struct LL_ENTRY(node_t) * n,
void * conf),
intptr_t* (*derefCount)(void *page),
void * conf) {
replacementPolicy * lruFastInit() {
struct replacementPolicy * ret = malloc(sizeof(struct replacementPolicy));
ret->deinit = stasis_lru_fast_deinit;
ret->hit = stasis_lru_fast_hit;
@ -99,10 +118,7 @@ replacementPolicy * lruFastInit(
ret->getStaleAndRemove = stasis_lru_fast_getStaleAndRemove;
ret->insert = stasis_lru_fast_insert;
lruFast * l = malloc(sizeof(lruFast));
l->lru = LL_ENTRY(create)(getNode, setNode, conf);
l->getNode = getNode;
l->setNode = setNode;
l->derefCount = derefCount;
llInit(&l->list);
ret->impl = l;
return ret;
}

View file

@ -17,34 +17,34 @@ static void tsDeinit (struct replacementPolicy* impl) {
rp->impl->deinit(rp->impl);
free(impl);
}
static void tsHit (struct replacementPolicy* impl, void * page) {
static void tsHit (struct replacementPolicy* impl, Page* page) {
stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
pthread_mutex_lock(&rp->mut);
rp->impl->hit(rp->impl, page);
pthread_mutex_unlock(&rp->mut);
}
static void* tsGetStale(struct replacementPolicy* impl) {
static Page* tsGetStale(struct replacementPolicy* impl) {
stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
pthread_mutex_lock(&rp->mut);
void *ret = rp->impl->getStale(rp->impl);
pthread_mutex_unlock(&rp->mut);
return ret;
}
static void* tsRemove (struct replacementPolicy* impl, void * page) {
static Page* tsRemove (struct replacementPolicy* impl, Page* page) {
stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
pthread_mutex_lock(&rp->mut);
void *ret = rp->impl->remove(rp->impl, page);
pthread_mutex_unlock(&rp->mut);
return ret;
}
static void* tsGetStaleAndRemove (struct replacementPolicy* impl) {
static Page* tsGetStaleAndRemove (struct replacementPolicy* impl) {
stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
pthread_mutex_lock(&rp->mut);
void *ret = rp->impl->getStaleAndRemove(rp->impl);
pthread_mutex_unlock(&rp->mut);
return ret;
}
static void tsInsert (struct replacementPolicy* impl, void * page) {
static void tsInsert (struct replacementPolicy* impl, Page* page) {
stasis_replacement_policy_threadsafe_wrapper_t * rp = impl->impl;
pthread_mutex_lock(&rp->mut);
rp->impl->insert(rp->impl, page);

View file

@ -141,8 +141,10 @@ struct Page_s {
struct Page_s *next;
/** The previous item in the replacement policy's queue. */
struct Page_s *prev;
/** Which queue is the page in? */
int queue;
/** How many times has the page been pinned? */
int pinCount;
/** Is the page pending I/O? (Used by some buffer managers) */
int pending;
/** Is the page in the cache at all? */
int inCache;

View file

@ -20,26 +20,18 @@
*/
#include <stasis/doubleLinkedList.h>
typedef struct replacementPolicy {
struct replacementPolicy* (*init)();
void (*deinit) (struct replacementPolicy* impl);
void (*hit) (struct replacementPolicy* impl, void * page);
void*(*getStale)(struct replacementPolicy* impl);
void*(*remove) (struct replacementPolicy* impl, void * page);
void*(*getStaleAndRemove)(struct replacementPolicy* impl);
void (*insert) (struct replacementPolicy* impl, void * page);
void (*hit) (struct replacementPolicy* impl, Page* page);
Page* (*getStale)(struct replacementPolicy* impl);
Page* (*remove) (struct replacementPolicy* impl, Page* page);
Page* (*getStaleAndRemove)(struct replacementPolicy* impl);
void (*insert) (struct replacementPolicy* impl, Page* page);
void * impl;
} replacementPolicy;
replacementPolicy * stasis_replacement_policy_lru_init();
replacementPolicy * lruFastInit(
struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf),
void (*setNode)(void * page,
struct LL_ENTRY(node_t) * n,
void * conf),
intptr_t* (*derefCount)(void *page),
void * conf);
replacementPolicy * lruFastInit();
replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp);
replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count);

View file

@ -2,7 +2,6 @@
#include <stasis/transactional.h>
#include <stasis/replacementPolicy.h>
#include <stasis/doubleLinkedList.h>
#include <assert.h>
@ -16,33 +15,20 @@
#define THREAD_COUNT 10
typedef struct LL_ENTRY(node_t) node_t;
typedef struct tracker {
pageid_t val; // Must be pageid_t. Otherwise, there will be mismatches inside of concurrentWrapper's hash.
intptr_t key;
intptr_t pinCount;
Page page;
int inCache;
pthread_mutex_t mut;
} tracker;
#define memberOffset(objType, memberName) ((char*)&((objType*)0)->memberName - (char*)0)
#define memberToObj(objType, memberName, memberPtr) (objType*)((char*)memberPtr - memberOffset(objType,memberName))
#define pageToTracker(p) memberToObj(tracker,page,p);
static pthread_mutex_t cached_count_mutex = PTHREAD_MUTEX_INITIALIZER;
static int threaded = 0;
static int cachedCount = 0;
static node_t * getKey(void * page, void * ignore) {
tracker * p = page;
return (node_t*)p->key;
}
static void setKey(void * page, node_t * n, void * ignore) {
tracker * p = page;
p->key = (intptr_t) n;
}
static intptr_t * pinCount(void *page) {
tracker * p = page;
return &p->pinCount;
}
tracker * t;
void randomSetup() {
time_t seed = time(0);
@ -53,8 +39,8 @@ void randomSetup() {
t = calloc(OBJECT_COUNT, sizeof(tracker));
for(int i = 0; i < OBJECT_COUNT; i++) {
t[i].val = i;
t[i].pinCount = 1;
t[i].page.id = i;
t[i].page.pinCount = 1;
}
}
@ -75,14 +61,14 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
// TOGGLE IN CACHE
pthread_mutex_lock(&t[i].mut);
if(!t[i].inCache) {
lru->insert(lru, &t[i]);
lru->insert(lru, &t[i].page);
t[i].inCache = 1;
pthread_mutex_lock(&cached_count_mutex);
cachedCount ++;
pthread_mutex_unlock(&cached_count_mutex);
} else {
void * v = lru->remove(lru, &t[i]);
assert(v == &t[i]);
tracker* tr = pageToTracker( lru->remove(lru, &t[i].page) );
assert(tr == &t[i]);
t[i].inCache = 0;
pthread_mutex_lock(&cached_count_mutex);
cachedCount --;
@ -91,20 +77,20 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
pthread_mutex_unlock(&t[i].mut);
} else if(op < 30) {
// Get stale + remove
tracker * tr = lru->getStale(lru);
tracker * tr = pageToTracker( lru->getStale(lru) );
if( tr ) {
pthread_mutex_lock(&t[tr->val].mut);
pthread_mutex_lock(&t[tr->page.id].mut);
if(tr->inCache) {
assert(tr == &t[tr->val]);
tr = lru->remove(lru, tr);
assert(tr == &t[tr->val]);
assert(tr == &t[tr->page.id]);
tr = pageToTracker( lru->remove(lru, &tr->page) );
assert(tr == &t[tr->page.id]);
tr->inCache = 0;
pthread_mutex_lock(&cached_count_mutex);
if(!threaded) assert(cachedCount != 0);
cachedCount --;
pthread_mutex_unlock(&cached_count_mutex);
}
pthread_mutex_unlock(&t[tr->val].mut);
pthread_mutex_unlock(&t[tr->page.id].mut);
} else {
if(!threaded) {
assert(cachedCount == 0);
@ -112,20 +98,20 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
}
} else if(op < 50) {
// Get stale
tracker * tr = lru->getStale(lru);
tracker * tr = pageToTracker( lru->getStale(lru) );
if(tr) {
pthread_mutex_lock(&t[tr->val].mut);
pthread_mutex_lock(&t[tr->page.id].mut);
if(!threaded) assert(tr->inCache);
assert(tr == &t[tr->val]);
assert(tr == &t[tr->page.id]);
if(!threaded) assert(cachedCount != 0);
pthread_mutex_unlock(&t[tr->val].mut);
pthread_mutex_unlock(&t[tr->page.id].mut);
} else {
if(!threaded) assert(cachedCount == 0);
}
} else {
// Hit
pthread_mutex_lock(&t[i].mut);
if(t[i].inCache) lru->hit(lru, &t[i]);
if(t[i].inCache) lru->hit(lru, &t[i].page);
pthread_mutex_unlock(&t[i].mut);
}
}
@ -133,7 +119,7 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
void fillThenEmptyTest(replacementPolicy *lru) {
for(int i = 0; i < OBJECT_COUNT; i++) {
lru->insert(lru, &t[i]);
lru->insert(lru, &t[i].page);
}
int j = 0;
while(lru->getStaleAndRemove(lru)) {
@ -143,8 +129,8 @@ void fillThenEmptyTest(replacementPolicy *lru) {
assert(0 == lru->getStale(lru));
assert(j == OBJECT_COUNT);
for(int i = 0; i < OBJECT_COUNT; i++) {
lru->insert(lru, &t[i]);
lru->remove(lru, &t[i]);
lru->insert(lru, &t[i].page);
lru->remove(lru, &t[i].page);
}
j = 0;
while(lru->getStaleAndRemove(lru)) {
@ -154,7 +140,7 @@ void fillThenEmptyTest(replacementPolicy *lru) {
}
START_TEST(replacementPolicyLRURandomTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy * lru = lruFastInit();
threaded = 0;
randomSetup();
randomTest(lru, LONG_COUNT);
@ -162,7 +148,7 @@ START_TEST(replacementPolicyLRURandomTest) {
randomTeardown();
} END_TEST
START_TEST(replacementPolicyLRUFastRandomTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy * lru = lruFastInit();
threaded = 0;
randomSetup();
randomTest(lru, LONG_COUNT);
@ -170,7 +156,7 @@ START_TEST(replacementPolicyLRUFastRandomTest) {
randomTeardown();
} END_TEST
START_TEST(replacementPolicyThreadsafeRandomTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy * lru = lruFastInit();
replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru);
threaded = 0;
randomSetup();
@ -183,7 +169,7 @@ START_TEST(replacementPolicyConcurrentRandomTest) {
int LRU_COUNT = OBJECT_COUNT / 51;
replacementPolicy * lru[LRU_COUNT];
for(int i = 0; i < LRU_COUNT; i++) {
lru[i] = lruFastInit(getKey, setKey, pinCount, 0);
lru[i] = lruFastInit();
}
threaded = 0;
replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, LRU_COUNT);
@ -201,7 +187,7 @@ void * randomTestWorker(void * arg) {
}
START_TEST(replacementPolicyThreadsafeThreadTest) {
replacementPolicy * lru = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy * lru = lruFastInit();
replacementPolicy * tsLru = replacementPolicyThreadsafeWrapperInit(lru);
threaded = 1;
worker_lru = tsLru;
@ -221,7 +207,7 @@ START_TEST(replacementPolicyConcurrentThreadTest) {
int LRU_COUNT = OBJECT_COUNT / 51;
replacementPolicy * lru[LRU_COUNT];
for(int i = 0; i < LRU_COUNT; i++) {
lru[i] = lruFastInit(getKey, setKey, pinCount, 0);
lru[i] = lruFastInit();
}
replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, THREAD_COUNT);
threaded = 1;
@ -241,14 +227,14 @@ START_TEST(replacementPolicyConcurrentThreadTest) {
} END_TEST
START_TEST(replacementPolicyEmptyFastLRUTest) {
randomSetup();
replacementPolicy *rp = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy *rp = lruFastInit();
fillThenEmptyTest(rp);
rp->deinit(rp);
randomTeardown();
} END_TEST
START_TEST(replacementPolicyEmptyThreadsafeTest) {
randomSetup();
replacementPolicy *rpA = lruFastInit(getKey, setKey, pinCount, 0);
replacementPolicy *rpA = lruFastInit();
replacementPolicy *rp = replacementPolicyThreadsafeWrapperInit(rpA);
fillThenEmptyTest(rp);
rp->deinit(rp);
@ -258,7 +244,7 @@ START_TEST(replacementPolicyEmptyConcurrentTest) {
randomSetup();
replacementPolicy *rpA[THREAD_COUNT];
for(int i = 0; i < THREAD_COUNT; i++) {
rpA[i] = lruFastInit(getKey, setKey, pinCount, 0);
rpA[i] = lruFastInit();
}
replacementPolicy *rp
= replacementPolicyConcurrentWrapperInit(rpA, THREAD_COUNT);