more stats cleanups

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1493 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-12-14 01:49:23 +00:00
parent 43425128ba
commit 73e72b47b2
8 changed files with 52 additions and 43 deletions

View file

@ -903,10 +903,10 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k
}
}
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, double* cur_progress_delta, double target_progress_delta, bool * flushing) :
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, mergeManager * mgr, double target_progress_delta, bool * flushing) :
ro_alloc_(new RegionAllocator()),
tree_(tree ? tree->get_root_rec() : NULLRID),
cur_progress_delta_(cur_progress_delta),
mgr_(mgr),
target_progress_delta_(target_progress_delta),
flushing_(flushing)
{
@ -917,7 +917,7 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, do
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) :
ro_alloc_(new RegionAllocator()),
tree_(tree ? tree->get_root_rec() : NULLRID),
cur_progress_delta_(NULL),
mgr_(NULL),
target_progress_delta_(0.0),
flushing_(NULL)
{
@ -1006,12 +1006,14 @@ datatuple * diskTreeComponent::iterator::next_callerFrees()
// else readTuple is null. We're done.
}
if(readTuple && cur_progress_delta_) {
// *cur_progress_delta is how far ahead we are, as a fraction of the total merge.
while(*cur_progress_delta_ > target_progress_delta_ && ((!flushing_) || (! *flushing_))) { // TODO: how to pick this threshold?
if(readTuple && mgr_) {
// c1_c2_progress_delta() is c1's out progress - c2's in progress. We want to stop processing c2 if we are too far ahead (ie; c2 >> c1; delta << 0).
while(mgr_->c1_c2_progress_delta() < -target_progress_delta_ && ((!flushing_) || (! *flushing_))) { // TODO: how to pick this threshold?
DEBUG("Input is too far behind. Delta is %f\n", mgr_->c1_c2_progress_delta());
struct timespec ts;
mergeManager::double_to_ts(&ts, 0.1);
mergeManager::double_to_ts(&ts, 0.01);
nanosleep(&ts, 0);
mgr_->update_progress(mgr_->get_merge_stats(1), 0);
}
}

View file

@ -64,8 +64,8 @@ class diskTreeComponent {
void writes_done();
iterator * open_iterator(double* cur_size = NULL, double target_size = 0, bool * flushing = NULL) {
return new iterator(ltree, cur_size, target_size, flushing);
iterator * open_iterator(mergeManager * mgr = NULL, double target_size = 0, bool * flushing = NULL) {
return new iterator(ltree, mgr, target_size, flushing);
}
iterator * open_iterator(datatuple * key) {
if(key != NULL) {
@ -201,7 +201,7 @@ class diskTreeComponent {
{
public:
explicit iterator(diskTreeComponent::internalNodes *tree, double* cur_size = NULL, double target_size = 0, bool * flushing = NULL);
explicit iterator(diskTreeComponent::internalNodes *tree, mergeManager * mgr = NULL, double target_size = 0, bool * flushing = NULL);
explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key);
@ -219,7 +219,7 @@ class diskTreeComponent {
RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans.
recordid tree_; //root of the tree
double * cur_progress_delta_;
mergeManager * mgr_;
double target_progress_delta_;
bool * flushing_;

View file

@ -23,7 +23,7 @@ template<class TUPLE>
logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
{
this->max_c0_size = max_c0_size;
this->mean_c0_effective_size = max_c0_size;
this->mean_c0_run_length = max_c0_size;
this->num_c0_mergers = 0;
r_val = 3.0; // MIN_R
@ -42,7 +42,6 @@ logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p
tmerger = new tuplemerger(&replace_merger);
header_mut = rwlc_initlock();
pthread_mutex_init(&tick_mut, 0);
pthread_mutex_init(&rb_mut, 0);
pthread_cond_init(&c0_needed, 0);
pthread_cond_init(&c0_ready, 0);
@ -75,7 +74,6 @@ logtable<TUPLE>::~logtable()
}
pthread_mutex_destroy(&rb_mut);
pthread_mutex_destroy(&tick_mut);
rwlc_deletelock(header_mut);
pthread_cond_destroy(&c0_needed);
pthread_cond_destroy(&c0_ready);

View file

@ -112,7 +112,7 @@ public:
pthread_mutex_t rb_mut;
int64_t max_c0_size;
// these track the effectiveness of snowshoveling
int64_t mean_c0_effective_size;
int64_t mean_c0_run_length;
int64_t num_c0_mergers;
mergeManager * merge_mgr;

View file

@ -42,7 +42,7 @@ void mergeManager::new_merge(int mergeLevel) {
// target_size was set during startup
} else if(s->merge_level == 1) {
assert(c0->target_size);
c1->target_size = (pageid_t)(*ltable->R() * (double)c0->target_size);
c1->target_size = (pageid_t)(*ltable->R() * (double)ltable->mean_c0_run_length);
assert(c1->target_size);
} else if(s->merge_level == 2) {
// target_size is infinity...
@ -62,8 +62,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
s->delta = 0;
if(!s->need_tick) { s->need_tick = 1; }
}
if(s->merge_level == 2
) {
if(s->merge_level == 2) {
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size);
} else {
@ -71,7 +70,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
}
} else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_effective_size);
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_run_length);
} else {
s->in_progress = 0;
}
@ -83,7 +82,12 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
}
s->out_progress = ((double)s->current_size) / (double)s->target_size;
s->out_progress = ((double)s->current_size) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size);
if(c2->active && c1->mergeable_size) {
c1_c2_delta = c1->out_progress - c2->in_progress;
} else {
c1_c2_delta = -0.02; // We try to keep this number between -0.05 and -0.01.
}
#if EXTENDED_STATS
struct timeval now;
@ -136,16 +140,20 @@ void mergeManager::tick(mergeStats * s) {
// Only apply back pressure if next thread is not waiting on us.
rwlc_readlock(ltable->header_mut);
if(c1->mergeable_size && c2->active) {
double delta = c1->out_progress - c2->in_progress;
rwlc_unlock(ltable->header_mut);
if(delta > -0.01) {
if(c1_c2_delta > -0.01) {
DEBUG("Input is too far ahead. Delta is %f\n", c1_c2_delta);
double delta = c1_c2_delta;
rwlc_unlock(ltable->header_mut);
delta += 0.01; // delta > 0;
double slp = 0.001 + delta;
struct timespec sleeptime;
DEBUG("\ndisk sleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
double_to_ts(&sleeptime,slp);
nanosleep(&sleeptime, 0);
update_progress(s, 0);
s->need_tick = 1;
} else {
rwlc_unlock(ltable->header_mut);
}
} else {
rwlc_unlock(ltable->header_mut);
@ -246,13 +254,14 @@ void * merge_manager_pretty_print_thread(void * arg) {
return m->pretty_print_thread();
}
double mergeManager::c1_c2_progress_delta() {
return c1_c2_delta;
}
void mergeManager::init_helper(void) {
struct timeval tv;
c1_c2_delta = -0.02; // XXX move this magic number somewhere. It's also in update_progress.
gettimeofday(&tv, 0);
sleeping[0] = false;
sleeping[1] = false;
sleeping[2] = false;
cur_c1_c2_progress_delta = c2->in_progress - c1->out_progress;
#if EXTENDED_STATS
double_to_ts(&c0->stats_last_tick, tv_to_double(&tv));

View file

@ -46,6 +46,8 @@ public:
void new_merge(int mergelevel);
void set_c0_size(int64_t size);
void update_progress(mergeStats *s, int delta);
double c1_c2_progress_delta();
void tick(mergeStats * s);
mergeStats* get_merge_stats(int mergeLevel);
void read_tuple_from_small_component(int merge_level, datatuple * tup);
@ -60,8 +62,8 @@ public:
void pretty_print(FILE * out);
void *pretty_print_thread();
double cur_c1_c2_progress_delta;
private:
double c1_c2_delta;
void init_helper(void);
struct marshalled_header {
recordid c0;
@ -69,14 +71,12 @@ private:
recordid c2;
};
logtable<datatuple>* ltable;
double throttle_seconds;
double last_throttle_seconds;
mergeStats * c0;
mergeStats * c1;
mergeStats * c2;
bool sleeping[3];
// The following fields are used to shut down the pretty print thread.
bool still_running;
// Needed so that the pretty print thread can be woken up during shutdown.
pthread_cond_t pp_cond;
pthread_t pp_thread;
};

View file

@ -143,7 +143,6 @@ class mergeStats {
pageid_t output_size() {
return bytes_out;
}
int merge_level; // 1 => C0->C1, 2 => C1->C2
protected:
double float_tv(struct timeval& tv) {
@ -151,14 +150,15 @@ class mergeStats {
}
friend class mergeManager;
protected: // XXX only accessed during initialization.
protected:
struct marshalled_header {
int merge_level;
pageid_t base_size;
pageid_t mergeable_size;
pageid_t target_size; // Needed?
};
public:
public: // XXX eliminate protected fields.
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size)
protected:
pageid_t mergeable_size; // protected by mutex.

View file

@ -95,7 +95,7 @@ void * merge_scheduler::memMergeThread() {
const int64_t min_bloom_target = ltable_->max_c0_size;
//create a new tree
diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100);
diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100);
ltable_->set_tree_c1_prime(c1_prime);
@ -149,17 +149,17 @@ void * merge_scheduler::memMergeThread() {
// update c0 effective size.
double frac = 1.0/(double)merge_count;
ltable_->num_c0_mergers = merge_count;
ltable_->mean_c0_effective_size =
ltable_->mean_c0_run_length=
(int64_t) (
((double)ltable_->mean_c0_effective_size)*(1-frac) +
((double)ltable_->mean_c0_run_length)*(1-frac) +
((double)stats->bytes_in_small*frac));
ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size;
//ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_run_length;
}
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size);
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size);
assert(*ltable_->R() >= MIN_R);
bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > *ltable_->R());
bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R());
DEBUG("\nc1 size %f R %f\n", new_c1_size, *ltable_->R());
if( signal_c2 )
{
@ -252,7 +252,7 @@ void * merge_scheduler::diskMergeThread()
// 4: do the merge.
//create the iterators
diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator();
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(&ltable_->merge_mgr->cur_c1_c2_progress_delta, 0.05, &ltable_->shutting_down_);
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(ltable_->merge_mgr, 0.05, &ltable_->shutting_down_);
//create a new tree
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
@ -289,7 +289,7 @@ void * merge_scheduler::diskMergeThread()
merge_count++;
//update the current optimal R value
*(ltable_->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable_->mean_c0_effective_size) ) );
*(ltable_->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable_->mean_c0_run_length) ) );
DEBUG("\nR = %f\n", *(ltable_->R()));