diff --git a/configure.in b/configure.in index 8c3159e..eb96073 100644 --- a/configure.in +++ b/configure.in @@ -209,7 +209,7 @@ AC_FUNC_MEMCMP AC_FUNC_REALLOC AC_FUNC_STAT AC_FUNC_VPRINTF -AC_CHECK_FUNCS([bzero fdatasync ftruncate getcwd gettimeofday inet_ntoa localtime_r memmove memset mkdir posix_memalign pow powl socket sqrt strchr strdup strerror strrchr strstr strtol strtoul tcase_set_timeout]) +AC_CHECK_FUNCS([bzero fdatasync floor ftruncate getcwd gettimeofday inet_ntoa localtime_r memmove memset mkdir posix_memalign pow powl socket sqrt strchr strdup strerror strrchr strstr strtol strtoul sync_file_range tcase_set_timeout]) #AC_CONFIG_LIBMYSQLD diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index 139612f..ff4eba0 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -120,6 +120,7 @@ static int bufManBufInit() { loadPageImpl = bufManLoadPage; writeBackPage = pageWrite; forcePages = forcePageFile; + forcePageRange = forceRangePageFile; bufDeinit = bufManBufDeinit; simulateBufferManagerCrash = bufManSimulateBufferManagerCrash; @@ -497,6 +498,7 @@ Page * (*loadPageImpl)(int xid, int pageid) = 0; void (*releasePageImpl)(Page * p) = 0; void (*writeBackPage)(Page * p) = 0; void (*forcePages)() = 0; +void (*forcePageRange)() = 0; void (*bufDeinit)() = 0; void (*simulateBufferManagerCrash)() = 0; diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index e2b3996..e3e002e 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -267,6 +267,9 @@ static void bhWriteBackPage(Page * p) { static void bhForcePages() { forcePageFile(); } +static void bhForcePageRange() { + forceRangePageFile(); +} static void bhBufDeinit() { running = 0; @@ -325,6 +328,7 @@ void bhBufInit() { releasePageImpl = bhReleasePage; writeBackPage = bhWriteBackPage; forcePages = bhForcePages; + forcePageRange = bhForcePageRange; bufDeinit = bhBufDeinit; simulateBufferManagerCrash = bhSimulateBufferManagerCrash; diff --git a/src/stasis/io/debug.c b/src/stasis/io/debug.c index d179a0b..6ec2f37 100644 --- a/src/stasis/io/debug.c +++ b/src/stasis/io/debug.c @@ -152,6 +152,13 @@ static int debug_force(stasis_handle_t *h) { printf("tid=%9ld retn force(%lx) = %d\n", pthread_self(), (unsigned long)hh, ret); fflush(stdout); return ret; } +static int debug_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { + stasis_handle_t * hh = ((debug_impl*)h->impl)->h; + printf("tid=%9ld call force(%lx,%lld,%lld)\n", pthread_self(), (unsigned long)hh, start, stop); fflush(stdout); + int ret = hh->force_range(hh, start, stop); + printf("tid=%9ld retn force(%lx) = %d\n", pthread_self(), (unsigned long)hh, ret); fflush(stdout); + return ret; +} static int debug_truncate_start(stasis_handle_t * h, lsn_t new_start) { stasis_handle_t * hh = ((debug_impl*)h->impl)->h; printf("tid=%9ld call truncate_start(%lx, %lld)\n", pthread_self(), (unsigned long)hh, new_start); fflush(stdout); @@ -175,6 +182,7 @@ struct stasis_handle_t debug_func = { .read_buffer = debug_read_buffer, .release_read_buffer = debug_release_read_buffer, .force = debug_force, + .force_range = debug_force_range, .truncate_start = debug_truncate_start, .error = 0 }; diff --git a/src/stasis/io/file.c b/src/stasis/io/file.c index 8d5e315..e78cc60 100644 --- a/src/stasis/io/file.c +++ b/src/stasis/io/file.c @@ -1,4 +1,8 @@ #include +#ifdef HAVE_SYNC_FILE_RANGE +#define _GNU_SOURCE +#endif +#include #include #include #include @@ -11,7 +15,6 @@ #include #include #include -#include /** @file */ @@ -39,6 +42,7 @@ static int updateEOF(stasis_handle_t * h) { static int file_num_copies(stasis_handle_t * h) { return 0; } static int file_num_copies_buffer(stasis_handle_t * h) { return 0; } static int file_force(stasis_handle_t *h); +static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop); static int file_close(stasis_handle_t * h) { file_force(h); @@ -415,7 +419,17 @@ static int file_force(stasis_handle_t * h) { pthread_mutex_lock(&impl->mut); // must latch because of truncate... :( int fd = impl->fd; pthread_mutex_unlock(&impl->mut); - + { + static int warned = 0; + if(!warned) { + printf("Warning: There is a race condition between force() and " + " truncate() in file.c (This shouldn't matter in practice, " + "as the logger hasn't moved over to use file.c yet.\n"); + warned = 1; + } + } + // XXX there is a race here; the file handle could have been invalidated + // by truncate. #ifdef HAVE_FDATASYNC DEBUG("file_force() is calling fdatasync()\n"); fdatasync(fd); @@ -428,7 +442,52 @@ static int file_force(stasis_handle_t * h) { } return 0; } - +static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { + file_impl * impl = h->impl; + int ret = 0; + if(!impl->file_flags & O_SYNC) { + // not opened synchronously; we need to explicitly sync. + pthread_mutex_lock(&impl->mut); + int fd = impl->fd; + lsn_t off = impl->start_pos; + pthread_mutex_unlock(&impl->mut); + { + static int warned = 0; + if(!warned) { + printf("Warning: There is a race condition between force_range() and " + " truncate() in file.c (This shouldn't matter in practice, " + "as the logger hasn't moved over to use file.c yet.\n"); + warned = 1; + } + } + //#ifdef HAVE_F_SYNC_RANGE +#ifdef HAVE_SYNC_FILE_RANGE + printf("Calling sync_file_range\n"); + ret = sync_file_range(fd, start-off, (stop-start), + SYNC_FILE_RANGE_WAIT_BEFORE | + SYNC_FILE_RANGE_WRITE | + SYNC_FILE_RANGE_WAIT_AFTER); + if(ret) { + int error = errno; + assert(ret == -1); + // With the possible exceptions of ENOMEM and ENOSPACE, all of the sync + // errors are unrecoverable. + h->error = EBADF; + ret = error; + } +#else +#ifdef HAVE_FDATASYNC + printf("file_force_range() is calling fdatasync()\n"); + fdatasync(fd); +#else + printf("file_force_range() is calling fsync()\n"); + fsync(fd); +#endif + ret = 0; +#endif + } + return ret; +} static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) { file_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); @@ -508,6 +567,7 @@ struct stasis_handle_t file_func = { .read_buffer = file_read_buffer, .release_read_buffer = file_release_read_buffer, .force = file_force, + .force_range = file_force_range, .truncate_start = file_truncate_start, .error = 0 }; diff --git a/src/stasis/io/memory.c b/src/stasis/io/memory.c index 8c48ace..0f2b865 100644 --- a/src/stasis/io/memory.c +++ b/src/stasis/io/memory.c @@ -215,6 +215,9 @@ static int mem_read(stasis_handle_t * h, static int mem_force(stasis_handle_t *h) { return 0; } +static int mem_force_range(stasis_handle_t *h,lsn_t start, lsn_t stop) { + return 0; +} static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) { mem_impl* impl = (mem_impl*) h->impl; pthread_mutex_lock(&(impl->mut)); @@ -255,6 +258,7 @@ struct stasis_handle_t mem_func = { .read_buffer = mem_read_buffer, .release_read_buffer = mem_release_read_buffer, .force = mem_force, + .force_range = mem_force_range, .truncate_start = mem_truncate_start, .error = 0 }; diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index c98951c..7c10c8f 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -524,12 +524,16 @@ static int nbw_read(stasis_handle_t * h, } return ret; } -static int nbw_force(stasis_handle_t * h) { +static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) { nbw_impl * impl = h->impl; - pthread_mutex_lock(&impl->mut); - const tree_node * n = RB_ENTRY(min)(impl->fast_handles); + // pthread_mutex_lock(&impl->mut); + tree_node scratch; + scratch.start_pos = start; + scratch.end_pos = start+1; + const tree_node * n = RB_ENTRY(lookup)(RB_LUGTEQ,&scratch,impl->fast_handles); // min)(impl->fast_handles); int blocked = 0; while(n) { + if(n->start_pos >= stop) { break; } if(n->dirty) { // cast strips const ((tree_node*)n)->dirty = NEEDS_FORCE; @@ -553,15 +557,31 @@ static int nbw_force(stasis_handle_t * h) { if(impl->slow_force_once) { if(impl->all_slow_handle_count) { stasis_handle_t * h = impl->all_slow_handles[0]; - ret = h->force(h); + ret = h->force_range(h, start, stop); } } else { for(int i = 0; i < impl->all_slow_handle_count; i++) { stasis_handle_t * h = impl->all_slow_handles[i]; - int tmpret = h->force(h); + int tmpret = h->force_range(h, start, stop); if(tmpret) { ret = tmpret; } } } + // pthread_mutex_unlock(&impl->mut); + return ret; +} +static int nbw_force(stasis_handle_t * h) { + nbw_impl * impl = h->impl; + pthread_mutex_lock(&impl->mut); + int ret = nbw_force_range_impl(h, impl->start_pos, impl->end_pos); + pthread_mutex_unlock(&impl->mut); + return ret; +} +static int nbw_force_range(stasis_handle_t * h, + off_t start, + off_t stop) { + nbw_impl * impl = h->impl; + pthread_mutex_lock(&impl->mut); + int ret = nbw_force_range_impl(h, start, stop); pthread_mutex_unlock(&impl->mut); return ret; } @@ -596,6 +616,7 @@ struct stasis_handle_t nbw_func = { .read_buffer = nbw_read_buffer, .release_read_buffer = nbw_release_read_buffer, .force = nbw_force, + .force_range = nbw_force_range, .truncate_start = nbw_truncate_start, .error = 0 }; diff --git a/src/stasis/io/pfile.c b/src/stasis/io/pfile.c index 057b458..58c5ffe 100644 --- a/src/stasis/io/pfile.c +++ b/src/stasis/io/pfile.c @@ -1,9 +1,13 @@ #include #define _XOPEN_SOURCE 500 +#ifdef HAVE_SYNC_FILE_RANGE +#define _GNU_SOURCE +#endif +#include + #include #include #include -#include #include #include #include @@ -378,6 +382,34 @@ static int pfile_force(stasis_handle_t *h) { } return 0; } +static int pfile_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { + pfile_impl * impl = h->impl; +#ifdef HAVE_SYNC_FILE_RANGE + printf("Calling sync_file_range\n"); + int ret = sync_file_range(impl->fd, start-impl->start_pos, (stop-start), + SYNC_FILE_RANGE_WAIT_BEFORE | + SYNC_FILE_RANGE_WRITE | + SYNC_FILE_RANGE_WAIT_AFTER); + if(ret) { + int error = errno; + assert(ret == -1); + // With the possible exceptions of ENOMEM and ENOSPACE, all of the sync + // errors are unrecoverable. + h->error = EBADF; + ret = error; + } +#else +#ifdef HAVE_FDATASYNC + printf("file_force_range() is calling fdatasync()\n"); + fdatasync(fd); +#else + printf("file_force_range() is calling fsync()\n"); + fsync(fd); +#endif + int ret = 0; +#endif + return ret; +} static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) { static int truncate_warned = 0; if (!truncate_warned) { @@ -403,6 +435,7 @@ struct stasis_handle_t pfile_func = { .read_buffer = pfile_read_buffer, .release_read_buffer = pfile_release_read_buffer, .force = pfile_force, + .force_range = pfile_force_range, .truncate_start = pfile_truncate_start, .error = 0 }; diff --git a/src/stasis/operations/lsmTable.h b/src/stasis/operations/lsmTable.h index f34991c..ce5cfc2 100644 --- a/src/stasis/operations/lsmTable.h +++ b/src/stasis/operations/lsmTable.h @@ -6,6 +6,7 @@ #include #include "lsmIterators.h" +#include namespace rose { /** @@ -24,6 +25,7 @@ namespace rose { int worker_id; pageid_t(*pageAlloc)(int,void*); void *pageAllocState; + void *oldAllocState; pthread_mutex_t * block_ready_mut; pthread_cond_t * in_block_needed_cond; pthread_cond_t * out_block_needed_cond; @@ -91,6 +93,7 @@ namespace rose { // experiments, but 450 bytes overhead per tuple is insane! static const int RB_TREE_OVERHEAD = 400; // = 450; static const pageid_t MEM_SIZE = 1000 * 1000 * 1000; + // static const pageid_t MEM_SIZE = 100 * 1000; // How many pages should we try to fill with the first C1 merge? static const int R = 3; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size)) static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. @@ -115,6 +118,8 @@ namespace rose { int xid = Tbegin(); // Initialize tree with an empty tree. // XXX hardcodes ITERA's type: + // We assume that the caller set pageAllocState for us; oldPageAllocState + // shouldn't be set (it should be NULLRID) typename ITERA::handle tree = new typename ITERA::treeIteratorHandle( TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, @@ -155,7 +160,17 @@ namespace rose { pthread_mutex_unlock(a->block_ready_mut); xid = Tbegin(); - + // XXX hardcodes allocator type. + if(((recordid*)a->oldAllocState)->size != -1) { + // free the tree that we merged against during the last round. + TlsmFree(xid,tree->r_,TlsmRegionDeallocRid,a->oldAllocState); + } + // we're merging against old alloc state this round. + *(recordid*)(a->oldAllocState) = *(recordid*)(a->pageAllocState); + // we're merging into pagealloc state. + *(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t)); + Tset(xid, *(recordid*)(a->pageAllocState), + &LSM_REGION_ALLOC_STATIC_INITIALIZER); tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); @@ -189,12 +204,8 @@ namespace rose { delete taEnd; delete tbEnd; - gettimeofday(&stop_tv,0); - // TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation! - // TlsmFree(wait_queue[1]) - merge_count++; double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv); @@ -252,6 +263,17 @@ namespace rose { // We don't want to ever look at the one we just handed upstream... // We could wait for an in tree to be ready, and then pass it directly // to compress data (to avoid all those merging comparisons...) + + // old alloc state contains the tree that we used as input for this merge... we can still free it + + // XXX storage leak; upstream is going to have to free this somehow... + *(recordid*)(a->pageAllocState) = NULLRID; + + // create a new allocator. + *(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t)); + Tset(xid, *(recordid*)(a->pageAllocState), + &LSM_REGION_ALLOC_STATIC_INITIALIZER); + tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); @@ -272,8 +294,10 @@ namespace rose { typedef struct { recordid bigTree; recordid bigTreeAllocState; // this is probably the head of an arraylist of regions used by the tree... + recordid oldBigTreeAllocState; // this is probably the head of an arraylist of regions used by the tree... recordid mediumTree; recordid mediumTreeAllocState; + recordid oldMediumTreeAllocState; epoch_t beginning; epoch_t end; } lsmTableHeader_t; @@ -287,11 +311,13 @@ namespace rose { recordid ret = Talloc(xid, sizeof(lsmTableHeader_t)); lsmTableHeader_t h; + h.oldBigTreeAllocState = NULLRID; h.bigTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t)); Tset(xid,h.bigTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER); h.bigTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(), TlsmRegionAllocRid,&h.bigTreeAllocState, PAGELAYOUT::FMT::TUP::sizeofBytes()); + h.oldMediumTreeAllocState = NULLRID; h.mediumTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t)); Tset(xid,h.mediumTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER); h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(), @@ -401,6 +427,8 @@ namespace rose { recordid * ridp = (recordid*)malloc(sizeof(recordid)); *ridp = h.bigTreeAllocState; + recordid * oldridp = (recordid*)malloc(sizeof(recordid)); + *oldridp = NULLRID; ret->args1 = (merge_args*)malloc(sizeof(merge_args)); merge_args tmpargs1 = @@ -408,6 +436,7 @@ namespace rose { 1, TlsmRegionAllocRid, ridp, + oldridp, block_ready_mut, block1_needed_cond, block2_needed_cond, @@ -427,6 +456,8 @@ namespace rose { ridp = (recordid*)malloc(sizeof(recordid)); *ridp = h.mediumTreeAllocState; + oldridp = (recordid*)malloc(sizeof(recordid)); + *oldridp = NULLRID; ret->args2 = (merge_args*)malloc(sizeof(merge_args)); merge_args tmpargs2 = @@ -434,6 +465,7 @@ namespace rose { 2, TlsmRegionAllocRid, ridp, + oldridp, block_ready_mut, block0_needed_cond, block1_needed_cond, @@ -557,16 +589,17 @@ namespace rose { byte * arry = val.toByteArray(); typename PAGELAYOUT::FMT::TUP * r = 0; - r = getRecordHelper(xid, h->args2->my_tree->r_, val, scratch, arry); - if(r) { pthread_mutex_unlock(h->mut); return r; } - + if(h->args2->my_tree) { + r = getRecordHelper(xid, h->args2->my_tree->r_, val, scratch, arry); + if(r) { pthread_mutex_unlock(h->mut); return r; } + } DEBUG("Not in first my_tree {%lld}\n", h->args2->my_tree->r_.size); if(*h->args1->in_tree) { r = getRecordHelper(xid, (**h->args1->in_tree)->r_, val, scratch, arry); if(r) { pthread_mutex_unlock(h->mut); return r; } } else { - DEBUG("no tree"); + DEBUG("no second in_tree"); } DEBUG("Not in second in_tree\n"); diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index ca09089..80d83f6 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -11,19 +11,46 @@ static lsm_comparator_t comparators[MAX_LSM_COMPARATORS]; static lsm_page_initializer_t initializers[MAX_LSM_PAGE_INITIALIZERS]; TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER = - { -1, -1, 1000 }; + { {0,0,-1}, 0, -1, -1, 1000 }; pageid_t TlsmRegionAlloc(int xid, void *conf) { - TlsmRegionAllocConf_t * a = (TlsmRegionAllocConf_t*)conf; + TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf; if(a->nextPage == a->endOfRegion) { + if(a->regionList.size == -1) { + a->regionList = TarrayListAlloc(xid, 1, 4, sizeof(pageid_t)); + a->regionCount = 0; + } + TarrayListExtend(xid,a->regionList,1); + a->regionList.slot = a->regionCount; + DEBUG("region lst slot %lld\n",a->regionList.slot); + a->regionCount++; + DEBUG("region count %lld\n",a->regionCount); a->nextPage = TregionAlloc(xid, a->regionSize,0); + DEBUG("next page %lld\n",a->nextPage); a->endOfRegion = a->nextPage + a->regionSize; + Tset(xid,a->regionList,&a->nextPage); + DEBUG("next page %lld\n",a->nextPage); } + DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion); pageid_t ret = a->nextPage; + DEBUG("ret %lld\n",ret); (a->nextPage)++; return ret; } +void TlsmRegionDeallocRid(int xid, void *conf) { + recordid rid = *(recordid*)conf; + TlsmRegionAllocConf_t a; + Tread(xid,rid,&a); + // TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf; + for(int i = 0; i < a.regionCount; i++) { + a.regionList.slot = i; + pageid_t pid; + Tread(xid,a.regionList,&pid); + TregionDealloc(xid,pid); + } +} + pageid_t TlsmRegionAllocRid(int xid, void * ridp) { recordid rid = *(recordid*)ridp; TlsmRegionAllocConf_t conf; @@ -575,6 +602,13 @@ recordid TlsmAppendPage(int xid, recordid tree, return ret; } +void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc, + void *allocator_state) { + // Tdealloc(xid,tree); + dealloc(xid,allocator_state); + // XXX fishy shouldn't caller do this? + Tdealloc(xid, *(recordid*)allocator_state); +} static pageid_t lsmLookup(int xid, Page *node, int depth, const byte *key, size_t keySize, lsm_comparator_t cmp) { diff --git a/src/stasis/page/compression/for-impl.h b/src/stasis/page/compression/for-impl.h index 1b4fc6f..dc38dbe 100644 --- a/src/stasis/page/compression/for-impl.h +++ b/src/stasis/page/compression/for-impl.h @@ -124,7 +124,6 @@ For::recordFind(int xid, slot_index_t start, slot_index_t stop, } } } - assert(ret); //XXX return ret; } } // namespace rose diff --git a/src/stasis/pageFile.c b/src/stasis/pageFile.c index fdbd252..a3530f1 100644 --- a/src/stasis/pageFile.c +++ b/src/stasis/pageFile.c @@ -28,6 +28,7 @@ static int stable = -1; static pthread_mutex_t stable_mutex; static void pfForcePageFile(); static void pfClosePageFile(); +static void pfForceRangePageFile(lsn_t start, lsn_t stop) ; inline static pageid_t myLseekNoLock(int f, pageid_t offset, int whence); static int oldOffset = -1; @@ -134,6 +135,7 @@ void openPageFile() { pageRead = pfPageRead; pageWrite = pfPageWrite; forcePageFile = pfForcePageFile; + forceRangePageFile = pfForceRangePageFile; closePageFile = pfClosePageFile; DEBUG("Opening storefile.\n"); @@ -170,6 +172,23 @@ static void pfForcePageFile() { } } +static void pfForceRangePageFile(lsn_t start, lsn_t stop) { + if(pageFile_isDurable) { +#ifdef HAVE_SYNC_FILE_RANGE + int ret = sync_file_range(stable, start, stop, + SYNC_FILE_RANGE_WAIT_BEFORE | + SYNC_FILE_RANGE_WRITE | + SYNC_FILE_RANGE_WAIT_AFTER); + assert(!ret); +#else +#ifdef HAVE_FDATASYNC + fdatasync(fd); +#else + fsync(fd); +#endif +#endif + } +} static void pfClosePageFile() { assert(stable != -1); forcePageFile(); diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index a5208a0..b1653d9 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -6,9 +6,10 @@ #include #include #include -void (*pageWrite)(Page * dat); +void (*pageWrite)(Page * dat); void (*pageRead)(Page * ret); void (*forcePageFile)(); +void (*forceRangePageFile)(); void (*closePageFile)(); int printedForceWarning = 0; @@ -57,6 +58,10 @@ static void phForce() { int err = h->force(h); assert(!err); } +static void phForceRange(lsn_t start, lsn_t stop) { + int err = h->force_range(h,start,stop); + assert(!err); +} static void phClose() { int err = h->close(h); DEBUG("Closing pageHandle\n"); @@ -72,6 +77,7 @@ void pageHandleOpen(stasis_handle_t * handle) { pageWrite = phWrite; pageRead = phRead; forcePageFile = phForce; + forceRangePageFile = phForceRange; closePageFile = phClose; h = handle; } diff --git a/src/stasis/truncation.c b/src/stasis/truncation.c index 6eca6e3..8ff5796 100644 --- a/src/stasis/truncation.c +++ b/src/stasis/truncation.c @@ -106,7 +106,31 @@ static void dirtyPages_flush() { } free(staleDirtyPages); } +void dirtyPages_flushRange(pageid_t start, pageid_t stop) { + int * staleDirtyPages = malloc(sizeof(int) * (MAX_BUFFER_SIZE)); + int i; + Page * p = 0; + pthread_mutex_lock(&dirtyPages_mutex); + void *tmp; + i = 0; + for(tmp = pblHtFirst(dirtyPages); tmp; tmp = pblHtNext(dirtyPages)) { + int num = *((int*) pblHtCurrentKey(dirtyPages)); + if(num <= start && num < stop) { + staleDirtyPages[i] = num; + i++; + } + } + staleDirtyPages[i] = -1; + pthread_mutex_unlock(&dirtyPages_mutex); + for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) { + p = loadPage(-1, staleDirtyPages[i]); + writeBackPage(p); + releasePage(p); + } + free(staleDirtyPages); + forcePageRange(start,stop); // XXX +} void dirtyPagesInit() { dirtyPages = pblHtCreate(); } diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index b8c558d..530ab0f 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -149,7 +149,12 @@ extern void (*writeBackPage)(Page * p); a no-op. */ extern void (*forcePages)(); +/** + Force written back pages that fall within a particular range to disk. + This does not force page that have not been written to with pageWrite(). +*/ +extern void (*forcePageRange)(pageid_t start, pageid_t stop); extern void (*simulateBufferManagerCrash)(); int bufInit(int type); diff --git a/stasis/io/handle.h b/stasis/io/handle.h index b6287d8..5f9e662 100644 --- a/stasis/io/handle.h +++ b/stasis/io/handle.h @@ -199,6 +199,7 @@ typedef struct stasis_handle_t { returned) may or may not be forced to disk. */ int (*force)(struct stasis_handle_t * h); + int (*force_range)(struct stasis_handle_t * h, lsn_t start, lsn_t stop); /** Truncate bytes from the beginning of the file. This is needed by the log manager. diff --git a/stasis/operations/lsmTree.h b/stasis/operations/lsmTree.h index 7ffcfe1..714ae10 100644 --- a/stasis/operations/lsmTree.h +++ b/stasis/operations/lsmTree.h @@ -25,6 +25,7 @@ typedef struct { typedef int(*lsm_comparator_t)(const void* a, const void* b); typedef void*(*lsm_page_initializer_t)(Page *, void *); typedef pageid_t(*lsm_page_allocator_t)(int, void *); +typedef void(*lsm_page_deallocator_t)(int, void *); void lsmTreeRegisterComparator(int id, lsm_comparator_t i); void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i); @@ -32,10 +33,15 @@ void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i); pageid_t TlsmRegionAlloc(int xid, void *conf); pageid_t TlsmRegionAllocRid(int xid, void *conf); typedef struct { + recordid regionList; + pageid_t regionCount; pageid_t nextPage; pageid_t endOfRegion; pageid_t regionSize; } TlsmRegionAllocConf_t; + +void TlsmRegionDeallocRid(int xid, void *conf); + extern TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER; /** @@ -66,6 +72,8 @@ recordid TlsmAppendPage(int xid, recordid tree, const byte *key, lsm_page_allocator_t allocator, void *allocator_state, long pageid); +void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc, + void *allocator_state); /** Lookup a leaf page. diff --git a/stasis/pageHandle.h b/stasis/pageHandle.h index 6b31785..ecb3227 100644 --- a/stasis/pageHandle.h +++ b/stasis/pageHandle.h @@ -47,6 +47,7 @@ extern void (*pageRead)(Page * ret); as well...) */ extern void (*forcePageFile)(); +extern void (*forceRangePageFile)(); /** Force the page file to disk, then close it. */