add tryToWriteBackPage. concurrentBufferManager is now compatible with FORCE mode transactions.
This commit is contained in:
parent
ee6d0768d3
commit
ffa4f18283
5 changed files with 40 additions and 6 deletions
|
@ -526,6 +526,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_hash_open(stasis_page_handle_t *
|
|||
bm->getCachedPageImpl = bhGetCachedPage;
|
||||
bm->releasePageImpl = bhReleasePage;
|
||||
bm->writeBackPage = bhWriteBackPage;
|
||||
bm->tryToWriteBackPage = bhWriteBackPage;
|
||||
bm->forcePages = bhForcePages;
|
||||
bm->forcePageRange = bhForcePageRange;
|
||||
bm->stasis_buffer_manager_close = bhBufDeinit;
|
||||
|
|
|
@ -61,15 +61,25 @@ static inline int needFlush(stasis_buffer_manager_t * bm) {
|
|||
return count > needed;
|
||||
}
|
||||
|
||||
static int chWriteBackPage(stasis_buffer_manager_t* bm, pageid_t pageid) {
|
||||
static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid, int is_hint) {
|
||||
stasis_buffer_concurrent_hash_t *ch = bm->impl;
|
||||
hashtable_bucket_handle_t h;
|
||||
Page * p = hashtable_lookup_lock(ch->ht, pageid, &h);
|
||||
int ret = 0;
|
||||
if(!p) {
|
||||
ret = ENOENT;
|
||||
} else if(!trywritelock(p->loadlatch,0)) {
|
||||
ret = EBUSY;
|
||||
} else {
|
||||
if(is_hint) {
|
||||
if(!trywritelock(p->loadlatch,0)) {
|
||||
ret = EBUSY;
|
||||
}
|
||||
} else {
|
||||
// Uggh. With the current design, it's possible that the trywritelock will block on the writeback thread.
|
||||
// That leaves us with few options, so we expose two sets of semantics up to the caller.
|
||||
|
||||
// Since this isn't a hint, the page is not pinned. Therefore, the following will only deadlock if the caller is buggy.
|
||||
writelock(p->loadlatch,0);
|
||||
}
|
||||
}
|
||||
hashtable_unlock(&h);
|
||||
if(ret) { return ret; }
|
||||
|
@ -78,6 +88,12 @@ static int chWriteBackPage(stasis_buffer_manager_t* bm, pageid_t pageid) {
|
|||
unlock(p->loadlatch);
|
||||
return 0;
|
||||
}
|
||||
static int chWriteBackPage(stasis_buffer_manager_t* bm, pageid_t pageid) {
|
||||
return chWriteBackPage_helper(bm,pageid,0); // not hint; for correctness. Block (deadlock?) on contention.
|
||||
}
|
||||
static int chTryToWriteBackPage(stasis_buffer_manager_t* bm, pageid_t pageid) {
|
||||
return chWriteBackPage_helper(bm,pageid,1); // just a hint. Return EBUSY on contention.
|
||||
}
|
||||
static void * writeBackWorker(void * bmp) {
|
||||
stasis_buffer_manager_t* bm = bmp;
|
||||
stasis_buffer_concurrent_hash_t * ch = bm->impl;
|
||||
|
@ -295,6 +311,7 @@ stasis_buffer_manager_t* stasis_buffer_manager_concurrent_hash_open(stasis_page_
|
|||
bm->getCachedPageImpl = chGetCachedPage;
|
||||
bm->releasePageImpl = chReleasePage;
|
||||
bm->writeBackPage = chWriteBackPage;
|
||||
bm->tryToWriteBackPage = chTryToWriteBackPage;
|
||||
bm->forcePages = chForcePages;
|
||||
bm->forcePageRange = chForcePageRange;
|
||||
bm->stasis_buffer_manager_close = chBufDeinit;
|
||||
|
|
|
@ -92,6 +92,7 @@ stasis_buffer_manager_t * stasis_buffer_manager_mem_array_open () {
|
|||
bm->prefetchPages = NULL;
|
||||
bm->getCachedPageImpl = paGetCachedPage;
|
||||
bm->writeBackPage = paWriteBackPage;
|
||||
bm->tryToWriteBackPage = paWriteBackPage;
|
||||
bm->forcePages = paForcePages;
|
||||
bm->forcePageRange = paForcePageRange;
|
||||
bm->stasis_buffer_manager_close = paBufDeinit;
|
||||
|
|
|
@ -131,7 +131,7 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
|
|||
if(off == stride) {
|
||||
pthread_mutex_unlock(&dirtyPages->mutex);
|
||||
for(pageid_t i = 0; i < off; i++) {
|
||||
dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, vals[i]);
|
||||
dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]);
|
||||
}
|
||||
off = 0;
|
||||
strides++;
|
||||
|
@ -140,7 +140,7 @@ int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
|
|||
}
|
||||
pthread_mutex_unlock(&dirtyPages->mutex);
|
||||
for(int i = 0; i < off; i++) {
|
||||
dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, vals[i]);
|
||||
dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]);
|
||||
}
|
||||
pthread_mutex_lock(&dirtyPages->mutex);
|
||||
dirtyPages->flushing = 0;
|
||||
|
@ -202,8 +202,12 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages,
|
|||
pthread_mutex_unlock(&dirtyPages->mutex);
|
||||
|
||||
for(pageid_t i = 0; i < n; i++) {
|
||||
if(stop) {
|
||||
int err = dirtyPages->bufferManager->writeBackPage(dirtyPages->bufferManager, staleDirtyPages[i]);
|
||||
if(stop && (err == EBUSY)) { abort(); /*api violation!*/ }
|
||||
if(err == EBUSY) { abort(); /*api violation!*/ }
|
||||
} else {
|
||||
dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, staleDirtyPages[i]);
|
||||
}
|
||||
}
|
||||
free(staleDirtyPages);
|
||||
}
|
||||
|
|
|
@ -134,9 +134,20 @@ struct stasis_buffer_manager_t {
|
|||
storage. For compatibility, such buffer managers should ignore
|
||||
this call.)
|
||||
|
||||
This function may block instead of returning EBUSY. Therefore, if the page
|
||||
may be pinned by some other (or this thread), call tryToWriteBackPage instead.
|
||||
|
||||
@return 0 on success, ENOENT if the page is not in cache, and EBUSY if the page is pinned.
|
||||
*/
|
||||
int (*writeBackPage)(stasis_buffer_manager_t*, pageid_t p);
|
||||
/**
|
||||
* This function is like writeBackPage, except that it will never block due
|
||||
* because the page is pinned. However, it may sometimes fail to write the page,
|
||||
* and instead return EBUSY, even if the page is not pinned. Therefore, this
|
||||
* method is appropriate for performance hints and log truncation, but not
|
||||
* FORCE mode transactions.
|
||||
*/
|
||||
int (*tryToWriteBackPage)(stasis_buffer_manager_t*, pageid_t p);
|
||||
/**
|
||||
Force any written back pages to disk.
|
||||
|
||||
|
|
Loading…
Reference in a new issue