diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c
index 8a3f4eb..02922c8 100644
--- a/src/stasis/bufferManager/bufferHash.c
+++ b/src/stasis/bufferManager/bufferHash.c
@@ -308,7 +308,9 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
     // try to read this page from disk.
     pthread_mutex_unlock(&bh->mut);
 
-    bh->page_handle->read(bh->page_handle, ret, type);
+    stasis_page_handle_t * h = handle ? (stasis_page_handle_t*)handle : bh->page_handle;
+
+    h->read(h, ret, type);
 
     pthread_mutex_lock(&bh->mut);
 
diff --git a/src/stasis/bufferManager/concurrentBufferManager.c b/src/stasis/bufferManager/concurrentBufferManager.c
index 50e02cd..291c562 100644
--- a/src/stasis/bufferManager/concurrentBufferManager.c
+++ b/src/stasis/bufferManager/concurrentBufferManager.c
@@ -200,7 +200,30 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
   }
   int count = 0;
   while(tls->p == NULL) {
-    Page * tmp = ch->lru->getStaleAndRemove(ch->lru);
+    Page * tmp;
+    int spin_count = 0;
+    while(!(tmp = ch->lru->getStaleAndRemove(ch->lru))) {
+      spin_count++;
+      if(needFlush(bm)) {
+        // exponential backoff -- don't test exponential backoff flag
+        // here.  LRU should only return null if we're in big trouble,
+        // or if the flag is set to true.
+
+        // wake writeback thread
+        pthread_cond_signal(&ch->needFree);
+
+        // sleep
+        struct timespec ts = { 0, (1024 * 1024) << (spin_count > 3 ? 3 : spin_count) };
+        nanosleep(&ts, 0);
+        if(spin_count > 9) {
+          static int warned = 0;
+          if(!warned) {
+            fprintf(stderr, "Warning: lots of spinning attempting to get page from LRU\n");
+            warned = 1;
+          }
+        }
+      }
+    }
     hashtable_bucket_handle_t h;
     tls->p = hashtable_remove_begin(ch->ht, tmp->id, &h);
     if(tls->p) {
@@ -265,12 +288,14 @@ static inline stasis_buffer_concurrent_hash_tls_t * populateTLS(stasis_buffer_ma
 
 static void chReleasePage(stasis_buffer_manager_t * bm, Page * p);
 
-static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const pageid_t pageid, int uninitialized, pagetype_t type) {
+static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, stasis_page_handle_t *ph, const pageid_t pageid, int uninitialized, pagetype_t type) {
   stasis_buffer_concurrent_hash_t *ch = bm->impl;
   stasis_buffer_concurrent_hash_tls_t *tls = populateTLS(bm);
   hashtable_bucket_handle_t h;
   Page * p = 0;
+  ph = ph ? ph : ch->page_handle;
+
   do {
     if(p) { // the last time around pinned the wrong page!
       chReleasePage(bm, p);
@@ -304,7 +329,7 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
       type = UNINITIALIZED_PAGE;
       stasis_page_loaded(p, UNINITIALIZED_PAGE);
     } else {
-      ch->page_handle->read(ch->page_handle, p, type);
+      ph->read(ph, p, type);
     }
     unlock(p->loadlatch);
 
@@ -322,10 +347,10 @@ static Page * chLoadPageImpl_helper(stasis_buffer_manager_t* bm, int xid, const
   return p;
 }
 static Page * chLoadPageImpl(stasis_buffer_manager_t *bm, stasis_buffer_manager_handle_t *h, int xid, const pageid_t pageid, pagetype_t type) {
-  return chLoadPageImpl_helper(bm, xid, pageid, 0, type);
+  return chLoadPageImpl_helper(bm, xid, (stasis_page_handle_t*)h, pageid, 0, type);
 }
 static Page * chLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const pageid_t pageid) {
-  return chLoadPageImpl_helper(bm, xid,pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page.
+  return chLoadPageImpl_helper(bm, xid, 0, pageid,1,UNKNOWN_TYPE_PAGE); // 1 means dont care about preimage of page.
 }
 static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) {
   stasis_buffer_concurrent_hash_t * ch = bm->impl;
@@ -369,11 +394,12 @@ static void chBufDeinit(stasis_buffer_manager_t * bm) {
   chBufDeinitHelper(bm, 0);
 }
 static stasis_buffer_manager_handle_t * chOpenHandle(stasis_buffer_manager_t *bm, int is_sequential) {
-  // no-op
-  return (void*)1;
+  stasis_buffer_concurrent_hash_t * bh = bm->impl;
+  return (stasis_buffer_manager_handle_t*)bh->page_handle->dup(bh->page_handle, is_sequential);
 }
 static int chCloseHandle(stasis_buffer_manager_t *bm, stasis_buffer_manager_handle_t* h) {
-  return 0; // no error
+  ((stasis_page_handle_t*)h)->close((stasis_page_handle_t*)h);
+  return 0;
 }
 
 stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_handle_t * h, stasis_log_t * log, stasis_dirty_page_table_t * dpt) {
diff --git a/src/stasis/flags.c b/src/stasis/flags.c
index 285200c..14fcc4a 100644
--- a/src/stasis/flags.c
+++ b/src/stasis/flags.c
@@ -33,6 +33,11 @@ int stasis_buffer_manager_hint_writes_are_sequential = STASIS_BUFFER_MANAGER_HIN
 int stasis_buffer_manager_hint_writes_are_sequential = 0;
 #endif
 
+#ifdef STASIS_BUFFER_MANAGER_DEBUG_STRESS_LATCHING
+int stasis_buffer_manager_debug_stress_latching = STASIS_BUFFER_MANAGER_DEBUG_STRESS_LATCHING;
+#else
+int stasis_buffer_manager_debug_stress_latching = 0;
+#endif
 
 #ifdef BUFFER_MANAGER_O_DIRECT
 int bufferManagerO_DIRECT = BUFFER_MANAGER_O_DIRECT;
diff --git a/src/stasis/replacementPolicy/concurrentWrapper.c b/src/stasis/replacementPolicy/concurrentWrapper.c
index 851c586..85596b7 100644
--- a/src/stasis/replacementPolicy/concurrentWrapper.c
+++ b/src/stasis/replacementPolicy/concurrentWrapper.c
@@ -80,19 +80,15 @@ static void* cwGetStaleHelper(struct replacementPolicy* impl, void*(*func)(struc
       pthread_mutex_unlock(&rp->mut[bucket]);
       bucket = hash_mod(rp, bucket + 1);
       spin_count++;
-      if(stasis_replacement_policy_concurrent_wrapper_exponential_backoff &&
+      if((!ret) &&
+         stasis_replacement_policy_concurrent_wrapper_exponential_backoff &&
          spin_count > 1) {
-        // exponential backoff
-        // 3 -> we wait no more than 8msec
-        struct timespec ts = { 0, 1024 * 1024 << (spin_count > 3 ? 3 : spin_count) };
-        nanosleep(&ts,0);
-        if(spin_count > 9) {
-          static int warned = 0;
-          if(!warned) {
-            fprintf(stderr, "Warning: lots of spinning in concurrent wrapper\n");
-            warned = 1;
-          }
+
+        if(bucket != oldbucket) {
+          pthread_setspecific(rp->next_bucket, (void*) bucket);
         }
+
+        return 0;
       }
     }
     if(ret == 0) { // should be extremely rare.
diff --git a/src/stasis/replacementPolicy/lruFast.c b/src/stasis/replacementPolicy/lruFast.c
index ab2c318..c779a33 100644
--- a/src/stasis/replacementPolicy/lruFast.c
+++ b/src/stasis/replacementPolicy/lruFast.c
@@ -54,12 +54,17 @@ static void stasis_lru_fast_insert(struct replacementPolicy* r, void * p) {
   lruFast * l = r->impl;
   (*l->derefCount(p))--;
   assert(*l->derefCount(p) >= 0);
-  if(stasis_buffer_manager_hint_writes_are_sequential) {
+  if(stasis_buffer_manager_hint_writes_are_sequential &&
+     !stasis_buffer_manager_debug_stress_latching) {
     // We are in sequential mode, and only want to evict pages from
     // the writeback thread.  Therefore, it would be a waste of time
     // to put this dirty page in the LRU.  (Also, we know that, when
     // the page is evicted, it will be taken out of LRU, and put back in.
+    // If we're *trying* to stress the buffer manager latches, etc, then
+    // insert the dirty page.  This will cause the buffer manager to perform
+    // all sorts of useless (and otherwise rare) latching operations.
+
     if(!*l->derefCount(p) && !((Page*)p)->dirty) {
       int err = LL_ENTRY(push)(l->lru, p);
       assert(!err);
diff --git a/stasis/flags.h b/stasis/flags.h
index b93b3f8..33b423c 100644
--- a/stasis/flags.h
+++ b/stasis/flags.h
@@ -28,6 +28,14 @@ extern pageid_t stasis_buffer_manager_size;
    so that we can get good concurrency on our writes.
  */
 extern int stasis_buffer_manager_hint_writes_are_sequential;
+/**
+   If this is true, then disable some optimizations associated with sequential
+   write mode.  This will needlessly burn CPU by inserting dirty pages into
+   the LRU.  In sequential write mode, these dirty pages will cause populateTLS
+   to loop excessively, exercising all sorts of extremely rare thread
+   synchronization schedules.
+ */
+extern int stasis_buffer_manager_debug_stress_latching;
 /**
    This determines which type of file handle the buffer manager will use.
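
Not part of the patch: a minimal, standalone sketch of the capped exponential backoff that the populateTLS change above performs while waiting for getStaleAndRemove to return a page. The helper name backoff_sleep and the main() driver are illustrative assumptions, not code from this diff; the shift cap of 3 bounds the sleep at roughly 8ms, matching the "we wait no more than 8msec" comment removed from concurrentWrapper.c.

#define _POSIX_C_SOURCE 199309L  /* for nanosleep */
#include <stdio.h>
#include <time.h>

/* Sleep for roughly 2^min(spin_count, 3) milliseconds, mirroring the
 * (1024 * 1024) << (spin_count > 3 ? 3 : spin_count) nanosecond delay
 * used in populateTLS above.  The cap keeps each wait at about 8ms. */
static void backoff_sleep(int spin_count) {
  struct timespec ts = { 0, (1024 * 1024) << (spin_count > 3 ? 3 : spin_count) };
  nanosleep(&ts, 0);
}

int main(void) {
  for(int spin_count = 1; spin_count <= 5; spin_count++) {
    printf("attempt %d: sleeping %d ns\n", spin_count,
           (1024 * 1024) << (spin_count > 3 ? 3 : spin_count));
    backoff_sleep(spin_count);
  }
  return 0;
}

The patch moves this backoff out of the replacement-policy wrapper and into the buffer manager: concurrentWrapper.c now records its position and returns 0 instead of sleeping, and populateTLS decides whether to wake the writeback thread and how long to wait.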