fix bug in lru backoff, add flag for buffer manager stress testing, add support for buffer manager handles to concurrentBufferManager, fix support in bufferHash
This commit is contained in:
parent
ae04cee76c
commit
00cf4eb4ce
6 changed files with 63 additions and 21 deletions
|
@ -308,7 +308,9 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
|
|||
// try to read this page from disk.
|
||||
pthread_mutex_unlock(&bh->mut);
|
||||
|
||||
bh->page_handle->read(bh->page_handle, ret, type);
|
||||
stasis_page_handle_t * h = handle ? (stasis_page_handle_t*)handle : bh->page_handle;
|
||||
|
||||
h->read(h, ret, type);
|
||||
|
||||
pthread_mutex_lock(&bh->mut);
|
||||
|
||||
|
|
|
@ -200,7 +200,30 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
|
|||
}
|
||||
int count = 0;
|
||||
while(tls->p == NULL) {
|
||||
Page * tmp = ch->lru->getStaleAndRemove(ch->lru);
|
||||
Page * tmp;
|
||||
int spin_count = 0;
|
||||
while(!(tmp = ch->lru->getStaleAndRemove(ch->lru))) {
|
||||
spin_count++;
|
||||
if(needFlush(bm)) {
|
||||
// exponential backoff -- don't test exponential backoff flag
|
||||
// here. LRU should only return null if we're in big trouble,
|
||||
// or if the flag is set to true.
|
||||
|
||||
// wake writeback thread
|
||||
pthread_cond_signal(&ch->needFree);
|
||||
|
||||
// sleep
|
||||
struct timespec ts = { 0, (1024 * 1024) << (spin_count > 3 ? 3 : spin_count) };
|
||||
nanosleep(&ts, 0);
|
||||
if(spin_count > 9) {
|
||||
static int warned = 0;
|
||||
if(!warned) {
|
||||
fprintf(stderr, "Warning: lots of spinning attempting to get page from LRU\n");
|
||||
warned = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
hashtable_bucket_handle_t h;
|
||||
tls->p = hashtable_remove_begin(ch->ht, tmp->id, &h);
|
||||
if(tls->p) {
|
||||
|
@ -265,12 +288,14 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
|
|||
|
||||
static void chReleasePage(stasis_buffer_manager_t * bm, Page * p);
|
||||
|
||||
static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const pageid_t pageid, int uninitialized, pagetype_t type) {
|
||||
static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, stasis_page_handle_t *ph, const pageid_t pageid, int uninitialized, pagetype_t type) {
|
||||
stasis_buffer_concurrent_hash_t *ch = bm->impl;
|
||||
stasis_buffer_concurrent_hash_tls_t *tls = populateTLS(bm);
|
||||
hashtable_bucket_handle_t h;
|
||||
Page * p = 0;
|
||||
|
||||
ph = ph ? ph : ch->page_handle;
|
||||
|
||||
do {
|
||||
if(p) { // the last time around pinned the wrong page!
|
||||
chReleasePage(bm, p);
|
||||
|
@ -304,7 +329,7 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
|
|||
type = UNINITIALIZED_PAGE;
|
||||
stasis_page_loaded(p, UNINITIALIZED_PAGE);
|
||||
} else {
|
||||
ch->page_handle->read(ch->page_handle, p, type);
|
||||
ph->read(ph, p, type);
|
||||
}
|
||||
unlock(p->loadlatch);
|
||||
|
||||
|
@ -322,10 +347,10 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
|
|||
return p;
|
||||
}
|
||||
static Page * chLoadPageImpl(stasis_buffer_manager_t *bm, stasis_buffer_manager_handle_t *h, int xid, const pageid_t pageid, pagetype_t type) {
|
||||
return chLoadPageImpl_helper(bm, xid, pageid, 0, type);
|
||||
return chLoadPageImpl_helper(bm, xid, (stasis_page_handle_t*)h, pageid, 0, type);
|
||||
}
|
||||
static Page * chLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const pageid_t pageid) {
|
||||
return chLoadPageImpl_helper(bm, xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page.
|
||||
return chLoadPageImpl_helper(bm, xid, 0, pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page.
|
||||
}
|
||||
static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) {
|
||||
stasis_buffer_concurrent_hash_t * ch = bm->impl;
|
||||
|
@ -369,11 +394,12 @@ static void chBufDeinit(stasis_buffer_manager_t * bm) {
|
|||
chBufDeinitHelper(bm, 0);
|
||||
}
|
||||
static stasis_buffer_manager_handle_t * chOpenHandle(stasis_buffer_manager_t *bm, int is_sequential) {
|
||||
// no-op
|
||||
return (void*)1;
|
||||
stasis_buffer_concurrent_hash_t * bh = bm->impl;
|
||||
return (stasis_buffer_manager_handle_t*)bh->page_handle->dup(bh->page_handle, is_sequential);
|
||||
}
|
||||
static int chCloseHandle(stasis_buffer_manager_t *bm, stasis_buffer_manager_handle_t* h) {
|
||||
return 0; // no error
|
||||
((stasis_page_handle_t*)h)->close((stasis_page_handle_t*)h);
|
||||
return 0;
|
||||
}
|
||||
|
||||
stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_handle_t * h, stasis_log_t * log, stasis_dirty_page_table_t * dpt) {
|
||||
|
|
|
@ -33,6 +33,11 @@ int stasis_buffer_manager_hint_writes_are_sequential = STASIS_BUFFER_MANAGER_HIN
|
|||
int stasis_buffer_manager_hint_writes_are_sequential = 0;
|
||||
#endif
|
||||
|
||||
#ifdef STASIS_BUFFER_MANAGER_DEBUG_STRESS_LATCHING
|
||||
int stasis_buffer_manager_debug_stress_latching = STASIS_BUFFER_MANAGER_DEBUG_STRESS_LATCHING;
|
||||
#else
|
||||
int stasis_buffer_manager_debug_stress_latching = 0;
|
||||
#endif
|
||||
|
||||
#ifdef BUFFER_MANAGER_O_DIRECT
|
||||
int bufferManagerO_DIRECT = BUFFER_MANAGER_O_DIRECT;
|
||||
|
|
|
@ -80,19 +80,15 @@ static void* cwGetStaleHelper(struct replacementPolicy* impl, void*(*func)(struc
|
|||
pthread_mutex_unlock(&rp->mut[bucket]);
|
||||
bucket = hash_mod(rp, bucket + 1);
|
||||
spin_count++;
|
||||
if(stasis_replacement_policy_concurrent_wrapper_exponential_backoff &&
|
||||
if((!ret) &&
|
||||
stasis_replacement_policy_concurrent_wrapper_exponential_backoff &&
|
||||
spin_count > 1) {
|
||||
// exponential backoff
|
||||
// 3 -> we wait no more than 8msec
|
||||
struct timespec ts = { 0, 1024 * 1024 << (spin_count > 3 ? 3 : spin_count) };
|
||||
nanosleep(&ts,0);
|
||||
if(spin_count > 9) {
|
||||
static int warned = 0;
|
||||
if(!warned) {
|
||||
fprintf(stderr, "Warning: lots of spinning in concurrent wrapper\n");
|
||||
warned = 1;
|
||||
}
|
||||
|
||||
if(bucket != oldbucket) {
|
||||
pthread_setspecific(rp->next_bucket, (void*) bucket);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if(ret == 0) { // should be extremely rare.
|
||||
|
|
|
@ -54,12 +54,17 @@ static void stasis_lru_fast_insert(struct replacementPolicy* r, void * p) {
|
|||
lruFast * l = r->impl;
|
||||
(*l->derefCount(p))--;
|
||||
assert(*l->derefCount(p) >= 0);
|
||||
if(stasis_buffer_manager_hint_writes_are_sequential) {
|
||||
if(stasis_buffer_manager_hint_writes_are_sequential &&
|
||||
!stasis_buffer_manager_debug_stress_latching) {
|
||||
// We are in sequential mode, and only want to evict pages from
|
||||
// the writeback thread. Therefore, it would be a waste of time
|
||||
// to put this dirty page in the LRU. (Also, we know that, when
|
||||
// the page is evicted, it will be taken out of LRU, and put back in.
|
||||
|
||||
// If we're *trying* to stress the buffer manager latches, etc, then
|
||||
// insert the dirty page. This will cause the buffer manager to perform
|
||||
// all sorts of useless (and otherwise rare) latching operations.
|
||||
|
||||
if(!*l->derefCount(p) && !((Page*)p)->dirty) {
|
||||
int err = LL_ENTRY(push)(l->lru, p);
|
||||
assert(!err);
|
||||
|
|
|
@ -28,6 +28,14 @@ extern pageid_t stasis_buffer_manager_size;
|
|||
so that we can get good concurrency on our writes.
|
||||
*/
|
||||
extern int stasis_buffer_manager_hint_writes_are_sequential;
|
||||
/**
|
||||
If this is true, then disable some optimizations associated with sequential
|
||||
write mode. This will needlessly burn CPU by inserting dirty pages into
|
||||
the LRU. In sequential write mode, these dirty pages will cause populateTLS
|
||||
to loop excessively, excercising all sorts of extremely rare thread
|
||||
synchronization schedules.
|
||||
*/
|
||||
extern int stasis_buffer_manager_debug_stress_latching;
|
||||
/**
|
||||
This determines which type of file handle the buffer manager will use.
|
||||
|
||||
|
|
Loading…
Reference in a new issue