Remove the mergeStats->current_size field, which is derivable from the other fields mergeStats already tracks (a sketch of the derivation follows the changed-files summary below).

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1508 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2010-12-15 00:15:59 +00:00
parent cd27f50baf
commit 052508fb76
5 changed files with 28 additions and 34 deletions
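For context, the removed field was redundant: a tree's current size can be recomputed on demand from the byte counters that mergeStats already maintains. The sketch below condenses the get_current_size() accessor added in mergeStats.h further down this page; the struct name and comments are mine, but the arithmetic is taken directly from the diff.

```cpp
// Condensed sketch of the derivation that replaces the stored current_size.
// The struct name is illustrative; the formulas mirror mergeStats::get_current_size().
#include <cstdint>

typedef int64_t pageid_t;

struct merge_stats_sketch {
  int merge_level;          // 0 = C0 (in-memory tree), 1 = C1, 2 = C2
  pageid_t base_size;       // size of the tree when the current merge round began
  pageid_t bytes_in_small;  // bytes read from the smaller input (for C0: application inserts)
  pageid_t bytes_in_large;  // bytes read from the larger input (superseded data)
  pageid_t bytes_out;       // bytes written; for C1/C2 this is bytes written *into* the tree

  pageid_t get_current_size() const {
    if (merge_level == 0) {
      // C0 grows with inserts and shrinks as the C0->C1 merge drains it.
      return base_size + bytes_in_small - bytes_in_large - bytes_out;
    } else {
      // On-disk components: what has been merged in, minus what was superseded.
      return base_size + bytes_out - bytes_in_large;
    }
  }
};
```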

View file

@@ -188,8 +188,6 @@ void logtable<TUPLE>::flushTable()
merge_count ++;
merge_mgr->get_merge_stats(0)->starting_merge();
merge_mgr->get_merge_stats(0)->current_size = 0;
if(blocked && stop - start > 1.0) {
if(first)
{
@@ -492,9 +490,6 @@ datatuple * logtable<TUPLE>::insertTupleHelper(datatuple *tuple)
tree_c0->insert(new_t); //insert the new tuple
//update the tree size (+ new_t size - pre_t size)
merge_mgr->get_merge_stats(0)->current_size += ((int64_t)new_t->byte_length() - (int64_t)pre_t->byte_length());
}
else //no tuple with same key exists in mem-tree
{
@@ -503,11 +498,7 @@ datatuple * logtable<TUPLE>::insertTupleHelper(datatuple *tuple)
//insert tuple into the rbtree
tree_c0->insert(t);
merge_mgr->get_merge_stats(0)->current_size += t->byte_length();// + RB_TREE_OVERHEAD;
}
merge_mgr->wrote_tuple(0, t); // needs to be here; doesn't grab a mutex.
return pre_t;
}

View file

@@ -3,7 +3,7 @@
#include <set>
#include <assert.h>
#include <mergeManager.h> // XXX for double_to_ts.
#include <mergeStats.h>
template<class TUPLE>
class memTreeComponent {
public:
@@ -167,8 +167,8 @@ public:
void populate_next_ret(TUPLE *key=NULL) {
if(cur_off_ == num_batched_) {
if(mut_) pthread_mutex_lock(mut_);
if(cur_size_) {
while(*cur_size_ < (0.8 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram.
if(mgr_) {
while(mgr_->get_merge_stats(0)->get_current_size() < (0.8 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram.
pthread_mutex_unlock(mut_);
struct timespec ts;
mergeManager::double_to_ts(&ts, 0.1);
@@ -186,11 +186,11 @@ public:
}
public:
batchedRevalidatingIterator( rbtree_t *s, pageid_t* cur_size, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), cur_size_(cur_size), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
batchedRevalidatingIterator( rbtree_t *s, mergeManager * mgr, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), mgr_(mgr), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret();
}
batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), cur_size_(0), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mgr_(NULL), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret(key);
}
@@ -217,7 +217,7 @@ public:
rbtree_t *s_;
TUPLE ** next_ret_;
pageid_t* cur_size_; // a pointer to the current size of the red-black tree, in bytes.
mergeManager * mgr_;
int64_t target_size_; // the low-water size for the tree. If mgr_ is not null and C0's derived size < C * target_size_, we sleep.
bool* flushing_; // never block if *flushing is true.
int batch_size_;

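Call sites change accordingly: instead of handing the iterator a pointer into mergeStats, they now pass the mergeManager and let the iterator derive C0's size itself. The call below is lifted from the memMergeThread() hunk near the bottom of this page; the per-argument comments are my reading of the constructor above, not part of the commit.

```cpp
// New-style construction (from memMergeThread(), annotated for illustration).
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
    new memTreeComponent<datatuple>::batchedRevalidatingIterator(
        ltable_->get_tree_c0(),   // red-black tree to drain
        ltable_->merge_mgr,       // used to poll C0's derived size
        ltable_->max_c0_size,     // low-water target size for the tree
        &ltable_->flushing,       // never block while a flush is pending
        100,                      // batch size
        &ltable_->rb_mut);        // mutex protecting the tree
```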
View file

@@ -44,10 +44,11 @@ void mergeManager::new_merge(int mergeLevel) {
assert(c0->target_size);
c1->target_size = (pageid_t)(*ltable->R() * (double)ltable->mean_c0_run_length);
assert(c1->target_size);
s->new_merge2();
} else if(s->merge_level == 2) {
// target_size is infinity...
s->new_merge2();
} else { abort(); }
s->new_merge2();
}
void mergeManager::set_c0_size(int64_t size) {
assert(size);
@@ -76,13 +77,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
}
}
if(s->merge_level == 0) {
// ltable manages c0's current size directly.
} else {
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
}
s->out_progress = ((double)s->current_size) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size);
s->out_progress = ((double)s->get_current_size()) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size);
if(c2->active && c1->mergeable_size) {
c1_c2_delta = c1->out_progress - c2->in_progress;
} else {
@@ -162,16 +157,17 @@ void mergeManager::tick(mergeStats * s) {
} else if(s->merge_level == 0) {
// Simple backpressure algorithm based on how full C0 is.
pageid_t cur_c0_sz;
// Is C0 bigger than is allowed?
while(c0->current_size > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number...
while((cur_c0_sz = s->get_current_size()) > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number...
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
struct timespec ts;
double_to_ts(&ts, 0.1);
nanosleep(&ts, 0);
}
// Linear backpressure model
s->out_progress = ((double)c0->current_size)/((double)ltable->max_c0_size);
double delta = ((double)c0->current_size)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
s->out_progress = ((double)cur_c0_sz)/((double)ltable->max_c0_size);
double delta = ((double)cur_c0_sz)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
delta -= 1.0;
if(delta > 0.00005) {
double slp = 0.001 + 5.0 * delta; //0.0015 < slp < 1.112111..
@@ -325,7 +321,7 @@ void mergeManager::pretty_print(FILE * out) {
}
pageid_t mb = 1024 * 1024;
fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB tot %6lldMB cur ~ %3.0f%%/%3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ",
c0->active ? "RUN" : "---", (long long)(c0->stats_lifetime_consumed / mb), (long long)(c0->current_size / mb), 100.0 * c0->out_progress, 100.0 * ((double)c0->current_size)/(double)ltable->max_c0_size, c0->stats_lifetime_elapsed, c0->stats_bps/((double)mb), c0->stats_lifetime_consumed/(((double)mb)*c0->stats_lifetime_elapsed),
c0->active ? "RUN" : "---", (long long)(c0->stats_lifetime_consumed / mb), (long long)(c0->get_current_size() / mb), 100.0 * c0->out_progress, 100.0 * ((double)c0->get_current_size())/(double)ltable->max_c0_size, c0->stats_lifetime_elapsed, c0->stats_bps/((double)mb), c0->stats_lifetime_consumed/(((double)mb)*c0->stats_lifetime_elapsed),
have_c0 ? "C0" : "..",
have_c0m ? "C0'" : "...",
c1->active ? "RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->stats_bps/((double)mb), c1->stats_lifetime_consumed/(((double)mb)*c1->stats_lifetime_elapsed),

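To make the "linear backpressure model" in tick() concrete, here is a small self-contained example. The constants (the 0.9 knee, the 0.00005 threshold, and 0.001 + 5.0 * delta) come from the code above; the 1GB C0 size is hypothetical.

```cpp
// Worked example of the C0 backpressure math in mergeManager::tick().
#include <cstdio>

// Returns how long the writer should sleep, in seconds (0 means no throttling).
static double backpressure_sleep(double cur_c0_sz, double max_c0_size) {
  double delta = cur_c0_sz / (0.9 * max_c0_size);  // 0 <= delta <= 1.111... while C0 <= max
  delta -= 1.0;                                    // positive once C0 passes 90% of max
  if (delta > 0.00005) {
    return 0.001 + 5.0 * delta;
  }
  return 0.0;
}

int main() {
  const double max_c0 = 1024.0 * 1024.0 * 1024.0;  // hypothetical 1GB C0
  printf("C0 at  85%%: sleep %.4f s\n", backpressure_sleep(0.85 * max_c0, max_c0)); // 0.0000 (below the knee)
  printf("C0 at  95%%: sleep %.4f s\n", backpressure_sleep(0.95 * max_c0, max_c0)); // ~0.2788
  printf("C0 at 100%%: sleep %.4f s\n", backpressure_sleep(1.00 * max_c0, max_c0)); // ~0.5566
  return 0;
}
```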
View file

@@ -32,7 +32,6 @@ class mergeStats {
base_size(0),
mergeable_size(0),
target_size(target_size),
current_size(0),
bytes_out(0),
bytes_in_small(0),
bytes_in_large(0),
@@ -70,7 +69,6 @@ class mergeStats {
base_size = h.base_size;
mergeable_size = h.mergeable_size;
target_size = h.target_size;
current_size = 0;
bytes_out = base_size;
bytes_in_small = 0;
bytes_in_large = 0;
@@ -100,7 +98,6 @@ class mergeStats {
just_handed_off = false;
}
base_size = bytes_out;
current_size = base_size;
bytes_out = 0;
bytes_in_small = 0;
bytes_in_large = 0;
@@ -125,10 +122,18 @@ class mergeStats {
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
pageid_t get_current_size() {
if(merge_level == 0) {
return base_size + bytes_in_small - bytes_in_large - bytes_out;
} else {
// s->bytes_out has strange semantics. It's how many bytes our input has written into this tree.
return base_size + bytes_out - bytes_in_large;
}
}
void handed_off_tree() {
if(merge_level == 2) {
} else {
mergeable_size = current_size;
mergeable_size = get_current_size();
just_handed_off = true;
}
}
@@ -164,7 +169,6 @@ class mergeStats {
pageid_t mergeable_size; // protected by mutex.
public:
pageid_t target_size;
pageid_t current_size;
protected:
pageid_t bytes_out; // How many bytes worth of tuples did we write?
public:

View file

@@ -103,7 +103,7 @@ void * merge_scheduler::memMergeThread() {
// needs to be past the rwlc_unlock...
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), &ltable_->merge_mgr->get_merge_stats(0)->current_size, ltable_->max_c0_size, &ltable_->flushing, 100, &ltable_->rb_mut);
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, &ltable_->flushing, 100, &ltable_->rb_mut);
//: do the merge
DEBUG("mmt:\tMerging:\n");
@@ -339,7 +339,7 @@ static int garbage_collect(logtable<datatuple> * ltable_, datatuple ** garbage,
} // close rbitr before touching the tree.
if(t2tmp) {
ltable_->get_tree_c0()->erase(garbage[i]);
ltable_->merge_mgr->get_merge_stats(0)->current_size -= garbage[i]->byte_length();
//ltable_->merge_mgr->get_merge_stats(0)->current_size -= garbage[i]->byte_length();
datatuple::freetuple(t2tmp);
}
datatuple::freetuple(garbage[i]);
@@ -423,6 +423,9 @@ void merge_iterators(int xid,
// cannot free any tuples here; they may still be read through a lookup
}
if(stats->merge_level == 1) {
// We consume tuples from c0 as we read them, so update its stats here.
ltable->merge_mgr->wrote_tuple(0, t2);
next_garbage = garbage_collect(ltable, garbage, garbage_len, next_garbage);
garbage[next_garbage] = t2;
next_garbage++;
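One subtlety in the last hunk: the explicit current_size decrement in garbage_collect() is gone, and C0's shrinkage is instead recorded when the merge consumes a tuple, via wrote_tuple(0, t2). Assuming wrote_tuple adds the tuple's byte length to C0's bytes_out counter (that call's body is not part of this diff), the level-0 branch of get_current_size() gives the same net effect, as the toy arithmetic below shows.

```cpp
// Illustrative arithmetic only. Assumes -- not shown in this diff -- that
// wrote_tuple(0, t) adds t's byte length to c0's bytes_out counter.
#include <cassert>
#include <cstdint>

int main() {
  // Level-0 branch of get_current_size(): base + bytes_in_small - bytes_in_large - bytes_out.
  int64_t base_size = 0, bytes_in_small = 1000, bytes_in_large = 0, bytes_out = 0;
  int64_t before = base_size + bytes_in_small - bytes_in_large - bytes_out;  // 1000 bytes live in C0

  bytes_out += 100;  // a 100-byte tuple merged out of C0 (what wrote_tuple(0, t2) records)
  int64_t after = base_size + bytes_in_small - bytes_in_large - bytes_out;

  assert(after == before - 100);  // same net effect as the old "current_size -= byte_length()"
  return 0;
}
```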