diff --git a/mergeManager.cpp b/mergeManager.cpp index e82fd57..0500657 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -32,6 +32,7 @@ mergeManager::~mergeManager() { still_running = false; pthread_cond_signal(&pp_cond); pthread_join(pp_thread, 0); + pthread_join(update_progress_pthread, 0); pthread_cond_destroy(&pp_cond); delete c0; delete c1; @@ -81,7 +82,9 @@ void mergeManager::update_progress(mergeStats * s, int delta) { } } else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...) if(s->active) { - s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(ltable->mean_c0_run_length,(double)s->bytes_in_small)); + // s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(ltable->mean_c0_run_length,(double)s->bytes_in_small)); + s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(c0->target_size,(double)s->bytes_in_small)); + //s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (((double)s->base_size)+(double)s->bytes_in_small); } else { s->in_progress = 0; } @@ -352,7 +355,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) { * bytes_consumed_by_merger = sum(stats_bytes_in_small_delta) */ void mergeManager::tick(mergeStats * s) { - if(s->merge_level == 1) { // apply backpressure based on merge progress. + if(s && s->merge_level == 1) { // apply backpressure based on merge progress. if(s->need_tick) { s->need_tick = 0; // Only apply back pressure if next thread is not waiting on us. @@ -377,24 +380,30 @@ void mergeManager::tick(mergeStats * s) { rwlc_unlock(ltable->header_mut); } } - } else if(s->merge_level == 0) { + } else if((!s) || s->merge_level == 0) { // Simple backpressure algorithm based on how full C0 is. pageid_t cur_c0_sz; - // Is C0 bigger than is allowed? - while((cur_c0_sz = s->get_current_size()) > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number... - printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n"); - struct timespec ts; - double_to_ts(&ts, 0.1); - nanosleep(&ts, 0); + if(s) { + // Is C0 bigger than is allowed? + while((cur_c0_sz = s->get_current_size()) > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number... + printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n"); + struct timespec ts; + double_to_ts(&ts, 0.1); + nanosleep(&ts, 0); + } + // Linear backpressure model + s->out_progress = ((double)cur_c0_sz)/((double)ltable->max_c0_size); + } else { + cur_c0_sz = c0->get_current_size(); } - // Linear backpressure model - s->out_progress = ((double)cur_c0_sz)/((double)ltable->max_c0_size); - double delta = ((double)cur_c0_sz)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111... + double delta = ((double)cur_c0_sz)/(0.95*(double)ltable->max_c0_size); // 0 <= delta <= 1.111... delta -= 1.0; if(delta > 0.00005) { double slp = 0.001 + 5.0 * delta; //0.0015 < slp < 1.112111.. - + if(!s) { + // printf("sleeping!\n"); + } DEBUG("\nmem sleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0)); struct timespec sleeptime; double_to_ts(&sleeptime, slp); @@ -412,7 +421,9 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * (s->stats_bytes_in_small_delta) += tup->byte_length(); #endif (s->bytes_in_small) += tup->byte_length(); - update_progress(s, tup->byte_length()); + if(merge_level != 0) { + update_progress(s, tup->byte_length()); + } tick(s); } } @@ -421,7 +432,9 @@ void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_co mergeStats * s = get_merge_stats(merge_level); s->num_tuples_in_large += tuple_count; s->bytes_in_large += byte_len; - update_progress(s, byte_len); + if(merge_level != 0) { + update_progress(s, byte_len); + } } } @@ -455,7 +468,21 @@ void mergeManager::finished_merge(int merge_level) { #endif update_progress(get_merge_stats(merge_level), 0); } +void * mergeManager::update_progress_thread() { + pthread_mutex_t dummy_mut; + pthread_mutex_init(&dummy_mut, 0); + while(still_running) { + struct timeval tv; + gettimeofday(&tv, 0); + struct timespec ts; + double_to_ts(&ts, tv_to_double(&tv)+0.1); + pthread_cond_timedwait(&update_progress_cond, &dummy_mut, &ts); + // printf("Calling update progress\n"); + update_progress(c0,0); + } + return 0; +} void * mergeManager::pretty_print_thread() { pthread_mutex_t dummy_mut; pthread_mutex_init(&dummy_mut, 0); @@ -479,6 +506,10 @@ void * merge_manager_pretty_print_thread(void * arg) { mergeManager * m = (mergeManager*)arg; return m->pretty_print_thread(); } +void * merge_manager_update_progress_thread(void * arg) { + mergeManager * m = (mergeManager*)arg; + return m->update_progress_thread(); +} double mergeManager::c1_c2_progress_delta() { return c1_c2_delta; @@ -497,6 +528,7 @@ void mergeManager::init_helper(void) { still_running = true; pthread_cond_init(&pp_cond, 0); pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this); + pthread_create(&update_progress_pthread, 0, merge_manager_update_progress_thread, (void*)this); } mergeManager::mergeManager(logtable *ltable): diff --git a/mergeManager.h b/mergeManager.h index 264ac9f..e43b0e0 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -60,6 +60,7 @@ public: void wrote_tuple(int merge_level, datatuple * tup); void pretty_print(FILE * out); void *pretty_print_thread(); + void *update_progress_thread(); private: /** @@ -102,5 +103,7 @@ private: bool still_running; pthread_cond_t pp_cond; pthread_t pp_thread; + pthread_cond_t update_progress_cond; + pthread_t update_progress_pthread; }; #endif /* MERGEMANAGER_H_ */