Scattered, poorly tested fixes: finer-grained latching for tree nodes; throttle the c0-c1 merge when c0 is small; fix the delta computations used for c0 throttling; disable short pauses during the c1-c2 merge. This gets bulk-load throughput up to 8500, but workloada is still slow.

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1045 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2010-08-21 03:09:18 +00:00
parent 0d471bb912
commit 07b35f4e20
7 changed files with 159 additions and 116 deletions
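
Note on the c0-c1 throttle introduced below: the batchedRevalidatingIterator that feeds the c0-c1 merge now waits while c0 sits below a low-water mark instead of draining a nearly empty tree. A minimal sketch of that idea, assuming the 0.7 low-water fraction and the 0.1 s nap used in the diff (the helper function itself is illustrative, not code from this commit):

#include <pthread.h>
#include <stdint.h>
#include <time.h>

// Sleep until c0 holds at least ~70% of its target size, unless a flush is
// in progress and the merge must drain c0 immediately.  In the real iterator
// the mutex is already held around the batch refill; the sketch manages it
// locally so the function is self-contained.
static void wait_for_c0_fill(int64_t *cur_size, int64_t target_size,
                             bool *flushing, pthread_mutex_t *mut) {
  pthread_mutex_lock(mut);
  while (*cur_size < (int64_t)(0.7 * (double)target_size) && !*flushing) {
    pthread_mutex_unlock(mut);
    struct timespec ts = { 0, 100 * 1000 * 1000 };  // 0.1 s nap
    nanosleep(&ts, 0);
    pthread_mutex_lock(mut);
  }
  pthread_mutex_unlock(mut);
}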

@ -136,7 +136,6 @@ recordid diskTreeComponent::internalNodes::create(int xid) {
recordid ret = { root, 0, 0 };
Page *p = loadPage(xid, ret.page);
writelock(p->rwlatch,0);
//initialize root node
stasis_page_slotted_initialize_page(p);
@ -162,7 +161,6 @@ recordid diskTreeComponent::internalNodes::create(int xid) {
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
unlock(p->rwlatch);
releasePage(p);
root_rec = ret;
@ -195,15 +193,15 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
recordid tree = root_rec;
Page *p = loadPage(xid, tree.page);
writelock(p->rwlatch, 0);
tree.slot = DEPTH;
tree.size = 0;
readlock(p->rwlatch,0);
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, tree);
int64_t depth = *((int64_t*)nr);
stasis_record_read_done(xid, p, tree, (const byte*)nr);
unlock(p->rwlatch);
if(lastLeaf == -1) {
lastLeaf = findLastLeaf(xid, p, depth);
}
@ -212,18 +210,19 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
if(lastLeaf != tree.page) {
lastLeafPage= loadPage(xid, lastLeaf);
writelock(lastLeafPage->rwlatch, 0);
} else {
lastLeafPage = p;
}
writelock(lastLeafPage->rwlatch, 0);
recordid ret = stasis_record_alloc_begin(xid, lastLeafPage,
sizeof(indexnode_rec)+keySize);
if(ret.size == INVALID_SLOT) {
if(lastLeafPage->id != p->id) {
assert(lastLeaf != tree.page);
unlock(lastLeafPage->rwlatch);
if(lastLeafPage->id != p->id) { // is the root the last leaf page?
assert(lastLeaf != tree.page);
releasePage(lastLeafPage); // don't need that page anymore...
lastLeafPage = 0;
}
@ -239,14 +238,14 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
DEBUG("Need to split root; depth = %d\n", depth);
pageid_t child = internal_node_alloc->alloc_extent(xid, 1);
slotid_t numslots = stasis_record_last(xid, p).slot+1;
{
Page *lc = loadPage(xid, child);
writelock(lc->rwlatch,0);
initializeNodePage(xid, lc);
//creates a copy of the root page records in the
//newly allocated child page
slotid_t numslots = stasis_record_last(xid, p).slot+1;
recordid rid;
rid.page = p->id;
// XXX writelock lc here? no need, since it's not installed in the tree yet
@ -264,6 +263,19 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
stasis_record_read_done(xid, p, rid, (const byte*)nr);
}
if(!depth) {
lastLeaf = lc->id;
pageid_t tmpid = -1;
recordid rid = { lc->id, PREV_LEAF, root_rec_size };
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
rid.slot = NEXT_LEAF;
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
}
stasis_page_lsn_write(xid, lc, internal_node_alloc->get_lsn(xid));
releasePage(lc);
} // lc is now out of scope.
// deallocate old entries, and update pointer on parent node.
// NOTE: stasis_record_free call goes to slottedFree in slotted.c
// this function only reduces the numslots when you call it
@ -288,19 +300,6 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
// don't overwrite key...
nr->ptr = child;
stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr);
// XXX move this up before we insert LC into the root? Removes write lock on lc.
if(!depth) {
lastLeaf = lc->id;
pageid_t tmpid = -1;
recordid rid = { lc->id, PREV_LEAF, root_rec_size };
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
rid.slot = NEXT_LEAF;
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
}
stasis_page_lsn_write(xid, lc, internal_node_alloc->get_lsn(xid));
unlock(lc->rwlatch);
releasePage(lc);
//update the depth info at the root
depth ++;
@ -324,21 +323,19 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
// write the new value to an existing page
DEBUG("Writing %s\t%d to existing page# %lld\n", datatuple::key_to_str(key).c_str(),
val_page, lastLeafPage->id);
stasis_record_alloc_done(xid, lastLeafPage, ret);
writeNodeRecord(xid, lastLeafPage, ret, key, keySize, val_page);
unlock(lastLeafPage->rwlatch);
if(lastLeafPage->id != p->id) {
assert(lastLeaf != tree.page);
unlock(lastLeafPage->rwlatch);
releasePage(lastLeafPage);
}
}
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
unlock(p->rwlatch);
releasePage(p);
return ret;
@ -387,12 +384,14 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
if(!depth) {
// leaf node.
writelock(p->rwlatch, 0);
ret = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len);
if(ret.size != INVALID_SLOT) {
stasis_record_alloc_done(xid, p, ret);
writeNodeRecord(xid,p,ret,key,key_len,val_page);
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
}
unlock(p->rwlatch);
} else {
// recurse
@ -406,11 +405,8 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
nr = 0;
{
Page *child_page = loadPage(xid, child_id);
writelock(child_page->rwlatch,0);
ret = appendInternalNode(xid, child_page, depth-1, key, key_len,
val_page);
unlock(child_page->rwlatch);
releasePage(child_page);
}
@ -423,9 +419,10 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
ret.size,
readRecordLength(xid, p, slot));
if(ret.size != INVALID_SLOT) {
writelock(p->rwlatch, 0); // XXX we hold this longer than necessary. push latching into buildPathToLeaf().
stasis_record_alloc_done(xid, p, ret);
ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page);
unlock(p->rwlatch);
DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
p->id, ret.page, ret.slot, ret.size);
} else {
@ -452,7 +449,6 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
DEBUG("new child = %lld internal? %lld\n", child, depth-1);
Page *child_p = loadPage(xid, child);
writelock(child_p->rwlatch,0);
initializeNodePage(xid, child_p);
recordid ret;
@ -466,7 +462,6 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
val_page);
unlock(child_p->rwlatch);
releasePage(child_p);
} else {
@ -492,7 +487,6 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
ret = leaf_rec;
stasis_page_lsn_write(xid, child_p, internal_node_alloc->get_lsn(xid));
unlock(child_p->rwlatch);
releasePage(child_p);
if(lastLeaf != -1 && lastLeaf != root_rec.page) {
// install forward link in previous page
@ -511,6 +505,8 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
// Crucially, this happens *after* the recursion. Therefore, we can query the
// tree with impunity while the leaf is being built and don't have to worry
// about dangling pointers to pages that are in the process of being allocated.
// XXX set bool on recursive call, and only grab the write latch at the first level of recursion.
writeNodeRecord(xid, root_p, root, key, key_len, child);
return ret;
@ -526,15 +522,16 @@ pageid_t diskTreeComponent::internalNodes::findLastLeaf(int xid, Page *root, int
DEBUG("Found last leaf = %lld\n", root->id);
return root->id;
} else {
readlock(root->rwlatch,0);
recordid last_rid = stasis_record_last(xid, root);
const indexnode_rec * nr = (const indexnode_rec*)stasis_record_read_begin(xid, root, last_rid);
Page *p = loadPage(xid, nr->ptr);
pageid_t ptr = nr->ptr;
stasis_record_read_done(xid, root, last_rid, (const byte*)nr);
unlock(root->rwlatch);
readlock(p->rwlatch,0);
Page *p = loadPage(xid, ptr);
pageid_t ret = findLastLeaf(xid,p,depth-1);
unlock(p->rwlatch);
releasePage(p);
return ret;
@ -551,12 +548,13 @@ pageid_t diskTreeComponent::internalNodes::findFirstLeaf(int xid, Page *root, in
return root->id;
} else {
recordid rid = {root->id, FIRST_SLOT, 0};
readlock(root->rwlatch,0);
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, root, rid);
Page *p = loadPage(xid, nr->ptr);
stasis_record_read_done(xid, root, rid, (const byte*)nr);
readlock(p->rwlatch,0);
pageid_t ptr = nr->ptr;
unlock(root->rwlatch);
Page *p = loadPage(xid, ptr);
pageid_t ret = findFirstLeaf(xid,p,depth-1);
unlock(p->rwlatch);
releasePage(p);
return ret;
}
@ -566,16 +564,17 @@ pageid_t diskTreeComponent::internalNodes::findFirstLeaf(int xid, Page *root, in
pageid_t diskTreeComponent::internalNodes::findPage(int xid, const byte *key, size_t keySize) {
Page *p = loadPage(xid, root_rec.page);
readlock(p->rwlatch,0);
recordid depth_rid = {p->id, DEPTH, 0};
int64_t * depth = (int64_t*)stasis_record_read_begin(xid, p, depth_rid);
readlock(p->rwlatch,0);
const int64_t * depthp = (const int64_t*)stasis_record_read_begin(xid, p, depth_rid);
int64_t depth = *depthp;
stasis_record_read_done(xid, p, depth_rid, (const byte*)depthp);
unlock(p->rwlatch);
recordid rid = lookup(xid, p, *depth, key, keySize);
stasis_record_read_done(xid, p, depth_rid, (const byte*)depth);
recordid rid = lookup(xid, p, depth, key, keySize);
pageid_t ret = lookupLeafPageFromRid(xid,rid);
unlock(p->rwlatch);
releasePage(p);
return ret;
@ -603,9 +602,11 @@ recordid diskTreeComponent::internalNodes::lookup(int xid,
const byte *key, size_t keySize ) {
//DEBUG("lookup: pid %lld\t depth %lld\n", node->id, depth);
readlock(node->rwlatch,0);
slotid_t numslots = stasis_record_last(xid, node).slot + 1;
if(numslots == FIRST_SLOT) {
unlock(node->rwlatch);
return NULLRID;
}
assert(numslots > FIRST_SLOT);
@ -637,13 +638,14 @@ recordid diskTreeComponent::internalNodes::lookup(int xid,
pageid_t child_id = nr->ptr;
stasis_record_read_done(xid, node, rid, (const byte*)nr);
unlock(node->rwlatch);
Page* child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0);
recordid ret = lookup(xid,child_page,depth-1,key,keySize);
unlock(child_page->rwlatch);
releasePage(child_page);
return ret;
} else {
unlock(node->rwlatch);
recordid ret = {node->id, match, keySize};
return ret;
}
@ -658,7 +660,7 @@ void diskTreeComponent::internalNodes::print_tree(int xid) {
int64_t depth = depth_nr->ptr;
stasis_record_read_done(xid,p,depth_rid,(const byte*)depth_nr);
print_tree(xid, root_rec.page, depth);
print_tree(xid, root_rec.page, depth); // XXX expensive latching!
unlock(p->rwlatch);
releasePage(p);
@ -729,23 +731,23 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
ro_alloc_ = ro_alloc;
if(root.page == 0 && root.slot == 0 && root.size == -1) abort();
p = ro_alloc_->load_page(xid,root.page);
readlock(p->rwlatch,0);
DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::internalNodes::root_rec_size);
recordid rid = {p->id, diskTreeComponent::internalNodes::DEPTH, diskTreeComponent::internalNodes::root_rec_size};
readlock(p->rwlatch, 0);
const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid);
int64_t depth = nr->ptr;
DEBUG("DEPTH = %lld\n", depth);
stasis_record_read_done(xid,p,rid,(const byte*)nr);
unlock(p->rwlatch);
pageid_t leafid = diskTreeComponent::internalNodes::findFirstLeaf(xid, p, depth);
if(leafid != root.page) {
unlock(p->rwlatch);
releasePage(p);
p = ro_alloc_->load_page(xid,leafid);
readlock(p->rwlatch,0);
assert(depth != 0);
} else {
assert(depth == 0);
@ -763,23 +765,26 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
done = false;
t = 0;
justOnePage = (depth == 0);
readlock(p->rwlatch,0);
}
diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) {
if(root.page == NULLRID.page && root.slot == NULLRID.slot) abort();
ro_alloc_ = ro_alloc;
p = ro_alloc_->load_page(xid,root.page);
readlock(p->rwlatch,0);
recordid rid = {p->id, diskTreeComponent::internalNodes::DEPTH, diskTreeComponent::internalNodes::root_rec_size};
readlock(p->rwlatch,0);
const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid);
int64_t depth = nr->ptr;
stasis_record_read_done(xid,p,rid,(const byte*)nr);
unlock(p->rwlatch);
recordid lsm_entry_rid = diskTreeComponent::internalNodes::lookup(xid,p,depth,key,keylen);
if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
unlock(p->rwlatch);
releasePage(p);
p = NULL;
done = true;
@ -788,10 +793,8 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
if(root.page != lsm_entry_rid.page)
{
unlock(p->rwlatch);
releasePage(p);
p = ro_alloc->load_page(xid,lsm_entry_rid.page);
readlock(p->rwlatch,0);
}
done = false;
@ -804,6 +807,8 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* r
DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key);
DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
readlock(p->rwlatch,0);
}
t = 0; // must be zero so free() doesn't croak.
}

@ -34,6 +34,7 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
// This bool is purely for external code.
this->accepting_new_requests = true;
this->still_running_ = true;
flushing = false;
this->merge_mgr = new mergeManager(this);
this->mergedata = 0;
//tmerger = new tuplemerger(&append_merger);
@ -180,6 +181,7 @@ void logtable<TUPLE>::flushTable()
merge_mgr->finished_merge(0);
flushing = true;
bool blocked = false;
int expmcount = merge_count;
@ -234,7 +236,7 @@ void logtable<TUPLE>::flushTable()
} else {
DEBUG("signaled c0-c1 merge\n");
}
flushing = false;
}
template<class TUPLE>

@ -79,6 +79,8 @@ public:
void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); }
void set_max_c0_size(int64_t max_c0_size) {
this->max_c0_size = max_c0_size;
this->mean_c0_effective_size = max_c0_size;
this->num_c0_mergers = 0;
merge_mgr->set_c0_size(max_c0_size);
merge_mgr->get_merge_stats(1);
}
@ -111,6 +113,10 @@ public:
pthread_mutex_t tick_mut;
pthread_mutex_t rb_mut;
int64_t max_c0_size;
// these track the effectiveness of snowshoveling
int64_t mean_c0_effective_size;
int64_t num_c0_mergers;
mergeManager * merge_mgr;
bool accepting_new_requests;
@ -120,6 +126,7 @@ public:
if(still_running_) {
still_running_ = false;
flushTable();
flushing = true;
}
rwlc_unlock(header_mut);
// XXX must need to do other things! (join the threads?)
@ -141,6 +148,7 @@ private:
int tsize; //number of tuples
public:
int64_t tree_bytes; //number of bytes
bool flushing;
//DATA PAGE SETTINGS
pageid_t internal_region_size; // in number of pages

@ -2,6 +2,7 @@
#define _MEMTREECOMPONENT_H_
#include <set>
#include <assert.h>
#include <mergeManager.h> // XXX for double_to_ts.
template<class TUPLE>
class memTreeComponent {
@ -166,6 +167,15 @@ public:
void populate_next_ret(TUPLE *key=NULL) {
if(cur_off_ == num_batched_) {
if(mut_) pthread_mutex_lock(mut_);
if(cur_size_) {
while(*cur_size_ < (0.7 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram.
pthread_mutex_unlock(mut_);
struct timespec ts;
mergeManager::double_to_ts(&ts, 0.1);
nanosleep(&ts, 0);
pthread_mutex_lock(mut_);
}
}
if(key) {
populate_next_ret_impl(s_->upper_bound(key));
} else {
@ -176,7 +186,7 @@ public:
}
public:
batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
batchedRevalidatingIterator( rbtree_t *s, int64_t* cur_size, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), cur_size_(cur_size), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret();
/* if(mut_) pthread_mutex_lock(mut_);
@ -243,6 +253,9 @@ public:
rbtree_t *s_;
TUPLE ** next_ret_;
int64_t* cur_size_; // a pointer to the current size of the red-black tree, in bytes.
int64_t target_size_; // the low-water size for the tree. If cur_size_ is not null, and *cur_size_ < C * target_size_, we sleep.
bool* flushing_; // never block if *flushing is true.
int batch_size_;
int num_batched_;
int cur_off_;

@ -51,10 +51,10 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
s->delta += delta;
s->mini_delta += delta;
{
if(s->merge_level < 2 && s->mergeable_size && delta) {
if(/*s->merge_level < 2*/ s->merge_level == 1 && s->mergeable_size && delta) {
int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps);
if(s->merge_level == 0) { s->base_size = ltable->tree_bytes; }
// if(s->merge_level == 0) { s->base_size = ltable->tree_bytes; }
if(s->mini_delta > effective_max_delta) {
struct timeval now;
@ -63,9 +63,10 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick);
double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta;
if(slp > 0.001) {
struct timespec sleeptime;
double_to_ts(&sleeptime, slp);
nanosleep(&sleeptime, 0);
// struct timespec sleeptime;
// double_to_ts(&sleeptime, slp);
// nanosleep(&sleeptime, 0);
// printf("%d Sleep A %f\n", s->merge_level, slp);
}
double_to_ts(&s->last_mini_tick, now_double);
s->mini_delta = 0;
@ -90,7 +91,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
}
} else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->max_c0_size);
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_effective_size);
if(s->in_progress > 0.95) { s->in_progress = 0.95; }
assert(s->in_progress > -0.01 && s->in_progress < 1.02);
} else {
@ -109,6 +110,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
#else
if(s->merge_level == 0 && delta) {
s->current_size = s->bytes_out - s->bytes_in_large;
s->out_progress = ((double)s->current_size) / (double)s->target_size;
} else {
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
}
@ -165,17 +167,19 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
if(block) {
if(s->merge_level == 0) {
pthread_mutex_lock(&ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
// pthread_mutex_lock(&ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
while(sleeping[s->merge_level]) {
abort();
rwlc_unlock(ltable->header_mut);
pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
// pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
}
} else {
rwlc_writelock(ltable->header_mut);
rwlc_readlock(ltable->header_mut);
while(sleeping[s->merge_level]) {
abort(); // if we're asleep, didn't this thread make us sleep???
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
}
}
@ -245,9 +249,6 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
// throttle
// it took "elapsed" seconds to process "tick_length_bytes" mb
double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps;
struct timespec sleep_until;
double max_c0_sleep = 0.1;
double min_c0_sleep = 0.01;
double max_c1_sleep = 0.5;
@ -265,20 +266,18 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, sleeptime, spin, total_sleep);
}
struct timeval now;
gettimeofday(&now, 0);
double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
sleeping[s->merge_level] = true;
if(s->merge_level == 0) abort();
rwlc_unlock(ltable->header_mut);
struct timespec ts;
double_to_ts(&ts, sleeptime);
nanosleep(&ts, 0);
rwlc_writelock(ltable->header_mut);
printf("%d Sleep B %f\n", s->merge_level, sleeptime);
// rwlc_writelock(ltable->header_mut);
rwlc_readlock(ltable->header_mut);
sleeping[s->merge_level] = false;
pthread_cond_broadcast(&throttle_wokeup_cond);
gettimeofday(&now, 0);
if(s->merge_level == 0) { update_progress(c1, 0); }
if(s->merge_level == 1) { update_progress(c2, 0); }
} else {
@ -295,29 +294,35 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
while(/*s->current_size*/ltable->tree_bytes > ltable->max_c0_size) {
rwlc_unlock(ltable->header_mut);
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
sleep(1);
struct timespec ts;
double_to_ts(&ts, 0.1);
nanosleep(&ts, 0);
rwlc_readlock(ltable->header_mut);
}
if(/*s->current_size*/ltable->tree_bytes > 0.9 * (double)ltable->max_c0_size) {
double slp = 0.01 + (double)(((double)ltable->tree_bytes)-0.9*(double)ltable->max_c0_size) / (double)(ltable->max_c0_size);
double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.1111 // - (0.9 * (double)ltable->max_c0_size);
delta -= 1.0;
if(delta > 0.0005) {
double slp = 0.001 + delta; //delta / (double)(ltable->max_c0_size);
DEBUG("\nsleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
struct timespec sleeptime;
double_to_ts(&sleeptime, slp);
rwlc_unlock(ltable->header_mut);
DEBUG("%d Sleep C %f\n", s->merge_level, slp);
nanosleep(&sleeptime, 0);
rwlc_readlock(ltable->header_mut);
}
}
rwlc_unlock(ltable->header_mut);
if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut); // XXX can this even be the case?
//if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut); // XXX can this even be the case?
} else {
if(!force) {
if(s->print_skipped == PRINT_SKIP) {
if(s->merge_level == 0) pthread_mutex_lock(&ltable->tick_mut);
//if(s->merge_level == 0) pthread_mutex_lock(&ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
pretty_print(stdout);
rwlc_unlock(ltable->header_mut);
if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut);
//if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut);
s->print_skipped = 0;
} else {
s->print_skipped++;

@ -130,7 +130,9 @@ class mergeStats {
protected:
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
public:
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
protected:
pageid_t bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?

@ -141,18 +141,10 @@ void* memMergeThread(void*arg)
}
#else
// the merge iterator will wait until c0 is big enough for us to proceed.
if(!ltable->is_still_running()) {
done = 1;
}
while(ltable->tree_bytes < 0.5 * (double)ltable->max_c0_size && ! done) {
rwlc_unlock(ltable->header_mut);
sleep(1); // XXX fixme!
rwlc_writelock(ltable->header_mut);
if(!ltable->is_still_running()) {
done = 1;
}
}
#endif
if(done==1)
@ -177,8 +169,8 @@ void* memMergeThread(void*arg)
#else
// memTreeComponent<datatuple>::revalidatingIterator *itrB =
// new memTreeComponent<datatuple>::revalidatingIterator(ltable->get_tree_c0(), &ltable->rb_mut);
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), 100, &ltable->rb_mut);
// memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
// new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), &ltable->tree_bytes, ltable->max_c0_size, &ltable->flushing, 100, &ltable->rb_mut);
#endif
//create a new tree
@ -187,7 +179,11 @@ void* memMergeThread(void*arg)
ltable->set_tree_c1_prime(c1_prime);
rwlc_unlock(ltable->header_mut);
#ifndef NO_SNOWSHOVEL
// needs to be past the rwlc_unlock...
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable->get_tree_c0(), &ltable->tree_bytes, ltable->max_c0_size, &ltable->flushing, 100, &ltable->rb_mut);
#endif
//: do the merge
DEBUG("mmt:\tMerging:\n");
@ -235,9 +231,21 @@ void* memMergeThread(void*arg)
//TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger
// update c0 effective size.
double frac = 1.0/(double)merge_count;
ltable->num_c0_mergers = merge_count;
ltable->mean_c0_effective_size =
(int64_t) (
((double)ltable->mean_c0_effective_size)*(1-frac) +
((double)stats->bytes_in_small*frac));
ltable->merge_mgr->get_merge_stats(0)->target_size = ltable->mean_c0_effective_size;
double target_R = *ltable->R();
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable->max_c0_size, (long long int)ltable->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable->max_c0_size, ((double)ltable->mean_c0_effective_size) / (double)ltable->max_c0_size);
assert(target_R >= MIN_R);
bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R);
bool signal_c2 = (new_c1_size / ltable->mean_c0_effective_size > target_R);
DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R);
if( signal_c2 )
{
@ -368,7 +376,7 @@ void *diskMergeThread(void*arg)
merge_count++;
//update the current optimal R value
*(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / (ltable->max_c0_size) ) );
*(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable->mean_c0_effective_size) ) );
DEBUG("\nR = %f\n", *(ltable->R()));
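
Note on the R / c0-sizing changes in the last hunks: instead of dividing by the configured max_c0_size, the merger now records how many bytes each c0-c1 merge actually pulled out of c0 and keeps a running mean of that (mean_c0_effective_size); R and the decision to signal the c1-c2 merge are computed against that mean. A small snippet with the same arithmetic (the helper functions are hypothetical; the field names come from the diff):

#include <algorithm>
#include <cmath>
#include <cstdint>

// Fold the bytes consumed from c0 by the latest merge into the running mean;
// with frac = 1/merge_count this is the standard incremental average over all
// c0-c1 merges so far.
static int64_t update_mean_c0(int64_t mean_c0_effective_size,
                              int64_t bytes_in_small, int merge_count) {
  double frac = 1.0 / (double)merge_count;
  return (int64_t)((double)mean_c0_effective_size * (1.0 - frac) +
                   (double)bytes_in_small * frac);
}

// R is then derived from the effective c0 size rather than from max_c0_size.
static double recompute_R(double min_r, int64_t c2_output_size,
                          int64_t mean_c0_effective_size) {
  return std::max(min_r, std::sqrt((double)c2_output_size /
                                   (double)mean_c0_effective_size));
}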