admission control *almost* works; c1 sometimes blocks for a long time when a new c2 is created

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@809 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-05-26 00:58:17 +00:00
parent c50c14d259
commit aa7f7189d6
7 changed files with 216 additions and 118 deletions

View file

@ -50,7 +50,7 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
this->datapage_region_size = datapage_region_size; this->datapage_region_size = datapage_region_size;
this->datapage_size = datapage_size; this->datapage_size = datapage_size;
c0_stats = merge_mgr->newMergeStats(0); c0_stats = merge_mgr->get_merge_stats(0);
c0_stats->new_merge(); c0_stats->new_merge();
c0_stats->starting_merge(); c0_stats->starting_merge();
} }
@ -143,13 +143,11 @@ void logtable<TUPLE>::flushTable()
gettimeofday(&start_tv,0); gettimeofday(&start_tv,0);
start = tv_to_double(start_tv); start = tv_to_double(start_tv);
c0_stats->handed_off_tree();
c0_stats->finished_merge();
c0_stats->new_merge();
pthread_mutex_lock(&header_mut); pthread_mutex_lock(&header_mut);
int expmcount = merge_count; int expmcount = merge_count;
c0_stats->finished_merge();
//this is for waiting the previous merger of the mem-tree //this is for waiting the previous merger of the mem-tree
//hopefullly this wont happen //hopefullly this wont happen
@ -165,12 +163,16 @@ void logtable<TUPLE>::flushTable()
} }
} }
c0_stats->handed_off_tree();
c0_stats->new_merge();
gettimeofday(&stop_tv,0); gettimeofday(&stop_tv,0);
stop = tv_to_double(stop_tv); stop = tv_to_double(stop_tv);
set_tree_c0_mergeable(get_tree_c0()); set_tree_c0_mergeable(get_tree_c0());
pthread_cond_signal(&c0_ready); pthread_cond_signal(&c0_ready);
DEBUG("Signaled c0-c1 merge thread\n");
merge_count ++; merge_count ++;
set_tree_c0(new memTreeComponent<datatuple>::rbtree_t); set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
@ -181,7 +183,7 @@ void logtable<TUPLE>::flushTable()
pthread_mutex_unlock(&header_mut); pthread_mutex_unlock(&header_mut);
if(blocked) { if(blocked && stop - start > 0.1) {
if(first) if(first)
{ {
printf("\nBlocked writes for %f sec\n", stop-start); printf("\nBlocked writes for %f sec\n", stop-start);
@ -243,7 +245,7 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
} }
} }
//TODO: Arange to only hold read latches while hitting disk. //TODO: Arrange to only hold read latches while hitting disk.
//step 3: check c1 //step 3: check c1
if(!done) if(!done)
@ -416,10 +418,11 @@ template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple) void logtable<TUPLE>::insertTuple(datatuple *tuple)
{ {
//lock the red-black tree //lock the red-black tree
c0_stats->read_tuple_from_small_component(tuple);
pthread_mutex_lock(&header_mut); pthread_mutex_lock(&header_mut);
c0_stats->read_tuple_from_small_component(tuple);
//find the previous tuple with same key in the memtree if exists //find the previous tuple with same key in the memtree if exists
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple); memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
datatuple * t = 0;
if(rbitr != tree_c0->end()) if(rbitr != tree_c0->end())
{ {
datatuple *pre_t = *rbitr; datatuple *pre_t = *rbitr;
@ -427,7 +430,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
c0_stats->read_tuple_from_large_component(pre_t); c0_stats->read_tuple_from_large_component(pre_t);
datatuple *new_t = tmerger->merge(pre_t, tuple); datatuple *new_t = tmerger->merge(pre_t, tuple);
c0_stats->merged_tuples(new_t, tuple, pre_t); c0_stats->merged_tuples(new_t, tuple, pre_t);
c0_stats->wrote_tuple(new_t); t = new_t;
tree_c0->erase(pre_t); //remove the previous tuple tree_c0->erase(pre_t); //remove the previous tuple
tree_c0->insert(new_t); //insert the new tuple tree_c0->insert(new_t); //insert the new tuple
@ -440,16 +443,18 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
else //no tuple with same key exists in mem-tree else //no tuple with same key exists in mem-tree
{ {
datatuple *t = tuple->create_copy(); t = tuple->create_copy();
//insert tuple into the rbtree //insert tuple into the rbtree
tree_c0->insert(t); tree_c0->insert(t);
c0_stats->wrote_tuple(t);
tsize++; tsize++;
tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD; tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD;
} }
c0_stats->wrote_tuple(t);
//flushing logic //flushing logic
if(tree_bytes >= max_c0_size ) if(tree_bytes >= max_c0_size )
{ {

View file

@ -78,7 +78,7 @@ public:
void set_max_c0_size(int64_t max_c0_size) { void set_max_c0_size(int64_t max_c0_size) {
this->max_c0_size = max_c0_size; this->max_c0_size = max_c0_size;
merge_mgr->set_c0_size(max_c0_size); merge_mgr->set_c0_size(max_c0_size);
merge_mgr->get_merge_stats(1);
} }
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
void update_persistent_header(int xid); void update_persistent_header(int xid);

View file

@ -9,7 +9,7 @@
#include "mergeStats.h" #include "mergeStats.h"
#include "logstore.h" #include "logstore.h"
#include "math.h" #include "math.h"
mergeStats* mergeManager:: newMergeStats(int mergeLevel) { mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
if (mergeLevel == 0) { if (mergeLevel == 0) {
return c0; return c0;
} else if (mergeLevel == 1) { } else if (mergeLevel == 1) {
@ -35,16 +35,19 @@ void mergeManager::new_merge(mergeStats * s) {
pthread_mutex_lock(&mut); pthread_mutex_lock(&mut);
if(s->merge_count) { if(s->merge_count) {
if(s->merge_level == 0) { if(s->merge_level == 0) {
// queueSize was set during startup // target_size was set during startup
} else if(s->merge_level == 1) { } else if(s->merge_level == 1) {
c1_queueSize = (pageid_t)(*ltable->R() * (double)c0_queueSize); //c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out; assert(c0->target_size);
c1->target_size = (pageid_t)(*ltable->R() * (double)c0->target_size); //c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
assert(c1->target_size);
} else if(s->merge_level == 2) { } else if(s->merge_level == 2) {
// target_size is infinity...
} else { abort(); } } else { abort(); }
} }
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&mut);
} }
void mergeManager::set_c0_size(int64_t size) { void mergeManager::set_c0_size(int64_t size) {
c0_queueSize = size; c0->target_size = size;
} }
/** /**
@ -68,78 +71,100 @@ void mergeManager::set_c0_size(int64_t size) {
* *
* bytes_consumed_by_merger = sum(bytes_in_small_delta) * bytes_consumed_by_merger = sum(bytes_in_small_delta)
*/ */
void mergeManager::tick(mergeStats * s, bool done) { void mergeManager::tick(mergeStats * s, bool block) {
pageid_t tick_length_bytes = 1024*1024; pageid_t tick_length_bytes = 10*1024;
if(done || (s->bytes_in_small_delta > tick_length_bytes)) {
pthread_mutex_lock(&mut);
struct timeval now;
gettimeofday(&now, 0);
double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
double bps = (double)s->bytes_in_small_delta / (double)elapsed_delta;
pageid_t current_size = s->bytes_out - s->bytes_in_large; if(true || s->bytes_in_small_delta > tick_length_bytes) {
int64_t overshoot; s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
int64_t overshoot_fudge = (int64_t)((double)c0_queueSize * 0.1);
do{ if(block) {
overshoot = 0; // pthread_mutex_lock(&mut);
if(s->merge_level == 0) { struct timeval now;
if(done) { gettimeofday(&now, 0);
// c0->bytes_in_small = 0; double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
} else if(c0->mergeable_size) { double bps = 0; // = (double)s->bytes_in_small_delta / (double)elapsed_delta;
overshoot = (overshoot_fudge + c0->mergeable_size - c1->bytes_in_small) // amount left to process
- (c0_queueSize - current_size); // - room for more insertions s->lifetime_elapsed += elapsed_delta;
s->lifetime_consumed += s->bytes_in_small_delta;
double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...)
s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta;
s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta;
double_to_ts(&s->last_tick, tv_to_double(&now));
s->bytes_in_small_delta = 0;
int64_t overshoot = 0;
int64_t overshoot_fudge = 1024*1024; // XXX set based on avg / max tuple size?
int spin = 0;
double total_sleep = 0.0;
do{
double c0_c1_progress = ((double)(c1->bytes_in_large + c1->bytes_in_small)) / (double)(c0->mergeable_size + c1->base_size);
double c1_c2_progress = ((double)(c2->bytes_in_large + c2->bytes_in_small)) / (double)(c1->mergeable_size + c2->base_size);
double c0_c1_bps = c1->window_consumed / c1->window_elapsed;
double c1_c2_bps = c2->window_consumed / c2->window_elapsed;
if(s->merge_level == 0) {
pageid_t c0_c1_bytes_remaining = (pageid_t)((1.0-c0_c1_progress) * (double)c0->mergeable_size);
pageid_t c0_bytes_left = c0->target_size - c0->current_size;
overshoot = overshoot_fudge + c0_c1_bytes_remaining - c0_bytes_left;
bps = c0_c1_bps;
if(!c0->mergeable_size) { overshoot = -1; }
if(c0->mergeable_size && ! c1->active) { overshoot = c0->current_size + overshoot_fudge; }
} else if (s->merge_level == 1) {
pageid_t c1_c2_bytes_remaining = (pageid_t)((1.0-c1_c2_progress) * (double)c1->mergeable_size);
pageid_t c1_bytes_left = c1->target_size - c1->current_size;
overshoot = overshoot_fudge + c1_c2_bytes_remaining - c1_bytes_left;
if(!c1->mergeable_size) { overshoot = -1; }
if(c1->mergeable_size && ! c2->active) { overshoot = c1->current_size + overshoot_fudge; }
bps = c1_c2_bps;
} }
} else if (s->merge_level == 1) {
if(done) {
c0->mergeable_size = 0;
c1->bytes_in_small = 0;
} else if(/*c1_queueSize && */c1->mergeable_size) {
overshoot = (c1->mergeable_size - c2->bytes_in_small)
- (c1_queueSize - current_size);
}
} else if (s->merge_level == 2) {
if(done) {
c1->mergeable_size = 0;
c2->bytes_in_small = 0;
}
// Never throttle this merge.
}
static int num_skipped = 0;
if(num_skipped == 10) {
printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, current_size);
pretty_print(stdout);
num_skipped = 0;
}
num_skipped ++;
if(overshoot > 0) {
// throttle
// it took "elapsed" seconds to process "tick_length_bytes" mb
double sleeptime = (double)overshoot / bps; // 2 is a fudge factor
struct timespec sleep_until; //#define PP_THREAD_INFO
if(sleeptime > 1) { sleeptime = 1; } #ifdef PP_THREAD_INFO
double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, s->current_size);
// printf("\nMerge thread %d Overshoot: %lld Throttle %6f\n", s->merge_level, overshoot, sleeptime); #endif
// pthread_mutex_lock(&dummy_throttle_mut); pretty_print(stdout);
pthread_cond_timedwait(&dummy_throttle_cond, &mut, &sleep_until); if(overshoot > 0) {
// pthread_mutex_unlock(&dummy_throttle_mut); // throttle
gettimeofday(&now, 0); // it took "elapsed" seconds to process "tick_length_bytes" mb
} double sleeptime = 2.0 * (double)overshoot / bps;
} while(overshoot > 0);
memcpy(&s->last_tick, &now, sizeof(now));
s->bytes_in_small_delta = 0;
// pretty_print(stdout); struct timespec sleep_until;
pthread_mutex_unlock(&mut);
double max_c0_sleep = 0.1;
double max_c1_sleep = 0.1;
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
if(sleeptime < 0.01) { sleeptime = 0.01; }
if(sleeptime > max_sleep) { sleeptime = max_sleep; }
spin ++;
total_sleep += sleeptime;
if((spin > 20) || (total_sleep > (max_sleep * 10))) {
if(bps > 1) {
printf("\nMerge thread %d Overshoot: %lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, overshoot, sleeptime, spin, total_sleep);
}
}
double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
pthread_cond_timedwait(&dummy_throttle_cond, &ltable->header_mut, &sleep_until);
gettimeofday(&now, 0);
}
} while(overshoot > overshoot_fudge);
} else {
pretty_print(stdout);
}
// pthread_mutex_unlock(&mut);
} }
} }
mergeManager::mergeManager(logtable<datatuple> *ltable): mergeManager::mergeManager(logtable<datatuple> *ltable):
ltable(ltable), ltable(ltable),
c0_queueSize(0),
c1_queueSize(0),
// c2_queueSize(0),
c0(new mergeStats(this, 0)), c0(new mergeStats(this, 0)),
c1(new mergeStats(this, 1)), c1(new mergeStats(this, 1)),
c2(new mergeStats(this, 2)) { c2(new mergeStats(this, 2)) {
@ -163,26 +188,44 @@ void mergeManager::pretty_print(FILE * out) {
bool have_c1m = false; bool have_c1m = false;
bool have_c2 = false; bool have_c2 = false;
if(lt) { if(lt) {
pthread_mutex_lock(&lt->header_mut); // pthread_mutex_lock(&lt->header_mut);
have_c0 = NULL != lt->get_tree_c0(); have_c0 = NULL != lt->get_tree_c0();
have_c0m = NULL != lt->get_tree_c0_mergeable(); have_c0m = NULL != lt->get_tree_c0_mergeable();
have_c1 = NULL != lt->get_tree_c1(); have_c1 = NULL != lt->get_tree_c1();
have_c1m = NULL != lt->get_tree_c1_mergeable() ; have_c1m = NULL != lt->get_tree_c1_mergeable() ;
have_c2 = NULL != lt->get_tree_c2(); have_c2 = NULL != lt->get_tree_c2();
pthread_mutex_unlock(&lt->header_mut); // pthread_mutex_unlock(&lt->header_mut);
} }
fprintf(out,"[%s] %s %s [%s] %s %s [%s] %s ",
c0->active ? "RUN" : "---", double c0_c1_progress = 100.0 * (c1->bytes_in_large + c1->bytes_in_small) / (c0->mergeable_size + c1->base_size); // c1->bytes_in_small / c0->mergeable_size;
double c1_c2_progress = 100.0 * (c2->bytes_in_large + c2->bytes_in_small) / (c1->mergeable_size + c2->base_size); //c2->bytes_in_small / c1->mergeable_size;
assert((!c1->active) || (c0_c1_progress >= -1 && c0_c1_progress < 102));
assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102));
fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ",
c0->active ? "RUN" : "---", (uint64_t)(c0->lifetime_consumed / mb), c0->lifetime_elapsed, c0->window_consumed/(((double)mb)*c0->window_elapsed), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed),
have_c0 ? "C0" : "..", have_c0 ? "C0" : "..",
have_c0m ? "C0'" : "...", have_c0m ? "C0'" : "...",
c1->active ? "RUN" : "---", c1->active ? "RUN" : "---", c0_c1_progress, c1->window_consumed/(((double)mb)*c1->window_elapsed), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed),
have_c1 ? "C1" : "..", have_c1 ? "C1" : "..",
have_c1m ? "C1'" : "...", have_c1m ? "C1'" : "...",
c2->active ? "RUN" : "---", c2->active ? "RUN" : "---", c1_c2_progress, c2->window_consumed/(((double)mb)*c2->window_elapsed), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed),
have_c2 ? "C2" : ".."); have_c2 ? "C2" : "..");
fprintf(out, "[size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld ", c0_queueSize/mb, c0->bytes_in_small/mb, c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb); #define PP_SIZES
fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld ", c1_queueSize/mb, c1->bytes_in_small/mb, c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb); #ifdef PP_SIZES
fprintf(out, "C2 .... %4lld %4lld %4lld %4lld ", c2->bytes_in_small/mb, c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb); fprintf(out, "[size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld ",
c0->target_size/mb, c0->current_size/mb, c0->bytes_in_small/mb,
c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb);
fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld %4lld ",
c1->target_size/mb, c1->current_size/mb, c1->bytes_in_small/mb,
c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb);
fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld ",
/*----*/ c2->current_size/mb, c2->bytes_in_small/mb,
c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb);
#endif
// fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds))); // fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
// fprintf(out, "C0 size %4lld resident %4lld ", // fprintf(out, "C0 size %4lld resident %4lld ",
// 2*c0_queueSize/mb, // 2*c0_queueSize/mb,

View file

@ -21,7 +21,7 @@ class logtable;
class mergeStats; class mergeStats;
class mergeManager { class mergeManager {
private: public:
double tv_to_double(struct timeval * tv) { double tv_to_double(struct timeval * tv) {
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0;
} }
@ -35,26 +35,23 @@ private:
uint64_t long_tv(struct timeval& tv) { uint64_t long_tv(struct timeval& tv) {
return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec); return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec);
} }
public:
mergeManager(logtable<datatuple> *ltable); mergeManager(logtable<datatuple> *ltable);
~mergeManager(); ~mergeManager();
void new_merge(mergeStats * s); void new_merge(mergeStats * s);
void set_c0_size(int64_t size); void set_c0_size(int64_t size);
void tick(mergeStats * s, bool done = false); void tick(mergeStats * s, bool block);
void pretty_print(FILE * out); void pretty_print(FILE * out);
mergeStats* newMergeStats(int mergeLevel); mergeStats* get_merge_stats(int mergeLevel);
private: private:
pthread_mutex_t mut; pthread_mutex_t mut;
logtable<datatuple>* ltable; logtable<datatuple>* ltable;
pageid_t c0_queueSize;
pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
double throttle_seconds; double throttle_seconds;
double elapsed_seconds; // double elapsed_seconds;
double last_throttle_seconds; double last_throttle_seconds;
double last_elapsed_seconds; // double last_elapsed_seconds;
mergeStats * c0; mergeStats * c0;
mergeStats * c1; mergeStats * c1;
mergeStats * c2; mergeStats * c2;

View file

@ -24,6 +24,9 @@ class mergeStats {
merge_mgr(merge_mgr), merge_mgr(merge_mgr),
merge_level(merge_level), merge_level(merge_level),
merge_count(0), merge_count(0),
base_size(0),
target_size(0),
current_size(0),
mergeable_size(0), mergeable_size(0),
bytes_out_with_overhead(0), bytes_out_with_overhead(0),
bytes_out(0), bytes_out(0),
@ -34,21 +37,34 @@ class mergeStats {
num_tuples_in_small(0), num_tuples_in_small(0),
bytes_in_large(0), bytes_in_large(0),
num_tuples_in_large(0), num_tuples_in_large(0),
just_handed_off(false),
lifetime_elapsed(0),
lifetime_consumed(0),
window_elapsed(0.001),
window_consumed(0),
active(false) { active(false) {
gettimeofday(&sleep,0); gettimeofday(&sleep,0);
gettimeofday(&last,0); gettimeofday(&last,0);
merge_mgr->double_to_ts(&last_tick, merge_mgr->tv_to_double(&last));
} }
void new_merge() { void new_merge() {
merge_mgr->new_merge(this); merge_mgr->new_merge(this);
if(just_handed_off) {
bytes_out = 0;
just_handed_off = false;
}
base_size = bytes_out;
current_size = base_size;
merge_count++; merge_count++;
// bytes_out_with_overhead = 0; bytes_out_with_overhead = 0;
// bytes_out = 0; bytes_out = 0;
num_tuples_out = 0; num_tuples_out = 0;
num_datapages_out = 0; num_datapages_out = 0;
bytes_in_small = 0; bytes_in_small = 0;
bytes_in_small_delta = 0; bytes_in_small_delta = 0;
num_tuples_in_small = 0; num_tuples_in_small = 0;
// bytes_in_large = 0; bytes_in_large = 0;
num_tuples_in_large = 0; num_tuples_in_large = 0;
gettimeofday(&sleep,0); gettimeofday(&sleep,0);
} }
@ -56,15 +72,23 @@ class mergeStats {
active = true; active = true;
gettimeofday(&start, 0); gettimeofday(&start, 0);
gettimeofday(&last, 0); gettimeofday(&last, 0);
merge_mgr->double_to_ts(&last_tick, merge_mgr->tv_to_double(&last));
} }
void handed_off_tree() { void handed_off_tree() {
mergeable_size = bytes_out - bytes_in_large; if(merge_level == 2) {
bytes_out = 0; } else {
bytes_in_large = 0; mergeable_size = current_size;
just_handed_off = true;
}
} }
void finished_merge() { void finished_merge() {
active = false; active = false;
merge_mgr->tick(this, true); if(merge_level == 1) {
merge_mgr->get_merge_stats(0)->mergeable_size = 0;
} else if(merge_level == 2) {
merge_mgr->get_merge_stats(1)->mergeable_size = 0;
}
gettimeofday(&done, 0); gettimeofday(&done, 0);
} }
void read_tuple_from_large_component(datatuple * tup) { void read_tuple_from_large_component(datatuple * tup) {
@ -78,7 +102,7 @@ class mergeStats {
num_tuples_in_small++; num_tuples_in_small++;
bytes_in_small_delta += tup->byte_length(); bytes_in_small_delta += tup->byte_length();
bytes_in_small += tup->byte_length(); bytes_in_small += tup->byte_length();
merge_mgr->tick(this); merge_mgr->tick(this, true);
} }
} }
void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) { void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
@ -86,6 +110,7 @@ class mergeStats {
void wrote_tuple(datatuple * tup) { void wrote_tuple(datatuple * tup) {
num_tuples_out++; num_tuples_out++;
bytes_out += tup->byte_length(); bytes_out += tup->byte_length();
merge_mgr->tick(this, false);
} }
void wrote_datapage(DataPage<datatuple> *dp) { void wrote_datapage(DataPage<datatuple> *dp) {
num_datapages_out++; num_datapages_out++;
@ -110,10 +135,13 @@ class mergeStats {
protected: protected:
struct timespec last_tick; struct timespec last_tick;
pageid_t base_size;
pageid_t target_size;
pageid_t current_size;
pageid_t mergeable_size; pageid_t mergeable_size;
pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)?
pageid_t bytes_out; // How many bytes worth of tuples did we write? pageid_t bytes_out; // How many bytes worth of tuples did we write?
pageid_t num_tuples_out; // How many tuples did we write? pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages? pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
@ -121,6 +149,14 @@ protected:
pageid_t num_tuples_in_small; // Tuples from the small input? pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input? pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input? pageid_t num_tuples_in_large; // Tuples from large input?
bool just_handed_off;
double lifetime_elapsed;
double lifetime_consumed;
double window_elapsed;
double window_consumed;
bool active; bool active;
public: public:

View file

@ -141,12 +141,12 @@ void* memMergeThread(void*arg)
assert(ltable->get_tree_c1()); assert(ltable->get_tree_c1());
int merge_count =0; int merge_count =0;
mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1); mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(1);
while(true) // 1 while(true) // 1
{ {
stats->new_merge();
pthread_mutex_lock(&ltable->header_mut); pthread_mutex_lock(&ltable->header_mut);
stats->new_merge();
int done = 0; int done = 0;
// 2: wait for c0_mergable // 2: wait for c0_mergable
while(!ltable->get_tree_c0_mergeable()) while(!ltable->get_tree_c0_mergeable())
@ -225,12 +225,13 @@ void* memMergeThread(void*arg)
// 11: c0_mergeable = NULL // 11: c0_mergeable = NULL
ltable->set_tree_c0_mergeable(NULL); ltable->set_tree_c0_mergeable(NULL);
double new_c1_size = stats->output_size(); double new_c1_size = stats->output_size();
stats->handed_off_tree();
pthread_cond_signal(&ltable->c0_needed); pthread_cond_signal(&ltable->c0_needed);
ltable->update_persistent_header(xid); ltable->update_persistent_header(xid);
Tcommit(xid); Tcommit(xid);
stats->finished_merge();
//TODO: this is simplistic for now //TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger //6: if c1' is too big, signal the other merger
double target_R = *ltable->R(); double target_R = *ltable->R();
@ -254,6 +255,7 @@ void* memMergeThread(void*arg)
// 7: and perhaps c1_mergeable // 7: and perhaps c1_mergeable
ltable->set_tree_c1_mergeable(c1_prime); // c1_prime == c1. ltable->set_tree_c1_mergeable(c1_prime); // c1_prime == c1.
stats->handed_off_tree();
// 8: c1 = new empty. // 8: c1 = new empty.
ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats)); ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats));
@ -269,8 +271,6 @@ void* memMergeThread(void*arg)
// 13 // 13
pthread_mutex_unlock(&ltable->header_mut); pthread_mutex_unlock(&ltable->header_mut);
stats->finished_merge();
// stats->pretty_print(stdout); // stats->pretty_print(stdout);
//TODO: get the freeing outside of the lock //TODO: get the freeing outside of the lock
@ -292,14 +292,14 @@ void *diskMergeThread(void*arg)
int merge_count =0; int merge_count =0;
mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2); mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(2);
while(true) while(true)
{ {
// 2: wait for input // 2: wait for input
stats->new_merge();
pthread_mutex_lock(&ltable->header_mut); pthread_mutex_lock(&ltable->header_mut);
stats->new_merge();
int done = 0; int done = 0;
// get a new input for merge // get a new input for merge
while(!ltable->get_tree_c1_mergeable()) while(!ltable->get_tree_c1_mergeable())
@ -381,9 +381,9 @@ void *diskMergeThread(void*arg)
ltable->update_persistent_header(xid); ltable->update_persistent_header(xid);
Tcommit(xid); Tcommit(xid);
pthread_mutex_unlock(&ltable->header_mut);
stats->finished_merge(); stats->finished_merge();
pthread_mutex_unlock(&ltable->header_mut);
// stats->pretty_print(stdout); // stats->pretty_print(stdout);
} }
@ -400,24 +400,31 @@ void merge_iterators(int xid,
bool dropDeletes // should be true iff this is biggest component bool dropDeletes // should be true iff this is biggest component
) )
{ {
stasis_log_t * log = (stasis_log_t*)stasis_log();
datatuple *t1 = itrA->next_callerFrees(); datatuple *t1 = itrA->next_callerFrees();
pthread_mutex_lock(&ltable->header_mut); // XXX slow
stats->read_tuple_from_large_component(t1); stats->read_tuple_from_large_component(t1);
pthread_mutex_unlock(&ltable->header_mut); // XXX slow
datatuple *t2 = 0; datatuple *t2 = 0;
int i = 0; int i = 0;
while( (t2=itrB->next_callerFrees()) != 0) while( (t2=itrB->next_callerFrees()) != 0)
{ {
pthread_mutex_lock(&ltable->header_mut); // XXX slow
stats->read_tuple_from_small_component(t2); stats->read_tuple_from_small_component(t2);
pthread_mutex_unlock(&ltable->header_mut); // XXX slow
DEBUG("tuple\t%lld: keylen %d datalen %d\n", DEBUG("tuple\t%lld: keylen %d datalen %d\n",
ntuples, *(t2->keylen),*(t2->datalen) ); ntuples, *(t2->keylen),*(t2->datalen) );
while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2 while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2
{ {
pthread_mutex_lock(&ltable->header_mut); // XXX slow
//insert t1 //insert t1
scratch_tree->insertTuple(xid, t1); scratch_tree->insertTuple(xid, t1);
i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
stats->wrote_tuple(t1); stats->wrote_tuple(t1);
datatuple::freetuple(t1); datatuple::freetuple(t1);
//advance itrA //advance itrA
@ -425,46 +432,56 @@ void merge_iterators(int xid,
if(t1) { if(t1) {
stats->read_tuple_from_large_component(t1); stats->read_tuple_from_large_component(t1);
} }
pthread_mutex_unlock(&ltable->header_mut); // XXX slow
} }
if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
{ {
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
pthread_mutex_lock(&ltable->header_mut); // XXX slow
stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right. stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right.
//insert merged tuple, drop deletes //insert merged tuple, drop deletes
if(dropDeletes && !mtuple->isDelete()) { if(dropDeletes && !mtuple->isDelete()) {
scratch_tree->insertTuple(xid, mtuple); scratch_tree->insertTuple(xid, mtuple);
i+=mtuple->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
} }
datatuple::freetuple(t1); datatuple::freetuple(t1);
stats->wrote_tuple(mtuple);
t1 = itrA->next_callerFrees(); //advance itrA t1 = itrA->next_callerFrees(); //advance itrA
if(t1) { if(t1) {
stats->read_tuple_from_large_component(t1); stats->read_tuple_from_large_component(t1);
} }
datatuple::freetuple(mtuple); datatuple::freetuple(mtuple);
pthread_mutex_unlock(&ltable->header_mut); // XXX slow
} }
else else
{ {
//insert t2 //insert t2
scratch_tree->insertTuple(xid, t2); scratch_tree->insertTuple(xid, t2);
i+=t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
pthread_mutex_lock(&ltable->header_mut); // XXX slow
stats->wrote_tuple(t2);
pthread_mutex_unlock(&ltable->header_mut); // XXX slow
// cannot free any tuples here; they may still be read through a lookup // cannot free any tuples here; they may still be read through a lookup
} }
i+= t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; }
stats->wrote_tuple(t2);
datatuple::freetuple(t2); datatuple::freetuple(t2);
} }
while(t1 != 0) {// t1 is less than t2 while(t1 != 0) {// t1 is less than t2
pthread_mutex_lock(&ltable->header_mut); // XXX slow
scratch_tree->insertTuple(xid, t1); scratch_tree->insertTuple(xid, t1);
i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; }
stats->wrote_tuple(t1); stats->wrote_tuple(t1);
i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
datatuple::freetuple(t1); datatuple::freetuple(t1);
//advance itrA //advance itrA
t1 = itrA->next_callerFrees(); t1 = itrA->next_callerFrees();
stats->read_tuple_from_large_component(t1); stats->read_tuple_from_large_component(t1);
pthread_mutex_unlock(&ltable->header_mut); // XXX slow
} }
DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples); DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);

View file

@ -35,7 +35,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
xid = Tbegin(); xid = Tbegin();
mergeManager merge_mgr(0); mergeManager merge_mgr(0);
mergeStats * stats = merge_mgr.newMergeStats(1); mergeStats * stats = merge_mgr.get_merge_stats(1);
diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats); diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats);