move update progress out of application threads, and into background thread

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@3288 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2011-11-04 20:59:22 +00:00
parent 8843807ab5
commit 297e715514
2 changed files with 50 additions and 15 deletions

View file

@ -32,6 +32,7 @@ mergeManager::~mergeManager() {
still_running = false;
pthread_cond_signal(&pp_cond);
pthread_join(pp_thread, 0);
pthread_join(update_progress_pthread, 0);
pthread_cond_destroy(&pp_cond);
delete c0;
delete c1;
@ -81,7 +82,9 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
}
} else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(ltable->mean_c0_run_length,(double)s->bytes_in_small));
// s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(ltable->mean_c0_run_length,(double)s->bytes_in_small));
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(c0->target_size,(double)s->bytes_in_small));
//s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (((double)s->base_size)+(double)s->bytes_in_small);
} else {
s->in_progress = 0;
}
@ -352,7 +355,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
* bytes_consumed_by_merger = sum(stats_bytes_in_small_delta)
*/
void mergeManager::tick(mergeStats * s) {
if(s->merge_level == 1) { // apply backpressure based on merge progress.
if(s && s->merge_level == 1) { // apply backpressure based on merge progress.
if(s->need_tick) {
s->need_tick = 0;
// Only apply back pressure if next thread is not waiting on us.
@ -377,10 +380,11 @@ void mergeManager::tick(mergeStats * s) {
rwlc_unlock(ltable->header_mut);
}
}
} else if(s->merge_level == 0) {
} else if((!s) || s->merge_level == 0) {
// Simple backpressure algorithm based on how full C0 is.
pageid_t cur_c0_sz;
if(s) {
// Is C0 bigger than is allowed?
while((cur_c0_sz = s->get_current_size()) > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number...
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
@ -390,11 +394,16 @@ void mergeManager::tick(mergeStats * s) {
}
// Linear backpressure model
s->out_progress = ((double)cur_c0_sz)/((double)ltable->max_c0_size);
double delta = ((double)cur_c0_sz)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
} else {
cur_c0_sz = c0->get_current_size();
}
double delta = ((double)cur_c0_sz)/(0.95*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
delta -= 1.0;
if(delta > 0.00005) {
double slp = 0.001 + 5.0 * delta; //0.0015 < slp < 1.112111..
if(!s) {
// printf("sleeping!\n");
}
DEBUG("\nmem sleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
struct timespec sleeptime;
double_to_ts(&sleeptime, slp);
@ -412,7 +421,9 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
(s->stats_bytes_in_small_delta) += tup->byte_length();
#endif
(s->bytes_in_small) += tup->byte_length();
if(merge_level != 0) {
update_progress(s, tup->byte_length());
}
tick(s);
}
}
@ -421,9 +432,11 @@ void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_co
mergeStats * s = get_merge_stats(merge_level);
s->num_tuples_in_large += tuple_count;
s->bytes_in_large += byte_len;
if(merge_level != 0) {
update_progress(s, byte_len);
}
}
}
void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
mergeStats * s = get_merge_stats(merge_level);
@ -455,7 +468,21 @@ void mergeManager::finished_merge(int merge_level) {
#endif
update_progress(get_merge_stats(merge_level), 0);
}
void * mergeManager::update_progress_thread() {
pthread_mutex_t dummy_mut;
pthread_mutex_init(&dummy_mut, 0);
while(still_running) {
struct timeval tv;
gettimeofday(&tv, 0);
struct timespec ts;
double_to_ts(&ts, tv_to_double(&tv)+0.1);
pthread_cond_timedwait(&update_progress_cond, &dummy_mut, &ts);
// printf("Calling update progress\n");
update_progress(c0,0);
}
return 0;
}
void * mergeManager::pretty_print_thread() {
pthread_mutex_t dummy_mut;
pthread_mutex_init(&dummy_mut, 0);
@ -479,6 +506,10 @@ void * merge_manager_pretty_print_thread(void * arg) {
mergeManager * m = (mergeManager*)arg;
return m->pretty_print_thread();
}
void * merge_manager_update_progress_thread(void * arg) {
mergeManager * m = (mergeManager*)arg;
return m->update_progress_thread();
}
double mergeManager::c1_c2_progress_delta() {
return c1_c2_delta;
@ -497,6 +528,7 @@ void mergeManager::init_helper(void) {
still_running = true;
pthread_cond_init(&pp_cond, 0);
pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this);
pthread_create(&update_progress_pthread, 0, merge_manager_update_progress_thread, (void*)this);
}
mergeManager::mergeManager(logtable *ltable):

View file

@ -60,6 +60,7 @@ public:
void wrote_tuple(int merge_level, datatuple * tup);
void pretty_print(FILE * out);
void *pretty_print_thread();
void *update_progress_thread();
private:
/**
@ -102,5 +103,7 @@ private:
bool still_running;
pthread_cond_t pp_cond;
pthread_t pp_thread;
pthread_cond_t update_progress_cond;
pthread_t update_progress_pthread;
};
#endif /* MERGEMANAGER_H_ */