diff --git a/logstore.cpp b/logstore.cpp index 4e55f02..f41cc98 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -50,7 +50,7 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio this->datapage_region_size = datapage_region_size; this->datapage_size = datapage_size; - c0_stats = merge_mgr->newMergeStats(0); + c0_stats = merge_mgr->get_merge_stats(0); c0_stats->new_merge(); c0_stats->starting_merge(); } @@ -143,13 +143,11 @@ void logtable::flushTable() gettimeofday(&start_tv,0); start = tv_to_double(start_tv); - c0_stats->handed_off_tree(); - c0_stats->finished_merge(); - c0_stats->new_merge(); pthread_mutex_lock(&header_mut); int expmcount = merge_count; + c0_stats->finished_merge(); //this is for waiting the previous merger of the mem-tree //hopefullly this wont happen @@ -165,12 +163,16 @@ void logtable::flushTable() } } + c0_stats->handed_off_tree(); + c0_stats->new_merge(); + gettimeofday(&stop_tv,0); stop = tv_to_double(stop_tv); set_tree_c0_mergeable(get_tree_c0()); pthread_cond_signal(&c0_ready); + DEBUG("Signaled c0-c1 merge thread\n"); merge_count ++; set_tree_c0(new memTreeComponent::rbtree_t); @@ -181,7 +183,7 @@ void logtable::flushTable() pthread_mutex_unlock(&header_mut); - if(blocked) { + if(blocked && stop - start > 0.1) { if(first) { printf("\nBlocked writes for %f sec\n", stop-start); @@ -243,7 +245,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size } } - //TODO: Arange to only hold read latches while hitting disk. + //TODO: Arrange to only hold read latches while hitting disk. 
//step 3: check c1 if(!done) @@ -416,10 +418,11 @@ template void logtable::insertTuple(datatuple *tuple) { //lock the red-black tree - c0_stats->read_tuple_from_small_component(tuple); pthread_mutex_lock(&header_mut); + c0_stats->read_tuple_from_small_component(tuple); //find the previous tuple with same key in the memtree if exists memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); + datatuple * t = 0; if(rbitr != tree_c0->end()) { datatuple *pre_t = *rbitr; @@ -427,7 +430,7 @@ void logtable::insertTuple(datatuple *tuple) c0_stats->read_tuple_from_large_component(pre_t); datatuple *new_t = tmerger->merge(pre_t, tuple); c0_stats->merged_tuples(new_t, tuple, pre_t); - c0_stats->wrote_tuple(new_t); + t = new_t; tree_c0->erase(pre_t); //remove the previous tuple tree_c0->insert(new_t); //insert the new tuple @@ -440,16 +443,18 @@ void logtable::insertTuple(datatuple *tuple) else //no tuple with same key exists in mem-tree { - datatuple *t = tuple->create_copy(); + t = tuple->create_copy(); //insert tuple into the rbtree tree_c0->insert(t); - c0_stats->wrote_tuple(t); + tsize++; tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD; } + c0_stats->wrote_tuple(t); + //flushing logic if(tree_bytes >= max_c0_size ) { diff --git a/logstore.h b/logstore.h index c690af3..954756f 100644 --- a/logstore.h +++ b/logstore.h @@ -78,7 +78,7 @@ public: void set_max_c0_size(int64_t max_c0_size) { this->max_c0_size = max_c0_size; merge_mgr->set_c0_size(max_c0_size); - + merge_mgr->get_merge_stats(1); } void set_tree_c0_mergeable(memTreeComponent::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } void update_persistent_header(int xid); diff --git a/mergeManager.cpp b/mergeManager.cpp index 7a2e3d5..5933fcb 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -9,7 +9,7 @@ #include "mergeStats.h" #include "logstore.h" #include "math.h" -mergeStats* mergeManager:: newMergeStats(int mergeLevel) { +mergeStats* mergeManager:: get_merge_stats(int mergeLevel) 
{ if (mergeLevel == 0) { return c0; } else if (mergeLevel == 1) { @@ -35,16 +35,19 @@ void mergeManager::new_merge(mergeStats * s) { pthread_mutex_lock(&mut); if(s->merge_count) { if(s->merge_level == 0) { - // queueSize was set during startup + // target_size was set during startup } else if(s->merge_level == 1) { - c1_queueSize = (pageid_t)(*ltable->R() * (double)c0_queueSize); //c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out; + assert(c0->target_size); + c1->target_size = (pageid_t)(*ltable->R() * (double)c0->target_size); //c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out; + assert(c1->target_size); } else if(s->merge_level == 2) { + // target_size is infinity... } else { abort(); } } pthread_mutex_unlock(&mut); } void mergeManager::set_c0_size(int64_t size) { - c0_queueSize = size; + c0->target_size = size; } /** @@ -68,78 +71,100 @@ void mergeManager::set_c0_size(int64_t size) { * * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ -void mergeManager::tick(mergeStats * s, bool done) { - pageid_t tick_length_bytes = 1024*1024; - if(done || (s->bytes_in_small_delta > tick_length_bytes)) { - pthread_mutex_lock(&mut); - struct timeval now; - gettimeofday(&now, 0); - double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); - double bps = (double)s->bytes_in_small_delta / (double)elapsed_delta; +void mergeManager::tick(mergeStats * s, bool block) { + pageid_t tick_length_bytes = 10*1024; - pageid_t current_size = s->bytes_out - s->bytes_in_large; + if(true || s->bytes_in_small_delta > tick_length_bytes) { - int64_t overshoot; - int64_t overshoot_fudge = (int64_t)((double)c0_queueSize * 0.1); - do{ - overshoot = 0; - if(s->merge_level == 0) { - if(done) { - // c0->bytes_in_small = 0; - } else if(c0->mergeable_size) { - overshoot = (overshoot_fudge + c0->mergeable_size - c1->bytes_in_small) // amount left to process - - (c0_queueSize - current_size); // - room for more insertions + s->current_size = s->base_size + 
s->bytes_out - s->bytes_in_large; + + if(block) { +// pthread_mutex_lock(&mut); + struct timeval now; + gettimeofday(&now, 0); + double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); + double bps = 0; // = (double)s->bytes_in_small_delta / (double)elapsed_delta; + + s->lifetime_elapsed += elapsed_delta; + s->lifetime_consumed += s->bytes_in_small_delta; + double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...) + s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta; + s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta; + + double_to_ts(&s->last_tick, tv_to_double(&now)); + + s->bytes_in_small_delta = 0; + + int64_t overshoot = 0; + int64_t overshoot_fudge = 1024*1024; // XXX set based on avg / max tuple size? + int spin = 0; + double total_sleep = 0.0; + do{ + + double c0_c1_progress = ((double)(c1->bytes_in_large + c1->bytes_in_small)) / (double)(c0->mergeable_size + c1->base_size); + double c1_c2_progress = ((double)(c2->bytes_in_large + c2->bytes_in_small)) / (double)(c1->mergeable_size + c2->base_size); + + double c0_c1_bps = c1->window_consumed / c1->window_elapsed; + double c1_c2_bps = c2->window_consumed / c2->window_elapsed; + + if(s->merge_level == 0) { + pageid_t c0_c1_bytes_remaining = (pageid_t)((1.0-c0_c1_progress) * (double)c0->mergeable_size); + pageid_t c0_bytes_left = c0->target_size - c0->current_size; + overshoot = overshoot_fudge + c0_c1_bytes_remaining - c0_bytes_left; + bps = c0_c1_bps; + if(!c0->mergeable_size) { overshoot = -1; } + if(c0->mergeable_size && ! 
c1->active) { overshoot = c0->current_size + overshoot_fudge; } + } else if (s->merge_level == 1) { + pageid_t c1_c2_bytes_remaining = (pageid_t)((1.0-c1_c2_progress) * (double)c1->mergeable_size); + pageid_t c1_bytes_left = c1->target_size - c1->current_size; + overshoot = overshoot_fudge + c1_c2_bytes_remaining - c1_bytes_left; + if(!c1->mergeable_size) { overshoot = -1; } + if(c1->mergeable_size && ! c2->active) { overshoot = c1->current_size + overshoot_fudge; } + bps = c1_c2_bps; } - } else if (s->merge_level == 1) { - if(done) { - c0->mergeable_size = 0; - c1->bytes_in_small = 0; - } else if(/*c1_queueSize && */c1->mergeable_size) { - overshoot = (c1->mergeable_size - c2->bytes_in_small) - - (c1_queueSize - current_size); - } - } else if (s->merge_level == 2) { - if(done) { - c1->mergeable_size = 0; - c2->bytes_in_small = 0; - } - // Never throttle this merge. - } - static int num_skipped = 0; - if(num_skipped == 10) { - printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, current_size); - pretty_print(stdout); - num_skipped = 0; - } - num_skipped ++; - if(overshoot > 0) { - // throttle - // it took "elapsed" seconds to process "tick_length_bytes" mb - double sleeptime = (double)overshoot / bps; // 2 is a fudge factor - struct timespec sleep_until; - if(sleeptime > 1) { sleeptime = 1; } - double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); -// printf("\nMerge thread %d Overshoot: %lld Throttle %6f\n", s->merge_level, overshoot, sleeptime); -// pthread_mutex_lock(&dummy_throttle_mut); - pthread_cond_timedwait(&dummy_throttle_cond, &mut, &sleep_until); -// pthread_mutex_unlock(&dummy_throttle_mut); - gettimeofday(&now, 0); - } - } while(overshoot > 0); - memcpy(&s->last_tick, &now, sizeof(now)); + //#define PP_THREAD_INFO + #ifdef PP_THREAD_INFO + printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, s->current_size); + #endif + 
pretty_print(stdout); + if(overshoot > 0) { + // throttle + // it took "elapsed" seconds to process "tick_length_bytes" mb + double sleeptime = 2.0 * (double)overshoot / bps; - s->bytes_in_small_delta = 0; -// pretty_print(stdout); - pthread_mutex_unlock(&mut); + + struct timespec sleep_until; + + double max_c0_sleep = 0.1; + double max_c1_sleep = 0.1; + double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; + if(sleeptime < 0.01) { sleeptime = 0.01; } + if(sleeptime > max_sleep) { sleeptime = max_sleep; } + + spin ++; + total_sleep += sleeptime; + + if((spin > 20) || (total_sleep > (max_sleep * 10))) { + if(bps > 1) { + printf("\nMerge thread %d Overshoot: %lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, overshoot, sleeptime, spin, total_sleep); + } + } + + double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); + pthread_cond_timedwait(&dummy_throttle_cond, &ltable->header_mut, &sleep_until); + gettimeofday(&now, 0); + } + } while(overshoot > overshoot_fudge); + } else { + pretty_print(stdout); + } +// pthread_mutex_unlock(&mut); } } mergeManager::mergeManager(logtable *ltable): ltable(ltable), - c0_queueSize(0), - c1_queueSize(0), -// c2_queueSize(0), c0(new mergeStats(this, 0)), c1(new mergeStats(this, 1)), c2(new mergeStats(this, 2)) { @@ -163,26 +188,44 @@ void mergeManager::pretty_print(FILE * out) { bool have_c1m = false; bool have_c2 = false; if(lt) { - pthread_mutex_lock(&lt->header_mut); +// pthread_mutex_lock(&lt->header_mut); have_c0 = NULL != lt->get_tree_c0(); have_c0m = NULL != lt->get_tree_c0_mergeable(); have_c1 = NULL != lt->get_tree_c1(); have_c1m = NULL != lt->get_tree_c1_mergeable() ; have_c2 = NULL != lt->get_tree_c2(); - pthread_mutex_unlock(&lt->header_mut); +// pthread_mutex_unlock(&lt->header_mut); } - fprintf(out,"[%s] %s %s [%s] %s %s [%s] %s ", - c0->active ? 
"RUN" : "---", + + double c0_c1_progress = 100.0 * (c1->bytes_in_large + c1->bytes_in_small) / (c0->mergeable_size + c1->base_size); // c1->bytes_in_small / c0->mergeable_size; + double c1_c2_progress = 100.0 * (c2->bytes_in_large + c2->bytes_in_small) / (c1->mergeable_size + c2->base_size); //c2->bytes_in_small / c1->mergeable_size; + + assert((!c1->active) || (c0_c1_progress >= -1 && c0_c1_progress < 102)); + assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102)); + + fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ", + c0->active ? "RUN" : "---", (uint64_t)(c0->lifetime_consumed / mb), c0->lifetime_elapsed, c0->window_consumed/(((double)mb)*c0->window_elapsed), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed), have_c0 ? "C0" : "..", have_c0m ? "C0'" : "...", - c1->active ? "RUN" : "---", + c1->active ? "RUN" : "---", c0_c1_progress, c1->window_consumed/(((double)mb)*c1->window_elapsed), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed), have_c1 ? "C1" : "..", have_c1m ? "C1'" : "...", - c2->active ? "RUN" : "---", + c2->active ? "RUN" : "---", c1_c2_progress, c2->window_consumed/(((double)mb)*c2->window_elapsed), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed), have_c2 ? "C2" : ".."); - fprintf(out, "[size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld ", c0_queueSize/mb, c0->bytes_in_small/mb, c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb); - fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld ", c1_queueSize/mb, c1->bytes_in_small/mb, c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb); - fprintf(out, "C2 .... 
%4lld %4lld %4lld %4lld ", c2->bytes_in_small/mb, c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb); +#define PP_SIZES + #ifdef PP_SIZES + fprintf(out, "[size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld ", + c0->target_size/mb, c0->current_size/mb, c0->bytes_in_small/mb, + c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb); + + fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld %4lld ", + c1->target_size/mb, c1->current_size/mb, c1->bytes_in_small/mb, + c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb); + + fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld ", + /*----*/ c2->current_size/mb, c2->bytes_in_small/mb, + c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb); +#endif // fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds))); // fprintf(out, "C0 size %4lld resident %4lld ", // 2*c0_queueSize/mb, diff --git a/mergeManager.h b/mergeManager.h index 5445c39..1664379 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -21,7 +21,7 @@ class logtable; class mergeStats; class mergeManager { -private: +public: double tv_to_double(struct timeval * tv) { return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; } @@ -35,26 +35,23 @@ private: uint64_t long_tv(struct timeval& tv) { return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec); } -public: mergeManager(logtable *ltable); ~mergeManager(); void new_merge(mergeStats * s); void set_c0_size(int64_t size); - void tick(mergeStats * s, bool done = false); + void tick(mergeStats * s, bool block); void pretty_print(FILE * out); - mergeStats* newMergeStats(int mergeLevel); + mergeStats* get_merge_stats(int mergeLevel); private: pthread_mutex_t mut; logtable* ltable; - pageid_t c0_queueSize; - pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? 
( = current target c1 size) double throttle_seconds; - double elapsed_seconds; +// double elapsed_seconds; double last_throttle_seconds; - double last_elapsed_seconds; +// double last_elapsed_seconds; mergeStats * c0; mergeStats * c1; mergeStats * c2; diff --git a/mergeStats.h b/mergeStats.h index 104fa04..fe60f98 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -24,6 +24,9 @@ class mergeStats { merge_mgr(merge_mgr), merge_level(merge_level), merge_count(0), + base_size(0), + target_size(0), + current_size(0), mergeable_size(0), bytes_out_with_overhead(0), bytes_out(0), @@ -34,21 +37,34 @@ class mergeStats { num_tuples_in_small(0), bytes_in_large(0), num_tuples_in_large(0), + just_handed_off(false), + lifetime_elapsed(0), + lifetime_consumed(0), + window_elapsed(0.001), + window_consumed(0), active(false) { gettimeofday(&sleep,0); gettimeofday(&last,0); + merge_mgr->double_to_ts(&last_tick, merge_mgr->tv_to_double(&last)); } void new_merge() { merge_mgr->new_merge(this); + + if(just_handed_off) { + bytes_out = 0; + just_handed_off = false; + } + base_size = bytes_out; + current_size = base_size; merge_count++; - // bytes_out_with_overhead = 0; - // bytes_out = 0; + bytes_out_with_overhead = 0; + bytes_out = 0; num_tuples_out = 0; num_datapages_out = 0; bytes_in_small = 0; bytes_in_small_delta = 0; num_tuples_in_small = 0; - // bytes_in_large = 0; + bytes_in_large = 0; num_tuples_in_large = 0; gettimeofday(&sleep,0); } @@ -56,15 +72,23 @@ class mergeStats { active = true; gettimeofday(&start, 0); gettimeofday(&last, 0); + merge_mgr->double_to_ts(&last_tick, merge_mgr->tv_to_double(&last)); + } void handed_off_tree() { - mergeable_size = bytes_out - bytes_in_large; - bytes_out = 0; - bytes_in_large = 0; + if(merge_level == 2) { + } else { + mergeable_size = current_size; + just_handed_off = true; + } } void finished_merge() { active = false; - merge_mgr->tick(this, true); + if(merge_level == 1) { + merge_mgr->get_merge_stats(0)->mergeable_size = 0; + } else 
if(merge_level == 2) { + merge_mgr->get_merge_stats(1)->mergeable_size = 0; + } gettimeofday(&done, 0); } void read_tuple_from_large_component(datatuple * tup) { @@ -78,7 +102,7 @@ class mergeStats { num_tuples_in_small++; bytes_in_small_delta += tup->byte_length(); bytes_in_small += tup->byte_length(); - merge_mgr->tick(this); + merge_mgr->tick(this, true); } } void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) { @@ -86,6 +110,7 @@ class mergeStats { void wrote_tuple(datatuple * tup) { num_tuples_out++; bytes_out += tup->byte_length(); + merge_mgr->tick(this, false); } void wrote_datapage(DataPage *dp) { num_datapages_out++; @@ -110,10 +135,13 @@ class mergeStats { protected: struct timespec last_tick; + pageid_t base_size; + pageid_t target_size; + pageid_t current_size; pageid_t mergeable_size; pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? - pageid_t bytes_out; // How many bytes worth of tuples did we write? + pageid_t bytes_out; // How many bytes worth of tuples did we write? pageid_t num_tuples_out; // How many tuples did we write? pageid_t num_datapages_out; // How many datapages? pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? @@ -121,6 +149,14 @@ protected: pageid_t num_tuples_in_small; // Tuples from the small input? pageid_t bytes_in_large; // Bytes from the large input? pageid_t num_tuples_in_large; // Tuples from large input? 
+ + bool just_handed_off; + + double lifetime_elapsed; + double lifetime_consumed; + double window_elapsed; + double window_consumed; + bool active; public: diff --git a/merger.cpp b/merger.cpp index b764013..9b5d02a 100644 --- a/merger.cpp +++ b/merger.cpp @@ -141,12 +141,12 @@ void* memMergeThread(void*arg) assert(ltable->get_tree_c1()); int merge_count =0; - mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1); + mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(1); while(true) // 1 { - stats->new_merge(); pthread_mutex_lock(&ltable->header_mut); + stats->new_merge(); int done = 0; // 2: wait for c0_mergable while(!ltable->get_tree_c0_mergeable()) @@ -225,12 +225,13 @@ void* memMergeThread(void*arg) // 11: c0_mergeable = NULL ltable->set_tree_c0_mergeable(NULL); double new_c1_size = stats->output_size(); - stats->handed_off_tree(); pthread_cond_signal(&ltable->c0_needed); ltable->update_persistent_header(xid); Tcommit(xid); + stats->finished_merge(); + //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger double target_R = *ltable->R(); @@ -254,6 +255,7 @@ void* memMergeThread(void*arg) // 7: and perhaps c1_mergeable ltable->set_tree_c1_mergeable(c1_prime); // c1_prime == c1. + stats->handed_off_tree(); // 8: c1 = new empty. 
ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats)); @@ -269,8 +271,6 @@ void* memMergeThread(void*arg) // 13 pthread_mutex_unlock(&ltable->header_mut); - - stats->finished_merge(); // stats->pretty_print(stdout); //TODO: get the freeing outside of the lock @@ -292,14 +292,14 @@ void *diskMergeThread(void*arg) int merge_count =0; - mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2); + mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(2); while(true) { // 2: wait for input - stats->new_merge(); pthread_mutex_lock(&ltable->header_mut); + stats->new_merge(); int done = 0; // get a new input for merge while(!ltable->get_tree_c1_mergeable()) @@ -381,9 +381,9 @@ void *diskMergeThread(void*arg) ltable->update_persistent_header(xid); Tcommit(xid); - pthread_mutex_unlock(&ltable->header_mut); - stats->finished_merge(); + + pthread_mutex_unlock(&ltable->header_mut); // stats->pretty_print(stdout); } @@ -400,24 +400,31 @@ void merge_iterators(int xid, bool dropDeletes // should be true iff this is biggest component ) { + stasis_log_t * log = (stasis_log_t*)stasis_log(); + datatuple *t1 = itrA->next_callerFrees(); + pthread_mutex_lock(&ltable->header_mut); // XXX slow stats->read_tuple_from_large_component(t1); + pthread_mutex_unlock(&ltable->header_mut); // XXX slow datatuple *t2 = 0; int i = 0; while( (t2=itrB->next_callerFrees()) != 0) { + pthread_mutex_lock(&ltable->header_mut); // XXX slow stats->read_tuple_from_small_component(t2); + pthread_mutex_unlock(&ltable->header_mut); // XXX slow DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *(t2->keylen),*(t2->datalen) ); while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2 { + pthread_mutex_lock(&ltable->header_mut); // XXX slow //insert t1 scratch_tree->insertTuple(xid, t1); - i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } + 
i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } stats->wrote_tuple(t1); datatuple::freetuple(t1); //advance itrA @@ -425,46 +432,56 @@ void merge_iterators(int xid, if(t1) { stats->read_tuple_from_large_component(t1); } + pthread_mutex_unlock(&ltable->header_mut); // XXX slow } if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) { datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); + pthread_mutex_lock(&ltable->header_mut); // XXX slow stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right. //insert merged tuple, drop deletes if(dropDeletes && !mtuple->isDelete()) { scratch_tree->insertTuple(xid, mtuple); + i+=mtuple->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } } datatuple::freetuple(t1); + stats->wrote_tuple(mtuple); t1 = itrA->next_callerFrees(); //advance itrA if(t1) { stats->read_tuple_from_large_component(t1); } datatuple::freetuple(mtuple); + pthread_mutex_unlock(&ltable->header_mut); // XXX slow } else { //insert t2 scratch_tree->insertTuple(xid, t2); + i+=t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } + pthread_mutex_lock(&ltable->header_mut); // XXX slow + + stats->wrote_tuple(t2); + pthread_mutex_unlock(&ltable->header_mut); // XXX slow + // cannot free any tuples here; they may still be read through a lookup } - i+= t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } - stats->wrote_tuple(t2); - datatuple::freetuple(t2); } while(t1 != 0) {// t1 is less than t2 + pthread_mutex_lock(&ltable->header_mut); // XXX slow scratch_tree->insertTuple(xid, t1); - i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } stats->wrote_tuple(t1); + i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i 
= 0; } datatuple::freetuple(t1); //advance itrA t1 = itrA->next_callerFrees(); stats->read_tuple_from_large_component(t1); + pthread_mutex_unlock(&ltable->header_mut); // XXX slow } DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index 50f8588..2278ca8 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -35,7 +35,7 @@ void insertProbeIter(size_t NUM_ENTRIES) xid = Tbegin(); mergeManager merge_mgr(0); - mergeStats * stats = merge_mgr.newMergeStats(1); + mergeStats * stats = merge_mgr.get_merge_stats(1); diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats);