Implement CLOCK replacement policy, and set it to be the default. Add flags to configure the default replacement policy.

This commit is contained in:
Sears Russell 2011-08-23 18:25:26 +00:00
parent c79f22a277
commit 9083bca1c3
11 changed files with 274 additions and 63 deletions

View file

@@ -57,8 +57,12 @@ ADD_LIBRARY(stasis util/crc32.c
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/raid1.c
io/non_blocking.c io/debug.c io/handle.c
bufferManager/pageArray.c
bufferManager/bufferHash.c replacementPolicy/lru.c
replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c
bufferManager/bufferHash.c
replacementPolicy/lru.c
replacementPolicy/lruFast.c
replacementPolicy/threadsafeWrapper.c
replacementPolicy/concurrentWrapper.c
replacementPolicy/clock.c
)
ADD_LIBRARY(stasis_experimental
experimental/consumer.c

View file

@@ -129,10 +129,6 @@ inline static Page * getFreePage(stasis_buffer_manager_t *bm) {
} else {
while((ret = bh->lru->getStale(bh->lru))) {
// Make sure we have an exclusive lock on victim.
if(!ret) {
printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing.");
abort();
}
assert(!ret->pinCount);
assert(!ret->pending);
if(ret->dirty) {
@@ -530,8 +526,12 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *
bh->flushing = 0;
bh->buffer_pool = stasis_buffer_pool_init();
bh->lru = lruFastInit();
if(stasis_replacement_policy == STASIS_REPLACEMENT_POLICY_CONCURRENT_LRU ||
stasis_replacement_policy == STASIS_REPLACEMENT_POLICY_THREADSAFE_LRU) {
bh->lru = lruFastInit();
} else if(stasis_replacement_policy == STASIS_REPLACEMENT_POLICY_CLOCK) {
bh->lru = replacementPolicyClockInit(stasis_buffer_pool_get_underlying_array(bh->buffer_pool), stasis_buffer_manager_size);
}
bh->cachedPages = LH_ENTRY(create)(stasis_buffer_manager_size);

View file

@@ -10,11 +10,12 @@
#include <stasis/pageHandle.h>
#include <stasis/flags.h>
#ifndef NO_CONCURRENT_LRU
#ifndef CONCURRENT_LRU
#define CONCURRENT_LRU
#endif // CONCURRENT_LRU
#endif // NO_CONCURRENT_LRU
#define CLOCK
//#ifndef NO_CONCURRENT_LRU
//#ifndef CONCURRENT_LRU
//#define CONCURRENT_LRU
//#endif // CONCURRENT_LRU
//#endif // NO_CONCURRENT_LRU
//#define STRESS_TEST_WRITEBACK 1 // if defined, writeback as much as possible, as fast as possible.
@@ -430,16 +431,20 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
printf("Using expensive bufferHash sanity checking.\n");
#endif
ch->buffer_pool = stasis_buffer_pool_init();
#ifdef CONCURRENT_LRU
replacementPolicy ** lrus = malloc(sizeof(lrus[0]) * 37);
for(int i = 0; i < 37; i++) {
lrus[i] = lruFastInit();
if(stasis_replacement_policy == STASIS_REPLACEMENT_POLICY_CONCURRENT_LRU) {
replacementPolicy ** lrus = malloc(sizeof(lrus[0]) * 37);
for(int i = 0; i < 37; i++) {
lrus[i] = lruFastInit();
}
ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37);
free(lrus);
} else if(stasis_replacement_policy == STASIS_REPLACEMENT_POLICY_THREADSAFE_LRU) {
ch->lru = replacementPolicyThreadsafeWrapperInit(lruFastInit());
} else if(stasis_replacement_policy == STASIS_REPLACEMENT_POLICY_CLOCK) {
ch->lru = replacementPolicyClockInit(stasis_buffer_pool_get_underlying_array(ch->buffer_pool), stasis_buffer_manager_size);
}
ch->lru = replacementPolicyConcurrentWrapperInit(lrus, 37);
free(lrus);
#else
ch->lru = replacementPolicyThreadsafeWrapperInit(lruFastInit(pageGetNode, pageSetNode, pagePinCountPtr, 0));
#endif
ch->ht = hashtable_init(stasis_buffer_manager_size * 4);
for(pageid_t i = 0; i < stasis_buffer_manager_size; i++) {

View file

@@ -143,3 +143,7 @@ void stasis_buffer_pool_free_page(stasis_buffer_pool_t * ret, Page *p, pageid_t
// p->dirty = 0;
writeunlock(p->rwlatch);
}
/**
 * Expose the contiguous Page array backing this buffer pool, so callers
 * (e.g. the CLOCK replacement policy) can sweep it in place.  The pool
 * retains ownership of the array; callers must not free it.
 */
Page * stasis_buffer_pool_get_underlying_array(stasis_buffer_pool_t *ret) {
return ret->pool;
}

View file

@@ -81,6 +81,12 @@ int stasis_buffer_manager_debug_stress_latching = STASIS_BUFFER_MANAGER_DEBUG_ST
int stasis_buffer_manager_debug_stress_latching = 0;
#endif
#ifdef STASIS_REPLACEMENT_POLICY
int stasis_replacement_policy = STASIS_REPLACEMENT_POLICY;
#else
int stasis_replacement_policy = STASIS_REPLACEMENT_POLICY_CLOCK;
#endif
#ifdef STASIS_REPLACEMENT_POLICY_CONCURRENT_WRAPPER_EXPONENTIAL_BACKOFF
int stasis_replacement_policy_concurrent_wrapper_exponential_backoff = STASIS_REPLACEMENT_POLICY_CONCURRENT_WRAPPER_EXPONENTIAL_BACKOFF;
#else

View file

@@ -0,0 +1,133 @@
/**
* clock.c
*
* @file Implementation of the CLOCK buffer replacement algorithm.
*
* This replacement policy keeps a pointer "ptr" to the array of pages
* that back the buffer manager, which it treats as a circular buffer.
* It sweeps ptr over the array of pages, marking "hot" pages as "cold",
* and returning "cold" pages for eviction. It increments ptr one slot
* each time getStaleAndRemove is called. When pages are "hit", it marks
* them "hot", regardless of their prior state.
*
* This module also maintains p->pinCount in the obvious way so that it does
* not evict pages that have been pinned by the calling thread.
*
* This module does not use mutexes, and instead uses atomic instructions
* such as test and set.
*
* States (Stored in p->queue):
*
* Page pinning: [-1, 0, 1] -> remove -> insert -> 1
* Get stale and remove: [1] -> [0]. [0] -> [-1] -> insert -> [1]. [-1] -> [-1]
*
* Created on: Aug 22, 2011
* Author: sears
*/
#include <config.h>
#include <stasis/common.h>
#include <stasis/util/latches.h>
#include <stasis/replacementPolicy.h>
#include <stasis/page.h>
/** Private state for the CLOCK replacement policy. */
typedef struct {
uint64_t ptr;        // clock hand; monotonically increasing, slot = ptr % page_count
Page * pages;        // borrowed pointer to the buffer manager's backing page array
uint64_t page_count; // number of entries in pages[]
} stasis_replacement_policy_clock_t;
/** Tear down the clock policy: release the private state, then the wrapper. */
static void clockDeinit (struct replacementPolicy* impl) {
  free(impl->impl);
  free(impl);
}
/**
 * Mark a page "hot" so the next sweep will pass over it once.
 *
 * This module repurposes Page's "next" field as the clock state word
 * (see the state table in the file header); 1 == hot.  The store is a
 * plain (non-atomic) write; presumably a racing update losing a "hot"
 * mark is acceptable here (it only makes the page evictable sooner) —
 * NOTE(review): confirm this is the intended tradeoff.
 */
static void clockHit (struct replacementPolicy* impl, Page* page) {
page->next = (void*)1;
}
static Page* clockGetStale(struct replacementPolicy* impl) {
stasis_replacement_policy_clock_t * clock = impl->impl;
// NOTE: This just exists for legacy purposes, and is fundamantally not
// threadsafe. So, this doesn't mess with the __sync* stuff, or worry
// about order of operations like getStaleAndRemove does.
for(uint64_t spin = 0; spin < 2*clock->page_count; spin++) {
uint64_t ptr = clock->ptr % clock->page_count;
clock->ptr++;
if(clock->pages[ptr].next == (void*)0 &&
clock->pages[ptr].pinCount == 0) {
// Don't set state to -1, since that would cause trouble if the caller
// doesn't call remove. The -1 state is only there to protect us
// against concurrent interleavings of getStale() and remove(), which
// this (deprecated part of the) API does not support.
//clock->pages[ptr].next = (void*)-1;
return &clock->pages[ptr];
} else if(clock->pages[ptr].next == (void*)1){
clock->pages[ptr].next = (void*)0;
}
}
return NULL;
}
/**
 * Take a page out of consideration for eviction by pinning it.
 *
 * The increment is atomic; only the caller that raised pinCount from
 * 0 to 1 receives the page, so concurrent removers cannot both claim it.
 *
 * @return page if this call performed the 0 -> 1 transition, else NULL.
 */
static Page* clockRemove (struct replacementPolicy* impl, Page* page) {
  int prior = __sync_fetch_and_add(&page->pinCount, 1);
  return (prior == 0) ? page : NULL;
}
/**
 * Atomically find a cold, unpinned page, pin it, and return it for eviction.
 *
 * Lock-free: the clock hand (clock->ptr) advances via fetch-and-add, and each
 * slot's state word (Page.next: 1 = hot, 0 = cold, -1 = being evicted) is
 * moved with compare-and-swap, so concurrent callers claim distinct victims.
 * Gives up after sweeping the array twice.
 *
 * @return the claimed page (its pinCount already incremented; caller must
 *         eventually insert() it back), or NULL if no victim was found.
 */
static Page* clockGetStaleAndRemove (struct replacementPolicy* impl) {
stasis_replacement_policy_clock_t * clock = impl->impl;
for(uint64_t spin = 0; spin < 2*clock->page_count; spin++) {
// Claim the next slot; the modulus makes the array circular.
uint64_t ptr = __sync_fetch_and_add(&clock->ptr,1) % clock->page_count;
if(__sync_bool_compare_and_swap(&clock->pages[ptr].next, 0, -1)) {
// evict this page, but not if it is pinned (this protects the caller
// from evicting pages that it has pinned, not pages that were pinned
// in race by other threads.)
if(clock->pages[ptr].pinCount == 0) {
// Plain increment is safe here: we hold the slot in the -1 state.
clock->pages[ptr].pinCount++;
return &clock->pages[ptr];
} else {
// Reset the queue flag to 0, unless someone has changed it to 0
// or 1 in race.
__sync_bool_compare_and_swap(&clock->pages[ptr].next,-1,0);
}
} else if(__sync_bool_compare_and_swap(&clock->pages[ptr].next, 1, 0)) {
// page was hot. now it's cold. nothing else to see here; move along.
}
/*
else if(__sync_bool_compare_and_swap(&clock->pages[ptr]->queue, -1, -1)) { // extraneous no-op
// page was being evicted in race with us. unlikely, but move on.
} else { // ditto.
// page state changed concurrently with us. who knows what happened, so move on.
}
*/
}
return NULL;
}
/**
 * Return a page to the policy's consideration and release the caller's pin.
 *
 * Undoes the pinCount increment performed by remove()/getStaleAndRemove()
 * and marks the page hot so it survives the next sweep.  The trailing full
 * barrier publishes both updates before the caller proceeds.
 */
static void clockInsert (struct replacementPolicy* impl, Page* page) {
__sync_fetch_and_sub(&page->pinCount,1); // don't care about ordering of this line and next. pinCount is just a "message to ourselves"
page->next = (void*)1;
__sync_synchronize();
}
/**
 * Construct a CLOCK replacement policy over an existing page array.
 *
 * @param pageArray  the buffer manager's backing array of Page structs;
 *                   the policy sweeps it in place and does not own it.
 * @param page_count number of entries in pageArray.
 * @return a heap-allocated replacementPolicy; release it with ->deinit().
 */
replacementPolicy* replacementPolicyClockInit(Page * pageArray, int page_count) {
  stasis_replacement_policy_clock_t * state = malloc(sizeof(*state));
  state->ptr        = 0;
  state->pages      = pageArray;
  state->page_count = page_count;

  replacementPolicy *rp = malloc(sizeof(*rp));
  rp->init              = NULL;  // no factory method for this policy
  rp->deinit            = clockDeinit;
  rp->hit               = clockHit;
  rp->getStale          = clockGetStale;
  rp->getStaleAndRemove = clockGetStaleAndRemove;
  rp->remove            = clockRemove;
  rp->insert            = clockInsert;
  rp->impl              = state;
  return rp;
}

View file

@@ -65,4 +65,6 @@ Page* stasis_buffer_pool_malloc_page(stasis_buffer_pool_t* pool);
@see stasis_buffer_pool_malloc_page()
*/
void stasis_buffer_pool_free_page(stasis_buffer_pool_t* pool, Page * p, pageid_t id);
Page * stasis_buffer_pool_get_underlying_array(stasis_buffer_pool_t *ret);
#endif // STASIS_BUFFER_POOL_H

View file

@@ -81,6 +81,10 @@ terms specified in this license.
#define PERSISTENT 1
#define DURABLE 2
#define STASIS_REPLACEMENT_POLICY_THREADSAFE_LRU 1
#define STASIS_REPLACEMENT_POLICY_CONCURRENT_LRU 2
#define STASIS_REPLACEMENT_POLICY_CLOCK 3
#define MAX_TRANSACTIONS 1000
/** Operation types */

View file

@@ -84,6 +84,13 @@ extern stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(const char* f
defining STASIS_BUFFER_MANAGER_IO_HANDLE_FLAGS.
*/
extern int stasis_buffer_manager_io_handle_flags;
/**
The default replacement policy.
Valid values are STASIS_REPLACEMENT_POLICY_THREADSAFE_LRU,
STASIS_REPLACEMENT_POLICY_CONCURRENT_LRU and STASIS_REPLACEMENT_POLICY_CLOCK
*/
extern int stasis_replacement_policy;
/**
If true, then concurrent LRU will use exponential backoff when it
has trouble finding a page to evict. If false, it will perform a

View file

@@ -10,23 +10,34 @@
For now, Stasis uses plain-old LRU. DB-MIN would be an interesting
extension.
If you would like to implement your own caching policy, implement
the functions below. They are relatively straightforward. Note
that replacementPolicy implementations do not perform any file I/O
of their own.
The implementation of this module does not need to be threadsafe.
*/
/**
 * Interface implemented by every buffer-replacement policy.  Implementations
 * track candidate victim pages; they never perform file I/O of their own.
 */
typedef struct replacementPolicy {
/** Factory method (may be NULL for policies constructed via a custom init). */
struct replacementPolicy* (*init)();
/** Destructor; frees the policy and its private state. */
void (*deinit) (struct replacementPolicy* impl);
/** The page has been touched. Reflect this fact (currently not called) */
void (*hit) (struct replacementPolicy* impl, Page* page);
/** Find a page that is "stale". Do not remove it.
 *
 * @deprecated This function is essentially impossible to use correctly in a concurrent setting, and is not necessarily threadsafe.
 * */
Page* (*getStale)(struct replacementPolicy* impl);
/** Remove a page from consideration. This method needs to increment the
 * "pinCount" field of Page (or perform other bookkeeping), since the page
 * may be removed multiple times before it is re-inserted. Pages that
 * have been removed should not be returned as "stale". */
Page* (*remove) (struct replacementPolicy* impl, Page* page);
/** Atomically getStale(), and remove() the page it returns.
 *
 * @return the page that was stale, and that has now been removed.
 */
Page* (*getStaleAndRemove)(struct replacementPolicy* impl);
/** Insert a page into the replacement policy, and decrement the pinCount.
 * The page has just been "hit", and is now a candidate for getStale() to
 * consider (unless it has a non-zero pincount).
 */
void (*insert) (struct replacementPolicy* impl, Page* page);
/** Policy-private state; owned by the implementation, freed by deinit(). */
void * impl;
} replacementPolicy;
@@ -35,3 +46,4 @@ replacementPolicy * stasis_replacement_policy_lru_init();
replacementPolicy * lruFastInit();
replacementPolicy* replacementPolicyThreadsafeWrapperInit(replacementPolicy* rp);
replacementPolicy* replacementPolicyConcurrentWrapperInit(replacementPolicy** rp, int count);
replacementPolicy* replacementPolicyClockInit(Page * pageArray, int page_count);

View file

@@ -7,7 +7,7 @@
#define LOG_NAME "check_replacementPolicy.log"
#define OBJECT_COUNT 1000
#define OBJECT_COUNT 100
#define OP_COUNT 10000000
#define LONG_COUNT 100000000UL
@@ -16,20 +16,16 @@
#define THREAD_COUNT 10
typedef struct tracker {
Page page;
int inCache;
pthread_mutex_t mut;
} tracker;
#define memberOffset(objType, memberName) ((char*)&((objType*)0)->memberName - (char*)0)
#define memberToObj(objType, memberName, memberPtr) (objType*)((char*)memberPtr - memberOffset(objType,memberName))
#define pageToTracker(p) memberToObj(tracker,page,p);
static pthread_mutex_t cached_count_mutex = PTHREAD_MUTEX_INITIALIZER;
static int threaded = 0;
static int cachedCount = 0;
tracker * t;
Page* pages;
void randomSetup() {
time_t seed = time(0);
printf("\nSeed = %ld\n", seed);
@@ -38,14 +34,17 @@ void randomSetup() {
cachedCount = 0;
t = calloc(OBJECT_COUNT, sizeof(tracker));
pages = calloc(OBJECT_COUNT, sizeof(Page));
for(int i = 0; i < OBJECT_COUNT; i++) {
t[i].page.id = i;
t[i].page.pinCount = 1;
pages[i].id = i;
pages[i].pinCount = 1;
pages[i].next = 0;
}
}
void randomTeardown() {
free(t);
free(pages);
}
void randomTest(replacementPolicy * lru, unsigned long count) {
@@ -61,14 +60,14 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
// TOGGLE IN CACHE
pthread_mutex_lock(&t[i].mut);
if(!t[i].inCache) {
lru->insert(lru, &t[i].page);
lru->insert(lru, &pages[i]);
t[i].inCache = 1;
pthread_mutex_lock(&cached_count_mutex);
cachedCount ++;
pthread_mutex_unlock(&cached_count_mutex);
} else {
tracker* tr = pageToTracker( lru->remove(lru, &t[i].page) );
assert(tr == &t[i]);
Page *p = lru->remove(lru, &pages[i]);
assert(p == &pages[i]);
t[i].inCache = 0;
pthread_mutex_lock(&cached_count_mutex);
cachedCount --;
@@ -77,20 +76,20 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
pthread_mutex_unlock(&t[i].mut);
} else if(op < 30) {
// Get stale + remove
tracker * tr = pageToTracker( lru->getStale(lru) );
if( tr ) {
pthread_mutex_lock(&t[tr->page.id].mut);
if(tr->inCache) {
assert(tr == &t[tr->page.id]);
tr = pageToTracker( lru->remove(lru, &tr->page) );
assert(tr == &t[tr->page.id]);
tr->inCache = 0;
Page *p = lru->getStale(lru);
if( p ) {
pthread_mutex_lock(&t[p->id].mut);
if(t[p->id].inCache) {
assert(p == &pages[p->id]);
p = lru->remove(lru, p);
assert(p == &pages[p->id]);
t[p->id].inCache = 0;
pthread_mutex_lock(&cached_count_mutex);
if(!threaded) assert(cachedCount != 0);
cachedCount --;
pthread_mutex_unlock(&cached_count_mutex);
}
pthread_mutex_unlock(&t[tr->page.id].mut);
pthread_mutex_unlock(&t[p->id].mut);
} else {
if(!threaded) {
assert(cachedCount == 0);
@@ -98,20 +97,20 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
}
} else if(op < 50) {
// Get stale
tracker * tr = pageToTracker( lru->getStale(lru) );
if(tr) {
pthread_mutex_lock(&t[tr->page.id].mut);
if(!threaded) assert(tr->inCache);
assert(tr == &t[tr->page.id]);
Page * p = lru->getStale(lru);
if(p) {
pthread_mutex_lock(&t[p->id].mut);
if(!threaded) assert(t[p->id].inCache);
assert(p == &pages[p->id]);
if(!threaded) assert(cachedCount != 0);
pthread_mutex_unlock(&t[tr->page.id].mut);
pthread_mutex_unlock(&t[p->id].mut);
} else {
if(!threaded) assert(cachedCount == 0);
}
} else {
// Hit
pthread_mutex_lock(&t[i].mut);
if(t[i].inCache) lru->hit(lru, &t[i].page);
if(t[i].inCache) lru->hit(lru, &pages[i]);
pthread_mutex_unlock(&t[i].mut);
}
}
@@ -119,7 +118,7 @@ void randomTest(replacementPolicy * lru, unsigned long count) {
void fillThenEmptyTest(replacementPolicy *lru) {
for(int i = 0; i < OBJECT_COUNT; i++) {
lru->insert(lru, &t[i].page);
lru->insert(lru, &pages[i]);
}
int j = 0;
while(lru->getStaleAndRemove(lru)) {
@@ -129,8 +128,8 @@ void fillThenEmptyTest(replacementPolicy *lru) {
assert(0 == lru->getStale(lru));
assert(j == OBJECT_COUNT);
for(int i = 0; i < OBJECT_COUNT; i++) {
lru->insert(lru, &t[i].page);
lru->remove(lru, &t[i].page);
lru->insert(lru, &pages[i]);
lru->remove(lru, &pages[i]);
}
j = 0;
while(lru->getStaleAndRemove(lru)) {
@@ -178,7 +177,14 @@ START_TEST(replacementPolicyConcurrentRandomTest) {
cwLru->deinit(cwLru);
randomTeardown();
} END_TEST
/** Single-threaded randomized workload against the CLOCK policy. */
START_TEST(replacementPolicyClockRandomTest) {
threaded = 0;
randomSetup();
replacementPolicy * lru = replacementPolicyClockInit(pages, OBJECT_COUNT);
randomTest(lru, LONG_COUNT);
lru->deinit(lru);
randomTeardown();
} END_TEST
replacementPolicy * worker_lru;
unsigned long worker_count;
void * randomTestWorker(void * arg) {
@@ -209,7 +215,7 @@ START_TEST(replacementPolicyConcurrentThreadTest) {
for(int i = 0; i < LRU_COUNT; i++) {
lru[i] = lruFastInit();
}
replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, THREAD_COUNT);
replacementPolicy * cwLru = replacementPolicyConcurrentWrapperInit(lru, LRU_COUNT);
threaded = 1;
worker_lru = cwLru;
worker_count = LONG_COUNT / THREAD_COUNT;
@@ -225,6 +231,25 @@ START_TEST(replacementPolicyConcurrentThreadTest) {
cwLru->deinit(cwLru);
randomTeardown();
} END_TEST
/**
 * Multi-threaded randomized workload against the CLOCK policy.
 *
 * Fix: randomSetup() must run BEFORE replacementPolicyClockInit(), because
 * randomSetup() (re)allocates the global `pages` array that the policy
 * sweeps.  Previously the policy captured the stale pointer left over from
 * the preceding test (already freed by its randomTeardown()) — a
 * use-after-free.  The sibling clock tests already do setup first.
 */
START_TEST(replacementPolicyClockThreadTest) {
  randomSetup();
  replacementPolicy * clockLru = replacementPolicyClockInit(pages, OBJECT_COUNT);
  threaded = 1;
  worker_lru = clockLru;
  worker_count = LONG_COUNT / THREAD_COUNT;
  pthread_t threads[THREAD_COUNT];
  for(int i = 0; i < THREAD_COUNT; i++) {
    pthread_create(&threads[i], 0, randomTestWorker, 0);
  }
  for(int i = 0; i < THREAD_COUNT; i++) {
    pthread_join(threads[i], 0);
  }
  clockLru->deinit(clockLru);
  randomTeardown();
} END_TEST
START_TEST(replacementPolicyEmptyFastLRUTest) {
randomSetup();
replacementPolicy *rp = lruFastInit();
@@ -252,7 +277,13 @@ START_TEST(replacementPolicyEmptyConcurrentTest) {
rp->deinit(rp);
randomTeardown();
} END_TEST
/** Fill-then-empty smoke test: CLOCK must hand back every inserted page. */
START_TEST(replacementPolicyEmptyClockTest) {
randomSetup();
replacementPolicy *rp = replacementPolicyClockInit(pages, OBJECT_COUNT);
fillThenEmptyTest(rp);
rp->deinit(rp);
randomTeardown();
} END_TEST
Suite * check_suite(void) {
Suite *s = suite_create("replacementPolicy");
/* Begin a new test */
@@ -262,12 +293,15 @@ Suite * check_suite(void) {
tcase_add_test(tc, replacementPolicyEmptyFastLRUTest);
tcase_add_test(tc, replacementPolicyEmptyThreadsafeTest);
tcase_add_test(tc, replacementPolicyEmptyConcurrentTest);
tcase_add_test(tc, replacementPolicyEmptyClockTest);
tcase_add_test(tc, replacementPolicyLRURandomTest);
tcase_add_test(tc, replacementPolicyLRUFastRandomTest);
tcase_add_test(tc, replacementPolicyThreadsafeRandomTest);
tcase_add_test(tc, replacementPolicyConcurrentRandomTest);
tcase_add_test(tc, replacementPolicyClockRandomTest);
tcase_add_test(tc, replacementPolicyThreadsafeThreadTest);
tcase_add_test(tc, replacementPolicyConcurrentThreadTest);
tcase_add_test(tc, replacementPolicyClockThreadTest);
/* --------------------------------------------- */