remove old comments

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1055 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-08-24 00:40:48 +00:00
parent 6fd7dd0523
commit 960ff041e2
5 changed files with 151 additions and 226 deletions

View file

@ -179,7 +179,9 @@ void logtable<TUPLE>::flushTable()
gettimeofday(&start_tv,0);
start = tv_to_double(start_tv);
merge_mgr->finished_merge(0);
#ifdef NO_SNOWSHOVEL
merge_mgr->finished_merge(0); // XXX will deadlock..
#endif
flushing = true;
bool blocked = false;

View file

@ -189,35 +189,10 @@ public:
batchedRevalidatingIterator( rbtree_t *s, int64_t* cur_size, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), cur_size_(cur_size), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret();
/* if(mut_) pthread_mutex_lock(mut_);
if(s_->begin() == s_->end()) {
next_ret_ = NULL;
} else {
next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_...
}
if(mut_) pthread_mutex_unlock(mut_); */
}
batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), cur_size_(0), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret(key);
/* if(mut_) pthread_mutex_lock(mut_);
if(key) {
if(s_->find(key) != s_->end()) {
next_ret_ = (*(s_->find(key)))->create_copy();
} else if(s_->upper_bound(key) != s_->end()) {
next_ret_ = (*(s_->upper_bound(key)))->create_copy();
} else {
next_ret_ = NULL;
}
} else {
if(s_->begin() == s_->end()) {
next_ret_ = NULL;
} else {
next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_...
}
}
// DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? (const char*)key->key() : "NULL");
if(mut_) pthread_mutex_unlock(mut_); */
}
~batchedRevalidatingIterator() {
@ -225,20 +200,9 @@ public:
TUPLE::freetuple(next_ret_[i]);
}
free(next_ret_);
// if(next_ret_) TUPLE::freetuple(next_ret_);
}
TUPLE* next_callerFrees() {
/* if(mut_) pthread_mutex_lock(mut_);
TUPLE * ret = next_ret_;
if(next_ret_) {
if(s_->upper_bound(next_ret_) == s_->end()) {
next_ret_ = 0;
} else {
next_ret_ = (*s_->upper_bound(next_ret_))->create_copy();
}
}
if(mut_) pthread_mutex_unlock(mut_); */
if(cur_off_ == num_batched_) { return NULL; } // the last thing we did is call populate_next_ret_(), which only leaves us in this state at the end of the iterator.
TUPLE * ret = next_ret_[cur_off_];
cur_off_++;

View file

@ -23,8 +23,6 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
}
mergeManager::~mergeManager() {
pthread_mutex_destroy(&throttle_mut);
pthread_cond_destroy(&throttle_wokeup_cond);
delete c0;
delete c1;
delete c2;
@ -51,10 +49,8 @@ void mergeManager::set_c0_size(int64_t size) {
c0->target_size = size;
}
void mergeManager::update_progress(mergeStats * s, int delta) {
s->delta += delta;
void mergeManager::sleep_on_mini_delta(mergeStats *s, int delta) {
s->mini_delta += delta;
{
if(/*s->merge_level < 2*/ s->merge_level == 1 && s->mergeable_size && delta) {
int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps);
@ -67,19 +63,31 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick);
double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta;
if(slp > 0.001) {
// struct timespec sleeptime;
// double_to_ts(&sleeptime, slp);
// nanosleep(&sleeptime, 0);
// printf("%d Sleep A %f\n", s->merge_level, slp);
struct timespec sleeptime;
double_to_ts(&sleeptime, slp);
nanosleep(&sleeptime, 0);
printf("%d Sleep A %f\n", s->merge_level, slp);
}
double_to_ts(&s->last_mini_tick, now_double);
s->mini_delta = 0;
}
}
}
void mergeManager::update_progress(mergeStats * s, int delta) {
s->delta += delta;
#if 0
#ifdef NO_SNOWSHOVEL
if(merge_level < 2 && delta) {
#else
if(merge_level == 1 && delta) {
#endif
sleep_on_mini_delta(s, delta);
}
#endif
if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) {
if(delta) {
rwlc_writelock(ltable->header_mut);
if(delta) {
s->delta = 0;
if(!s->need_tick) { s->need_tick = 1; }
}
@ -96,33 +104,23 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
} else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_effective_size);
if(s->in_progress > 0.95) { s->in_progress = 0.95; }
assert(s->in_progress > -0.01 && s->in_progress < 1.02);
// if(s->in_progress > 0.95) { s->in_progress = 0.95; }
// assert(s->in_progress > -0.01 && s->in_progress < 1.02);
} else {
s->in_progress = 0;
}
}
if(s->merge_level != 2) {
if(s->mergeable_size) {
s->out_progress = ((double)s->current_size + (double)s->base_size) / (double)s->target_size;
} else {
s->out_progress = 0.0;
}
}
#ifdef NO_SNOWSHOVEL
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
s->out_progress = ((double)s->current_size) / (double)s->target_size;
#else
if(delta) {
if(s->merge_level == 0) {
s->current_size = ltable->tree_bytes; // we need to track the number of bytes consumed by the merger; this data is not present in s, so fall back on ltable's aggregate.
} else {
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
}
s->out_progress = ((double)s->current_size) / (double)s->target_size;
}
#endif
s->out_progress = ((double)s->current_size) / (double)s->target_size;
struct timeval now;
gettimeofday(&now, 0);
double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
@ -140,11 +138,99 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
s->bytes_in_small_delta = 0;
if(delta) rwlc_unlock(ltable->header_mut);
rwlc_unlock(ltable->header_mut);
}
}
void mergeManager::tick_based_on_merge_progress(mergeStats *s) {
/* model the effect of linux + stasis' write caches; at the end
of this merge, we need to force up to FORCE_INTERVAL bytes
after we think we're done writing the next component. */
int64_t overshoot_fudge = (int64_t)((s->out_progress) * ((double)FORCE_INTERVAL));
/* model the effect of amortizing this computation: we could
become this much more overshot if we don't act now. */
int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA;
/* multiply by 2 for good measure. These are 'soft' walls, and
still let writes trickle through. Once we've exhausted the
fudge factors, we'll hit a hard wall, and stop writes
entirely, so it's better to start throttling too early than
too late. */
overshoot_fudge *= 2;
overshoot_fudge2 *= 4;
const double max_c0_sleep = 0.1;
const double min_c0_sleep = 0.01;
const double max_c1_sleep = 0.5;
const double min_c1_sleep = 0.1;
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
int spin = 0;
double total_sleep = 0.0;
while(1) {
int64_t overshoot = 0;
int64_t overshoot2 = 0;
int64_t raw_overshoot = 0;
double bps;
// This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep.
mergeStats * s1;
if(s->merge_level == 0) {
// dead code
s1 = c1;
} else {
s1 = c2;
}
if(s->mergeable_size) { // only apply backpressure if the next merger is not waiting for us
rwlc_readlock(ltable->header_mut);
if(s1->active && s->mergeable_size) {
raw_overshoot = (int64_t)(((double)s->target_size) * (s->out_progress - s1->in_progress));
bps = s1->bps;
}
rwlc_unlock(ltable->header_mut);
}
//#define PP_THREAD_INFO
#ifdef PP_THREAD_INFO
printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep);
#endif
bool one_threshold = (overshoot > 0 || overshoot2 > 0) || (raw_overshoot > 0);
bool two_threshold = (overshoot > 0 || overshoot2 > 0) && (raw_overshoot > 0);
if(one_threshold && (two_threshold || total_sleep < 0.01)) {
// throttle
// it took "elapsed" seconds to process "tick_length_bytes" mb
double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps;
if(sleeptime < min_sleep) { sleeptime = min_sleep; }
if(sleeptime > max_sleep) { sleeptime = max_sleep; }
spin ++;
total_sleep += sleeptime;
if((spin > 40) || (total_sleep > (max_sleep * 20.0))) {
printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress);
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep);
}
sleeping[s->merge_level] = true;
struct timespec ts;
double_to_ts(&ts, sleeptime);
nanosleep(&ts, 0);
// printf("%d Sleep B %f\n", s->merge_level, sleeptime);
sleeping[s->merge_level] = false;
update_progress(s1, 0);
} else if(overshoot > 0 || overshoot2 > 0) { // this is our second time around the loop, so remember we're in the hole
s->need_tick ++;
if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); }
} else { // all is well in the world. we can run worry-free for a while.
s->need_tick = 0;
}
break;
}
}
/**
* This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies
* backpressure as necessary.
@ -166,156 +252,38 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
*
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
*/
void mergeManager::tick(mergeStats * s, bool block, bool force) {
#define PRINT_SKIP 10000
if(force || s->need_tick) {
if(block) {
if(s->merge_level == 0) {
// pthread_mutex_lock(&ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
while(sleeping[s->merge_level]) {
abort();
rwlc_unlock(ltable->header_mut);
// pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
}
} else {
rwlc_readlock(ltable->header_mut);
while(sleeping[s->merge_level]) {
abort(); // if we're asleep, didn't this thread make us sleep???
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
}
}
void mergeManager::tick(mergeStats * s) {
if(s->need_tick) {
#ifdef NO_SNOWSHOVEL
bool snowshovel = false;
#else
bool snowshovel = true;
#endif
if((!snowshovel) || s->merge_level == 1) { // apply backpressure based on merge progress.
int64_t overshoot = 0;
int64_t overshoot2 = 0;
int64_t raw_overshoot = 0;
/* model the effect of linux + stasis' write caches; at the end
of this merge, we need to force up to FORCE_INTERVAL bytes
after we think we're done writing the next component. */
double skew = 0.0;
int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * ((double)FORCE_INTERVAL)/(1.0-skew));
/* model the effect of amortizing this computation: we could
become this much more overshot if we don't act now. */
int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA; //(int64_t)(((double)UPDATE_PROGRESS_PERIOD) * s->bps / 1000.0);
/* multiply by 2 for good measure. These are 'soft' walls, and
still let writes trickle through. Once we've exhausted the
fudge factors, we'll hit a hard wall, and stop writes
entirely, so it's better to start throttling too early than
too late. */
overshoot_fudge *= 2;
overshoot_fudge2 *= 4;
int spin = 0;
double total_sleep = 0.0;
do{
overshoot = 0;
overshoot2 = 0;
raw_overshoot = 0;
double bps;
// This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep.
if(s->merge_level == 0) {
abort();
if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; overshoot_fudge2 = 0; }
raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress));
overshoot = raw_overshoot + overshoot_fudge;
overshoot2 = raw_overshoot + overshoot_fudge2;
bps = c1->bps;
} else if (s->merge_level == 1) {
if(!(c2->active && c1->mergeable_size)) { overshoot_fudge = 0; overshoot_fudge2 = 0; }
raw_overshoot = (int64_t)(((double)c1->target_size) * (c1->out_progress - c2->in_progress));
overshoot = raw_overshoot + overshoot_fudge;
overshoot2 = raw_overshoot + overshoot_fudge2;
bps = c2->bps;
if(!c1->mergeable_size) { overshoot = 0; overshoot2 = 0; }
}
//#define PP_THREAD_INFO
#ifdef PP_THREAD_INFO
printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep);
#endif
bool one_threshold = (overshoot > 0 || overshoot2 > 0) || (raw_overshoot > 0);
bool two_threshold = (overshoot > 0 || overshoot2 > 0) && (raw_overshoot > 0);
if(one_threshold && (two_threshold || total_sleep < 0.01)) {
// throttle
// it took "elapsed" seconds to process "tick_length_bytes" mb
double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps;
double max_c0_sleep = 0.1;
double min_c0_sleep = 0.01;
double max_c1_sleep = 0.5;
double min_c1_sleep = 0.1;
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
if(sleeptime < min_sleep) { sleeptime = min_sleep; }
if(sleeptime > max_sleep) { sleeptime = max_sleep; }
spin ++;
total_sleep += sleeptime;
if((spin > 40) || (total_sleep > (max_sleep * 20.0))) {
printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress);
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep);
}
sleeping[s->merge_level] = true;
if(s->merge_level == 0) abort();
rwlc_unlock(ltable->header_mut);
struct timespec ts;
double_to_ts(&ts, sleeptime);
nanosleep(&ts, 0);
// printf("%d Sleep B %f\n", s->merge_level, sleeptime);
// rwlc_writelock(ltable->header_mut);
rwlc_readlock(ltable->header_mut);
sleeping[s->merge_level] = false;
pthread_cond_broadcast(&throttle_wokeup_cond);
if(s->merge_level == 0) { update_progress(c1, 0); }
if(s->merge_level == 1) { update_progress(c2, 0); }
} else {
if(overshoot > 0 || overshoot2 > 0) {
s->need_tick ++;
if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); }
} else {
s->need_tick = 0;
}
break;
}
} while(1);
tick_based_on_merge_progress(s);
} else if(s->merge_level == 0) {
while(/*s->current_size*/ltable->tree_bytes > ltable->max_c0_size) {
rwlc_unlock(ltable->header_mut);
// Simple backpressure algorithm based on how full C0 is.
// Is C0 bigger than is allowed?
while(ltable->tree_bytes > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number...
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
struct timespec ts;
double_to_ts(&ts, 0.1);
nanosleep(&ts, 0);
rwlc_readlock(ltable->header_mut);
}
double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.1111 // - (0.9 * (double)ltable->max_c0_size);
// Linear backpressure model
double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
delta -= 1.0;
if(delta > 0.00005) {
double slp = 0.001 + 10.0 * delta; //delta / (double)(ltable->max_c0_size);
double slp = 0.001 + 10.0 * delta; //0.0015 < slp < 1.112111..
DEBUG("\nsleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
struct timespec sleeptime;
double_to_ts(&sleeptime, slp);
rwlc_unlock(ltable->header_mut);
DEBUG("%d Sleep C %f\n", s->merge_level, slp);
nanosleep(&sleeptime, 0);
rwlc_readlock(ltable->header_mut);
}
}
rwlc_unlock(ltable->header_mut);
}
}
}
@ -326,7 +294,7 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
(s->bytes_in_small_delta) += tup->byte_length();
(s->bytes_in_small) += tup->byte_length();
update_progress(s, tup->byte_length());
tick(s, true);
tick(s);
}
}
void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * tup) {
@ -334,9 +302,7 @@ void mergeManager::read_tuple_from_large_component(int merge_level, datatuple *
mergeStats * s = get_merge_stats(merge_level);
s->num_tuples_in_large++;
s->bytes_in_large += tup->byte_length();
// tick(s, false); // would be no-op; we just reduced current_size.
update_progress(s, tup->byte_length());
tick(s,false);
}
}
@ -344,15 +310,10 @@ void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
mergeStats * s = get_merge_stats(merge_level);
(s->num_tuples_out)++;
(s->bytes_out) += tup->byte_length();
// XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex.
// update_progress(s, tup->byte_length());
// tick(s, false);
}
void mergeManager::finished_merge(int merge_level) {
update_progress(get_merge_stats(merge_level), 0);
tick(get_merge_stats(merge_level), false, true); // XXX what does this do???
get_merge_stats(merge_level)->active = false;
if(merge_level != 0) {
get_merge_stats(merge_level - 1)->mergeable_size = 0;
@ -392,8 +353,6 @@ mergeManager::mergeManager(logtable<datatuple> *ltable):
c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)),
c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),
c2(new mergeStats(2, 0)) {
pthread_mutex_init(&throttle_mut, 0);
pthread_cond_init(&throttle_wokeup_cond, 0);
struct timeval tv;
gettimeofday(&tv, 0);
sleeping[0] = false;

View file

@ -43,8 +43,10 @@ public:
void new_merge(int mergelevel);
void set_c0_size(int64_t size);
void sleep_on_mini_delta(mergeStats *s, int delta);
void update_progress(mergeStats *s, int delta);
void tick(mergeStats * s, bool block, bool force = false);
void tick(mergeStats * s);
void tick_based_on_merge_progress(mergeStats * s);
mergeStats* get_merge_stats(int mergeLevel);
void read_tuple_from_small_component(int merge_level, datatuple * tup);
void read_tuple_from_large_component(int merge_level, datatuple * tup);
@ -62,8 +64,6 @@ private:
mergeStats * c0;
mergeStats * c1;
mergeStats * c2;
pthread_mutex_t throttle_mut;
pthread_cond_t throttle_wokeup_cond;
bool sleeping[3];
bool still_running;
pthread_cond_t pp_cond;

View file

@ -227,8 +227,6 @@ void* memMergeThread(void*arg)
ltable->update_persistent_header(xid, 1);
Tcommit(xid);
ltable->merge_mgr->finished_merge(1);
//TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger
@ -282,6 +280,8 @@ void* memMergeThread(void*arg)
// 13
rwlc_unlock(ltable->header_mut);
ltable->merge_mgr->finished_merge(1);
// stats->pretty_print(stdout);
//TODO: get the freeing outside of the lock
@ -390,10 +390,10 @@ void *diskMergeThread(void*arg)
ltable->update_persistent_header(xid, 2);
Tcommit(xid);
ltable->merge_mgr->finished_merge(2);
rwlc_unlock(ltable->header_mut);
// stats->pretty_print(stdout);
ltable->merge_mgr->finished_merge(2);
}
return 0;