diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp
index aa2f22e..a9d1f8c 100644
--- a/diskTreeComponent.cpp
+++ b/diskTreeComponent.cpp
@@ -136,7 +136,6 @@ recordid diskTreeComponent::internalNodes::create(int xid) {
   recordid ret = { root, 0, 0 };
 
   Page *p = loadPage(xid, ret.page);
-  writelock(p->rwlatch,0);
 
   //initialize root node
   stasis_page_slotted_initialize_page(p);
@@ -162,7 +161,6 @@ recordid diskTreeComponent::internalNodes::create(int xid) {
 
   stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
-  unlock(p->rwlatch);
   releasePage(p);
 
   root_rec = ret;
@@ -195,15 +193,15 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
   recordid tree = root_rec;
 
   Page *p = loadPage(xid, tree.page);
-  writelock(p->rwlatch, 0);
 
   tree.slot = DEPTH;
   tree.size = 0;
 
+  readlock(p->rwlatch,0);
   const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, tree);
   int64_t depth = *((int64_t*)nr);
   stasis_record_read_done(xid, p, tree, (const byte*)nr);
-
+  unlock(p->rwlatch);
   if(lastLeaf == -1) {
     lastLeaf = findLastLeaf(xid, p, depth);
   }
@@ -212,18 +210,19 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
 
   if(lastLeaf != tree.page) {
     lastLeafPage= loadPage(xid, lastLeaf);
-    writelock(lastLeafPage->rwlatch, 0);
   } else {
     lastLeafPage = p;
   }
 
+  writelock(lastLeafPage->rwlatch, 0);
+
   recordid ret = stasis_record_alloc_begin(xid, lastLeafPage, sizeof(indexnode_rec)+keySize);
 
   if(ret.size == INVALID_SLOT) {
-    if(lastLeafPage->id != p->id) {
+    unlock(lastLeafPage->rwlatch);
+    if(lastLeafPage->id != p->id) { // is the root the last leaf page?
       assert(lastLeaf != tree.page);
-      unlock(lastLeafPage->rwlatch);
       releasePage(lastLeafPage); // don't need that page anymore...
       lastLeafPage = 0;
     }
@@ -239,30 +238,43 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
     DEBUG("Need to split root; depth = %d\n", depth);
 
     pageid_t child = internal_node_alloc->alloc_extent(xid, 1);
-    Page *lc = loadPage(xid, child);
-    writelock(lc->rwlatch,0);
-
-    initializeNodePage(xid, lc);
-
-    //creates a copy of the root page records in the
-    //newly allocated child page
     slotid_t numslots = stasis_record_last(xid, p).slot+1;
-    recordid rid;
-    rid.page = p->id;
-    // XXX writelock lc here? no need, since it's not installed in the tree yet
-    for(rid.slot = FIRST_SLOT; rid.slot < numslots; rid.slot++) {
-      //read the record from the root page
-      rid.size = stasis_record_length_read(xid, p, rid);
-      const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, rid);
-      recordid cnext = stasis_record_alloc_begin(xid, lc,rid.size);
+    {
+      Page *lc = loadPage(xid, child);
 
-      assert(rid.slot == cnext.slot);
-      assert(cnext.size != INVALID_SLOT);
+      initializeNodePage(xid, lc);
 
-      stasis_record_alloc_done(xid, lc, cnext);
-      stasis_record_write(xid, lc, cnext, (byte*)nr);
-      stasis_record_read_done(xid, p, rid, (const byte*)nr);
-    }
+      //creates a copy of the root page records in the
+      //newly allocated child page
+      recordid rid;
+      rid.page = p->id;
+      // XXX writelock lc here? no need, since it's not installed in the tree yet
+      for(rid.slot = FIRST_SLOT; rid.slot < numslots; rid.slot++) {
+        //read the record from the root page
+        rid.size = stasis_record_length_read(xid, p, rid);
+        const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, rid);
+        recordid cnext = stasis_record_alloc_begin(xid, lc,rid.size);
+
+        assert(rid.slot == cnext.slot);
+        assert(cnext.size != INVALID_SLOT);
+
+        stasis_record_alloc_done(xid, lc, cnext);
+        stasis_record_write(xid, lc, cnext, (byte*)nr);
+        stasis_record_read_done(xid, p, rid, (const byte*)nr);
+      }
+
+      if(!depth) {
+        lastLeaf = lc->id;
+        pageid_t tmpid = -1;
+        recordid rid = { lc->id, PREV_LEAF, root_rec_size };
+        stasis_record_write(xid, lc, rid, (byte*)&tmpid);
+        rid.slot = NEXT_LEAF;
+        stasis_record_write(xid, lc, rid, (byte*)&tmpid);
+      }
+
+      stasis_page_lsn_write(xid, lc, internal_node_alloc->get_lsn(xid));
+      releasePage(lc);
+    } // lc is now out of scope.
 
     // deallocate old entries, and update pointer on parent node.
     // NOTE: stasis_record_free call goes to slottedFree in slotted.c
@@ -288,19 +300,6 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
     // don't overwrite key...
     nr->ptr = child;
     stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr);
-    // XXX move this up before we insert LC into the root? Removes write lock on lc.
-    if(!depth) {
-      lastLeaf = lc->id;
-      pageid_t tmpid = -1;
-      recordid rid = { lc->id, PREV_LEAF, root_rec_size };
-      stasis_record_write(xid, lc, rid, (byte*)&tmpid);
-      rid.slot = NEXT_LEAF;
-      stasis_record_write(xid, lc, rid, (byte*)&tmpid);
-    }
-
-    stasis_page_lsn_write(xid, lc, internal_node_alloc->get_lsn(xid));
-    unlock(lc->rwlatch);
-    releasePage(lc);
 
     //update the depth info at the root
     depth ++;
@@ -324,21 +323,19 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
     // write the new value to an existing page
     DEBUG("Writing %s\t%d to existing page# %lld\n", datatuple::key_to_str(key).c_str(), val_page, lastLeafPage->id);
-
     stasis_record_alloc_done(xid, lastLeafPage, ret);
 
     writeNodeRecord(xid, lastLeafPage, ret, key, keySize, val_page);
+    unlock(lastLeafPage->rwlatch);
 
     if(lastLeafPage->id != p->id) {
       assert(lastLeaf != tree.page);
-      unlock(lastLeafPage->rwlatch);
       releasePage(lastLeafPage);
     }
   }
 
   stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
-  unlock(p->rwlatch);
   releasePage(p);
 
   return ret;
@@ -387,12 +384,14 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
 
   if(!depth) {
     // leaf node.
+    writelock(p->rwlatch, 0);
     ret = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len);
     if(ret.size != INVALID_SLOT) {
       stasis_record_alloc_done(xid, p, ret);
       writeNodeRecord(xid,p,ret,key,key_len,val_page);
       stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
     }
+    unlock(p->rwlatch);
   } else {
     // recurse
@@ -406,11 +405,8 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
     nr = 0;
     {
       Page *child_page = loadPage(xid, child_id);
-      writelock(child_page->rwlatch,0);
       ret = appendInternalNode(xid, child_page, depth-1, key, key_len, val_page);
-
-      unlock(child_page->rwlatch);
       releasePage(child_page);
     }
@@ -423,9 +419,10 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
           ret.size, readRecordLength(xid, p, slot));
     if(ret.size != INVALID_SLOT) {
+      writelock(p->rwlatch, 0); // XXX we hold this longer than necessary. push latching into buildPathToLeaf().
       stasis_record_alloc_done(xid, p, ret);
       ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page);
-
+      unlock(p->rwlatch);
       DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n", p->id, ret.page, ret.slot, ret.size);
     } else {
@@ -452,7 +449,6 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
   DEBUG("new child = %lld internal? %lld\n", child, depth-1);
 
   Page *child_p = loadPage(xid, child);
-  writelock(child_p->rwlatch,0);
   initializeNodePage(xid, child_p);
 
   recordid ret;
@@ -466,7 +462,6 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
 
     ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len, val_page);
 
-    unlock(child_p->rwlatch);
     releasePage(child_p);
 
   } else {
@@ -492,7 +487,6 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
     ret = leaf_rec;
 
     stasis_page_lsn_write(xid, child_p, internal_node_alloc->get_lsn(xid));
-    unlock(child_p->rwlatch);
     releasePage(child_p);
 
     if(lastLeaf != -1 && lastLeaf != root_rec.page) {
       // install forward link in previous page
@@ -511,6 +505,8 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
   // Crucially, this happens *after* the recursion. Therefore, we can query the
   // tree with impunity while the leaf is being built and don't have to worry
   // about dangling pointers to pages that are in the process of being allocated.
+
+  // XXX set bool on recursive call, and only grab the write latch at the first level of recursion.
   writeNodeRecord(xid, root_p, root, key, key_len, child);
 
   return ret;
@@ -526,15 +522,16 @@ pageid_t diskTreeComponent::internalNodes::findLastLeaf(int xid, Page *root, int
     DEBUG("Found last leaf = %lld\n", root->id);
     return root->id;
   } else {
+    readlock(root->rwlatch,0);
     recordid last_rid = stasis_record_last(xid, root);
-    const indexnode_rec *nr = (const indexnode_rec*) stasis_record_read_begin(xid, root, last_rid);
-    Page *p = loadPage(xid, nr->ptr);
+    const indexnode_rec * nr = (const indexnode_rec*)stasis_record_read_begin(xid, root, last_rid);
+    pageid_t ptr = nr->ptr;
     stasis_record_read_done(xid, root, last_rid, (const byte*)nr);
+    unlock(root->rwlatch);
 
-    readlock(p->rwlatch,0);
+    Page *p = loadPage(xid, ptr);
     pageid_t ret = findLastLeaf(xid,p,depth-1);
-    unlock(p->rwlatch);
     releasePage(p);
 
     return ret;
@@ -551,12 +548,13 @@ pageid_t diskTreeComponent::internalNodes::findFirstLeaf(int xid, Page *root, in
     return root->id;
   } else {
     recordid rid = {root->id, FIRST_SLOT, 0};
+
+    readlock(root->rwlatch,0);
     const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, root, rid);
-    Page *p = loadPage(xid, nr->ptr);
-    stasis_record_read_done(xid, root, rid, (const byte*)nr);
-    readlock(p->rwlatch,0);
+    pageid_t ptr = nr->ptr;
+    unlock(root->rwlatch);
+    Page *p = loadPage(xid, ptr);
     pageid_t ret = findFirstLeaf(xid,p,depth-1);
-    unlock(p->rwlatch);
     releasePage(p);
     return ret;
   }
@@ -566,16 +564,17 @@ pageid_t diskTreeComponent::internalNodes::findPage(int xid, const byte *key, si
 
   Page *p = loadPage(xid, root_rec.page);
-  readlock(p->rwlatch,0);
 
   recordid depth_rid = {p->id, DEPTH, 0};
-  int64_t * depth = (int64_t*)stasis_record_read_begin(xid, p, depth_rid);
+  readlock(p->rwlatch,0);
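+  // copy the depth out while the latch is held; the pointer from read_begin is only valid until we unlock.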
+  const int64_t * depthp = (const int64_t*)stasis_record_read_begin(xid, p, depth_rid);
+  int64_t depth = *depthp;
+  stasis_record_read_done(xid, p, depth_rid, (const byte*)depthp);
+  unlock(p->rwlatch);
 
-  recordid rid = lookup(xid, p, *depth, key, keySize);
-  stasis_record_read_done(xid, p, depth_rid, (const byte*)depth);
+  recordid rid = lookup(xid, p, depth, key, keySize);
   pageid_t ret = lookupLeafPageFromRid(xid,rid);
-  unlock(p->rwlatch);
   releasePage(p);
 
   return ret;
@@ -603,9 +602,11 @@ recordid diskTreeComponent::internalNodes::lookup(int xid,
                                                   const byte *key, size_t keySize ) {
 
   //DEBUG("lookup: pid %lld\t depth %lld\n", node->id, depth);
+  readlock(node->rwlatch,0);
   slotid_t numslots = stasis_record_last(xid, node).slot + 1;
 
   if(numslots == FIRST_SLOT) {
+    unlock(node->rwlatch);
     return NULLRID;
   }
   assert(numslots > FIRST_SLOT);
@@ -637,13 +638,14 @@ recordid diskTreeComponent::internalNodes::lookup(int xid,
     pageid_t child_id = nr->ptr;
     stasis_record_read_done(xid, node, rid, (const byte*)nr);
+    unlock(node->rwlatch);
 
     Page* child_page = loadPage(xid, child_id);
-    readlock(child_page->rwlatch,0);
     recordid ret = lookup(xid,child_page,depth-1,key,keySize);
-    unlock(child_page->rwlatch);
+
     releasePage(child_page);
     return ret;
   } else {
+    unlock(node->rwlatch);
     recordid ret = {node->id, match, keySize};
     return ret;
   }
@@ -658,7 +660,7 @@ void diskTreeComponent::internalNodes::print_tree(int xid) {
   int64_t depth = depth_nr->ptr;
   stasis_record_read_done(xid,p,depth_rid,(const byte*)depth_nr);
 
-  print_tree(xid, root_rec.page, depth);
+  print_tree(xid, root_rec.page, depth); // XXX expensive latching!
 
   unlock(p->rwlatch);
   releasePage(p);
@@ -729,23 +731,23 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
   ro_alloc_ = ro_alloc;
   if(root.page == 0 && root.slot == 0 && root.size == -1) abort();
   p = ro_alloc_->load_page(xid,root.page);
-  readlock(p->rwlatch,0);
   DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::internalNodes::root_rec_size);
   recordid rid = {p->id, diskTreeComponent::internalNodes::DEPTH, diskTreeComponent::internalNodes::root_rec_size};
+
+  readlock(p->rwlatch, 0);
   const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid);
   int64_t depth = nr->ptr;
   DEBUG("DEPTH = %lld\n", depth);
   stasis_record_read_done(xid,p,rid,(const byte*)nr);
+  unlock(p->rwlatch);
 
   pageid_t leafid = diskTreeComponent::internalNodes::findFirstLeaf(xid, p, depth);
   if(leafid != root.page) {
-    unlock(p->rwlatch);
     releasePage(p);
     p = ro_alloc_->load_page(xid,leafid);
-    readlock(p->rwlatch,0);
     assert(depth != 0);
   } else {
     assert(depth == 0);
@@ -763,23 +765,26 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
   done = false;
   t = 0;
   justOnePage = (depth == 0);
+  readlock(p->rwlatch,0);
 }
 
 diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) {
   if(root.page == NULLRID.page && root.slot == NULLRID.slot) abort();
   ro_alloc_ = ro_alloc;
   p = ro_alloc_->load_page(xid,root.page);
-  readlock(p->rwlatch,0);
+
   recordid rid = {p->id, diskTreeComponent::internalNodes::DEPTH, diskTreeComponent::internalNodes::root_rec_size};
-  const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid);
+
+  readlock(p->rwlatch,0);
+  const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid);
   int64_t depth = nr->ptr;
   stasis_record_read_done(xid,p,rid,(const byte*)nr);
+  unlock(p->rwlatch);
 
   recordid lsm_entry_rid = diskTreeComponent::internalNodes::lookup(xid,p,depth,key,keylen);
 
   if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
-    unlock(p->rwlatch);
     releasePage(p);
     p = NULL;
     done = true;
@@ -788,10 +793,8 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
 
   if(root.page != lsm_entry_rid.page) {
-    unlock(p->rwlatch);
     releasePage(p);
     p = ro_alloc->load_page(xid,lsm_entry_rid.page);
-    readlock(p->rwlatch,0);
   }
 
   done = false;
@@ -804,6 +807,8 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
     DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key);
     DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
+
+    readlock(p->rwlatch,0);
   }
   t = 0; // must be zero so free() doesn't croak.
 }
diff --git a/logstore.cpp b/logstore.cpp
index c544932..8894a2e 100644
--- a/logstore.cpp
+++ b/logstore.cpp
@@ -34,6 +34,7 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
   // This bool is purely for external code.
   this->accepting_new_requests = true;
   this->still_running_ = true;
+  flushing = false;
   this->merge_mgr = new mergeManager(this);
   this->mergedata = 0;
   //tmerger = new tuplemerger(&append_merger);
@@ -180,6 +181,7 @@ void logtable<TUPLE>::flushTable()
 
   merge_mgr->finished_merge(0);
 
+  flushing = true;
   bool blocked = false;
 
   int expmcount = merge_count;
@@ -234,7 +236,7 @@
     } else {
       DEBUG("signaled c0-c1 merge\n");
     }
-
+    flushing = false;
 }
 
 template<class TUPLE>
diff --git a/logstore.h b/logstore.h
index 6481be9..adbbad3 100644
--- a/logstore.h
+++ b/logstore.h
@@ -79,6 +79,8 @@ public:
   void set_tree_c0(memTreeComponent<TUPLE>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); }
   void set_max_c0_size(int64_t max_c0_size) {
     this->max_c0_size = max_c0_size;
+    this->mean_c0_effective_size = max_c0_size;
+    this->num_c0_mergers = 0;
     merge_mgr->set_c0_size(max_c0_size);
     merge_mgr->get_merge_stats(1);
   }
@@ -111,6 +113,10 @@ public:
   pthread_mutex_t tick_mut;
   pthread_mutex_t rb_mut;
   int64_t max_c0_size;
+  // these track the effectiveness of snowshoveling
+  int64_t mean_c0_effective_size;
+  int64_t num_c0_mergers;
+
   mergeManager * merge_mgr;
 
   bool accepting_new_requests;
@@ -120,6 +126,7 @@ public:
     if(still_running_) {
       still_running_ = false;
       flushTable();
+      flushing = true;
     }
     rwlc_unlock(header_mut);
     // XXX must need to do other things! (join the threads?)
@@ -141,6 +148,7 @@ private:
   int tsize; //number of tuples
 public:
   int64_t tree_bytes; //number of bytes
+  bool flushing;
 
   //DATA PAGE SETTINGS
   pageid_t internal_region_size; // in number of pages
diff --git a/memTreeComponent.h b/memTreeComponent.h
index 9ec3dc6..30c250b 100644
--- a/memTreeComponent.h
+++ b/memTreeComponent.h
@@ -2,6 +2,7 @@
 #define _MEMTREECOMPONENT_H_
 #include <set>
 #include <assert.h>
+#include "mergeManager.h" // XXX for double_to_ts.
 
 template<class TUPLE>
 class memTreeComponent {
@@ -166,6 +167,15 @@ public:
     void populate_next_ret(TUPLE *key=NULL) {
       if(cur_off_ == num_batched_) {
         if(mut_) pthread_mutex_lock(mut_);
+        if(cur_size_) {
+          while(*cur_size_ < (0.7 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram.
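+            // c0 is below its low-water mark and no flush is forcing us on; drop the latch and nap so writers can refill it.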
+            pthread_mutex_unlock(mut_);
+            struct timespec ts;
+            mergeManager::double_to_ts(&ts, 0.1);
+            nanosleep(&ts, 0);
+            pthread_mutex_lock(mut_);
+          }
+        }
         if(key) {
           populate_next_ret_impl(s_->upper_bound(key));
         } else {
@@ -176,7 +186,7 @@ public:
     }
 
   public:
-    batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
+    batchedRevalidatingIterator( rbtree_t *s, int64_t* cur_size, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), cur_size_(cur_size), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
       next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_);
       populate_next_ret();
 /*      if(mut_) pthread_mutex_lock(mut_);
@@ -243,6 +253,9 @@ public:
 
     rbtree_t *s_;
     TUPLE ** next_ret_;
+    int64_t* cur_size_; // a pointer to the current size of the red-black tree, in bytes.
+    int64_t target_size_; // the low-water size for the tree. If cur_size_ is not null, and *cur_size_ < C * target_size_, we sleep.
+    bool* flushing_; // never block if *flushing is true.
     int batch_size_;
     int num_batched_;
     int cur_off_;
diff --git a/mergeManager.cpp b/mergeManager.cpp
index f122eec..6a0a423 100644
--- a/mergeManager.cpp
+++ b/mergeManager.cpp
@@ -51,10 +51,10 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
     s->delta += delta;
     s->mini_delta += delta;
     {
-      if(s->merge_level < 2 && s->mergeable_size && delta) {
+      if(/*s->merge_level < 2*/ s->merge_level == 1 && s->mergeable_size && delta) {
         int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps);
-        if(s->merge_level == 0) { s->base_size = ltable->tree_bytes; }
+//        if(s->merge_level == 0) { s->base_size = ltable->tree_bytes; }
 
         if(s->mini_delta > effective_max_delta) {
           struct timeval now;
@@ -63,9 +63,10 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
           double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick);
           double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta;
           if(slp > 0.001) {
-            struct timespec sleeptime;
-            double_to_ts(&sleeptime, slp);
-            nanosleep(&sleeptime, 0);
+//            struct timespec sleeptime;
+//            double_to_ts(&sleeptime, slp);
+//            nanosleep(&sleeptime, 0);
+//            printf("%d Sleep A %f\n", s->merge_level, slp);
           }
           double_to_ts(&s->last_mini_tick, now_double);
           s->mini_delta = 0;
@@ -90,7 +91,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
       }
     } else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
       if(s->active) {
-        s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->max_c0_size);
+        s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_effective_size);
         if(s->in_progress > 0.95) { s->in_progress = 0.95; }
         assert(s->in_progress > -0.01 && s->in_progress < 1.02);
       } else {
@@ -109,6 +110,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
 #else
     if(s->merge_level == 0 && delta) {
       s->current_size = s->bytes_out - s->bytes_in_large;
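+      // with snowshoveling, c0's progress is just how full it is relative to its (moving) target size.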
+      s->out_progress = ((double)s->current_size) / (double)s->target_size;
     } else {
       s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
     }
@@ -165,17 +167,19 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
 
   if(block) {
     if(s->merge_level == 0) {
-      pthread_mutex_lock(&ltable->tick_mut);
-      rwlc_writelock(ltable->header_mut);
+//      pthread_mutex_lock(&ltable->tick_mut);
+      rwlc_readlock(ltable->header_mut);
       while(sleeping[s->merge_level]) {
+        abort();
         rwlc_unlock(ltable->header_mut);
-        pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
+//        pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
         rwlc_writelock(ltable->header_mut);
       }
     } else {
-      rwlc_writelock(ltable->header_mut);
+      rwlc_readlock(ltable->header_mut);
       while(sleeping[s->merge_level]) {
+        abort(); // if we're asleep, didn't this thread make us sleep???
        rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
       }
     }
@@ -245,9 +249,6 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
       // throttle
       // it took "elapsed" seconds to process "tick_length_bytes" mb
       double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps;
-
-      struct timespec sleep_until;
-
       double max_c0_sleep = 0.1;
       double min_c0_sleep = 0.01;
       double max_c1_sleep = 0.5;
@@ -265,20 +266,18 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
         printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, sleeptime, spin, total_sleep);
       }
-      struct timeval now;
-      gettimeofday(&now, 0);
-
-      double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
       sleeping[s->merge_level] = true;
       if(s->merge_level == 0) abort();
       rwlc_unlock(ltable->header_mut);
       struct timespec ts;
       double_to_ts(&ts, sleeptime);
       nanosleep(&ts, 0);
-      rwlc_writelock(ltable->header_mut);
+      printf("%d Sleep B %f\n", s->merge_level, sleeptime);
+
+      // rwlc_writelock(ltable->header_mut);
+      rwlc_readlock(ltable->header_mut);
       sleeping[s->merge_level] = false;
       pthread_cond_broadcast(&throttle_wokeup_cond);
-      gettimeofday(&now, 0);
       if(s->merge_level == 0) { update_progress(c1, 0); }
       if(s->merge_level == 1) { update_progress(c2, 0); }
     } else {
@@ -295,29 +294,35 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
       while(/*s->current_size*/ltable->tree_bytes > ltable->max_c0_size) {
         rwlc_unlock(ltable->header_mut);
         printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
-        sleep(1);
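+        // poll every 100ms instead of a full second, so we notice promptly once the mergers catch up.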
+        struct timespec ts;
+        double_to_ts(&ts, 0.1);
+        nanosleep(&ts, 0);
         rwlc_readlock(ltable->header_mut);
       }
-      if(/*s->current_size*/ltable->tree_bytes > 0.9 * (double)ltable->max_c0_size) {
-        double slp = 0.01 + (double)(((double)ltable->tree_bytes)-0.9*(double)ltable->max_c0_size) / (double)(ltable->max_c0_size);
+      double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.1111 // - (0.9 * (double)ltable->max_c0_size);
+      delta -= 1.0;
+      if(delta > 0.0005) {
+        double slp = 0.001 + delta; //delta / (double)(ltable->max_c0_size);
         DEBUG("\nsleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
         struct timespec sleeptime;
         double_to_ts(&sleeptime, slp);
         rwlc_unlock(ltable->header_mut);
+        DEBUG("%d Sleep C %f\n", s->merge_level, slp);
+
         nanosleep(&sleeptime, 0);
         rwlc_readlock(ltable->header_mut);
       }
     }
     rwlc_unlock(ltable->header_mut);
-    if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut); // XXX can this even be the case?
+    //if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut);
   } else {
     if(!force) {
       if(s->print_skipped == PRINT_SKIP) {
-        if(s->merge_level == 0) pthread_mutex_lock(&ltable->tick_mut);
+        //if(s->merge_level == 0) pthread_mutex_lock(&ltable->tick_mut);
         rwlc_writelock(ltable->header_mut);
         pretty_print(stdout);
         rwlc_unlock(ltable->header_mut);
-        if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut);
+        //if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut);
         s->print_skipped = 0;
       } else {
         s->print_skipped++;
diff --git a/mergeStats.h b/mergeStats.h
index cf88c6e..9b0c52a 100644
--- a/mergeStats.h
+++ b/mergeStats.h
@@ -130,7 +130,9 @@ class mergeStats {
  protected:
   pageid_t num_tuples_out;       // How many tuples did we write?
   pageid_t num_datapages_out;    // How many datapages?
+ public:
   pageid_t bytes_in_small;       // How many bytes from the small input tree (for C0, we ignore tree overheads)?
+ protected:
   pageid_t bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
   pageid_t num_tuples_in_small;  // Tuples from the small input?
   pageid_t bytes_in_large;       // Bytes from the large input?
diff --git a/merger.cpp b/merger.cpp
index 1249ca7..6c9c386 100644
--- a/merger.cpp
+++ b/merger.cpp
@@ -141,18 +141,10 @@ void* memMergeThread(void*arg)
     }
 #else
+    // the merge iterator will wait until c0 is big enough for us to proceed.
     if(!ltable->is_still_running()) {
       done = 1;
     }
-    while(ltable->tree_bytes < 0.5 * (double)ltable->max_c0_size && ! done) {
-      rwlc_unlock(ltable->header_mut);
-      sleep(1); // XXX fixme!
-      rwlc_writelock(ltable->header_mut);
-
-      if(!ltable->is_still_running()) {
-        done = 1;
-      }
-    }
 #endif
 
     if(done==1)
@@ -177,8 +169,8 @@ void* memMergeThread(void*arg)
 #else
 //      memTreeComponent<datatuple>::revalidatingIterator *itrB =
 //          new memTreeComponent<datatuple>::revalidatingIterator(ltable->get_tree_c0(), &ltable->rb_mut);
-      memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
-          new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), 100, &ltable->rb_mut);
+//      memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
+//          new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), &ltable->tree_bytes, ltable->max_c0_size, &ltable->flushing, 100, &ltable->rb_mut);
 #endif
 
     //create a new tree
@@ -187,7 +179,11 @@ void* memMergeThread(void*arg)
     ltable->set_tree_c1_prime(c1_prime);
 
     rwlc_unlock(ltable->header_mut);
-
+#ifndef NO_SNOWSHOVEL
+    // needs to be past the rwlc_unlock...
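+    // (the iterator's constructor may block waiting for c0 to fill; that must not happen while header_mut is held.)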
+    memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
+        new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), &ltable->tree_bytes, ltable->max_c0_size, &ltable->flushing, 100, &ltable->rb_mut);
+#endif
 
     //: do the merge
     DEBUG("mmt:\tMerging:\n");
@@ -235,9 +231,21 @@ void* memMergeThread(void*arg)
 
     //TODO: this is simplistic for now
     //6: if c1' is too big, signal the other merger
+
+    // update c0 effective size.
+    double frac = 1.0/(double)merge_count;
+    ltable->num_c0_mergers = merge_count;
+    ltable->mean_c0_effective_size =
+      (int64_t) (
+          ((double)ltable->mean_c0_effective_size)*(1-frac) +
+          ((double)stats->bytes_in_small*frac));
+    ltable->merge_mgr->get_merge_stats(0)->target_size = ltable->mean_c0_effective_size;
+
     double target_R = *ltable->R();
+
+    printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable->max_c0_size, (long long int)ltable->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable->max_c0_size, ((double)ltable->mean_c0_effective_size) / (double)ltable->max_c0_size);
+
     assert(target_R >= MIN_R);
-    bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R);
+    bool signal_c2 = (new_c1_size / ltable->mean_c0_effective_size > target_R);
 
     DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R);
     if( signal_c2 ) {
@@ -368,7 +376,7 @@ void *diskMergeThread(void*arg)
     merge_count++;
 
     //update the current optimal R value
-    *(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / (ltable->max_c0_size) ) );
+    *(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable->mean_c0_effective_size) ) );
 
     DEBUG("\nR = %f\n", *(ltable->R()));