Added throttling policy and better stats. The throttling makes a merge policy bug painfully obvious (committing because I plan to do some refactoring...)
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@802 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
7b2560d906
commit
1adb980555
5 changed files with 200 additions and 30 deletions
13
logstore.cpp
13
logstore.cpp
|
@ -43,6 +43,10 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
|
|||
this->internal_region_size = internal_region_size;
|
||||
this->datapage_region_size = datapage_region_size;
|
||||
this->datapage_size = datapage_size;
|
||||
|
||||
c0_stats = merge_mgr->newMergeStats(0);
|
||||
c0_stats->new_merge();
|
||||
c0_stats->starting_merge();
|
||||
}
|
||||
|
||||
template<class TUPLE>
|
||||
|
@ -133,6 +137,8 @@ void logtable<TUPLE>::flushTable()
|
|||
gettimeofday(&start_tv,0);
|
||||
start = tv_to_double(start_tv);
|
||||
|
||||
c0_stats->finished_merge();
|
||||
c0_stats->new_merge();
|
||||
|
||||
writelock(header_lock,0);
|
||||
pthread_mutex_lock(mergedata->rbtree_mut);
|
||||
|
@ -173,7 +179,7 @@ void logtable<TUPLE>::flushTable()
|
|||
|
||||
set_tree_c0_mergeable(get_tree_c0());
|
||||
|
||||
pthread_cond_signal(mergedata->input_ready_cond);
|
||||
pthread_cond_broadcast(mergedata->input_ready_cond);
|
||||
|
||||
merge_count ++;
|
||||
set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
|
||||
|
@ -183,6 +189,7 @@ void logtable<TUPLE>::flushTable()
|
|||
|
||||
pthread_mutex_unlock(mergedata->rbtree_mut);
|
||||
unlock(header_lock);
|
||||
c0_stats->starting_merge();
|
||||
if(first)
|
||||
{
|
||||
printf("Blocked writes for %f sec\n", stop-start);
|
||||
|
@ -419,6 +426,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
|||
//lock the red-black tree
|
||||
readlock(header_lock,0);
|
||||
pthread_mutex_lock(mergedata->rbtree_mut);
|
||||
c0_stats->read_tuple_from_small_component(tuple);
|
||||
//find the previous tuple with same key in the memtree if exists
|
||||
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
|
||||
if(rbitr != tree_c0->end())
|
||||
|
@ -426,6 +434,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
|||
datatuple *pre_t = *rbitr;
|
||||
//do the merging
|
||||
datatuple *new_t = tmerger->merge(pre_t, tuple);
|
||||
c0_stats->merged_tuples(new_t, tuple, pre_t);
|
||||
tree_c0->erase(pre_t); //remove the previous tuple
|
||||
|
||||
tree_c0->insert(new_t); //insert the new tuple
|
||||
|
@ -443,7 +452,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
|||
//insert tuple into the rbtree
|
||||
tree_c0->insert(t);
|
||||
tsize++;
|
||||
tree_bytes += t->byte_length() + RB_TREE_OVERHEAD;
|
||||
tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD;
|
||||
|
||||
}
|
||||
|
||||
|
|
11
logstore.h
11
logstore.h
|
@ -12,6 +12,8 @@
|
|||
|
||||
#include "tuplemerger.h"
|
||||
|
||||
#include "mergeStats.h"
|
||||
|
||||
struct logtable_mergedata;
|
||||
|
||||
template<class TUPLE>
|
||||
|
@ -66,8 +68,12 @@ public:
|
|||
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0(){return tree_c0;}
|
||||
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
|
||||
void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); }
|
||||
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
|
||||
// Records the C0 size limit on this table and forwards the same value to the
// merge manager through set_c0_size().
void set_max_c0_size(int64_t max_c0_size) {
  // The parameter shadows the member of the same name, hence the explicit this->.
  this->max_c0_size = max_c0_size;
  merge_mgr->set_c0_size(this->max_c0_size);
}
|
||||
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
|
||||
void update_persistent_header(int xid);
|
||||
|
||||
void setMergeData(logtable_mergedata * mdata);
|
||||
|
@ -88,9 +94,7 @@ public:
|
|||
|
||||
logtable_mergedata * mergedata;
|
||||
rwl * header_lock;
|
||||
|
||||
int64_t max_c0_size;
|
||||
|
||||
mergeManager * merge_mgr;
|
||||
|
||||
inline bool is_still_running() { return still_running_; }
|
||||
|
@ -121,6 +125,7 @@ private:
|
|||
|
||||
std::vector<iterator *> its;
|
||||
|
||||
mergeManager::mergeStats * c0_stats;
|
||||
bool still_running_;
|
||||
public:
|
||||
|
||||
|
|
193
mergeStats.h
193
mergeStats.h
|
@ -11,48 +11,148 @@
|
|||
#include <stasis/common.h>
|
||||
|
||||
class mergeManager {
|
||||
pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
|
||||
pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size)
|
||||
pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times)
|
||||
pageid_t c1_totalWorktime;
|
||||
pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times)
|
||||
pageid_t c2_totalWorktime;
|
||||
private:
|
||||
// Converts a timeval (seconds + microseconds) into fractional seconds.
double tv_to_double(struct timeval * tv) {
  const double whole_seconds = (double)tv->tv_sec;
  const double micros = (double)tv->tv_usec;
  return whole_seconds + micros/1000000.0;
}
|
||||
// Converts a timespec (seconds + nanoseconds) into fractional seconds.
double ts_to_double(struct timespec * ts) {
  const double whole_seconds = (double)ts->tv_sec;
  const double nanos = (double)ts->tv_nsec;
  return whole_seconds + nanos/1000000000.0;
}
|
||||
// Splits a time expressed in fractional seconds into a timespec: the truncated
// whole seconds go to tv_sec, the remaining fraction (scaled to nanoseconds and
// truncated) goes to tv_nsec.
void double_to_ts(struct timespec *ts, double time) {
  const time_t whole_seconds = (time_t)(time);
  ts->tv_sec = whole_seconds;
  const double fraction = time - (double)ts->tv_sec;
  ts->tv_nsec = (long)(fraction * 1000000000.0);
}
|
||||
public:
|
||||
mergeManager() :
|
||||
c0_queueSize(0),
|
||||
c1_queueSize(0),
|
||||
c2_queueSize(0),
|
||||
c0_totalConsumed(0),
|
||||
c0_totalCollapsed(0),
|
||||
c0_totalWorktime(0),
|
||||
c1_totalConsumed(0),
|
||||
c1_totalCollapsed(0),
|
||||
c1_totalWorktime(0),
|
||||
c2_totalConsumed(0),
|
||||
c2_totalWorktime(0) { }
|
||||
c2_totalCollapsed(0),
|
||||
c2_totalWorktime(0),
|
||||
c0(new mergeStats(this, 0)),
|
||||
c1(new mergeStats(this, 1)),
|
||||
c2(new mergeStats(this, 2)) {
|
||||
pthread_mutex_init(&mut, 0);
|
||||
pthread_mutex_init(&throttle_mut, 0);
|
||||
pthread_mutex_init(&dummy_throttle_mut, 0);
|
||||
pthread_cond_init(&dummy_throttle_cond, 0);
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, 0);
|
||||
double_to_ts(&last_throttle, tv_to_double(&tv));
|
||||
|
||||
}
|
||||
// Releases every synchronization primitive that the constructor initializes
// (previously only `mut` was destroyed).
~mergeManager() {
  pthread_cond_destroy(&dummy_throttle_cond);
  pthread_mutex_destroy(&dummy_throttle_mut);
  pthread_mutex_destroy(&throttle_mut);
  pthread_mutex_destroy(&mut);
  // NOTE(review): c0, c1 and c2 are allocated with `new` in the constructor and
  // are not freed here. They are also handed out by newMergeStats(), so confirm
  // that no caller deletes them before adding `delete` calls.
}
|
||||
class mergeStats;
|
||||
|
||||
// Called when a merge at s->merge_level is about to begin a new round. If that
// level has already completed at least one merge (s->merge_count != 0), the
// figures from the finished round are folded into the manager-wide queue sizes
// and totals under `mut`, and the status line is printed. Aborts on a merge
// level other than 0, 1 or 2.
//
// The stale pre-change line `if(s->merge_level == 1) {` has been dropped; it
// left the braces unbalanced in front of the level-0/1/2 chain.
void new_merge(mergeStats * s) {
  pthread_mutex_lock(&mut);
  if(s->merge_count) {
    if(s->merge_level == 0) {
      // queueSize was set during startup
    } else if(s->merge_level == 1) {
      // The C1 queue size only ever grows: keep the larger of the current
      // value and the bytes written by the finished merge.
      c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
      // NOTE(review): tick() also adds bytes_in_small_delta and elapsed time
      // to c1_totalConsumed / c1_totalWorktime; confirm these totals are not
      // being counted twice.
      c1_totalConsumed += s->bytes_in_small;
      c1_totalWorktime += (long_tv(s->done) - long_tv(s->start));
    } else if(s->merge_level == 2) {
      c2_queueSize = s->bytes_in_small;
      // NOTE(review): same possible double counting with tick() as for level 1.
      c2_totalConsumed += s->bytes_in_small;
      c2_totalWorktime += (long_tv(s->done) - long_tv(s->start));
    } else { abort(); }
    pretty_print(stdout);
  }
  pthread_mutex_unlock(&mut);
}
|
||||
void set_c0_size(int64_t size) {
|
||||
c0_queueSize = size;
|
||||
}
|
||||
// Periodic accounting hook. For merge level 0 it first decides whether the
// caller should be slowed down and, if so, sleeps. For every level, once the
// per-tick byte delta exceeds c0_queueSize / 100 (or when `done` is true), it
// folds the deltas held in `s` into the manager-wide totals, resets the
// deltas, records the time in s->last and prints the status line.
//   s    - statistics object of the merge level that is ticking.
//   done - forces the accounting step even if the delta threshold is not met.
void tick(mergeStats * s, bool done = false) {
|
||||
if(s->merge_level == 0) {
|
||||
// throttle_mut is taken here and only released by the level-0 branch at the
// very end of this function, so it stays held across the sleep below.
pthread_mutex_lock(&throttle_mut);
|
||||
// throttle?
|
||||
if(s->bytes_in_small_delta > c0_queueSize / 100) {
|
||||
struct timeval now;
|
||||
gettimeofday(&now, 0);
|
||||
// Seconds elapsed since last_throttle.
double elapsed_delta = tv_to_double(&now) - ts_to_double(&last_throttle);
|
||||
// Bytes read during this tick minus bytes collapsed during this tick.
pageid_t bytes_written_delta = (s->bytes_in_small_delta - s->bytes_collapsed_delta);
|
||||
double min_throughput = 100 * 1024; // don't throttle below 100 kilobytes / sec
|
||||
// Relative amount by which (level-0 total consumed + this tick's net bytes
// - level-1 total consumed) exceeds c0_queueSize.
// NOTE(review): c0_queueSize is the divisor and the constructor initializes
// it to 0; confirm set_c0_size() always runs before the first tick.
// NOTE(review): c0_totalConsumed / c1_totalConsumed are read here without
// holding `mut`, although they are updated under `mut` further down.
double c0_badness = (double)((c0_totalConsumed + bytes_written_delta - c1_totalConsumed) - c0_queueSize)/ (double)c0_queueSize;
|
||||
if(c0_badness > 0) {
|
||||
// The permitted throughput shrinks with the square of the badness.
double target_throughput = min_throughput / (c0_badness * c0_badness);
|
||||
//double raw_throughput = ((double)bytes_written_delta)/elapsed_delta;
|
||||
// Time this tick's bytes should have taken at the target throughput.
double target_elapsed = ((double)bytes_written_delta)/target_throughput;
|
||||
printf("Worked %6.1f (target %6.1f)\n", elapsed_delta, target_elapsed);
|
||||
if(target_elapsed > elapsed_delta) {
|
||||
struct timespec sleep_until;
|
||||
// Absolute deadline: last_throttle + target_elapsed.
double_to_ts(&sleep_until, ts_to_double(&last_throttle) + target_elapsed);
|
||||
fprintf(stdout, "Throttling for %6.1f seconds\n", target_elapsed - (double)elapsed_delta);
|
||||
// The timed wait is used as a sleep until the deadline; no signal of
// dummy_throttle_cond appears in the visible part of this file.
pthread_mutex_lock(&dummy_throttle_mut);
|
||||
pthread_cond_timedwait(&dummy_throttle_cond, &dummy_throttle_mut, &sleep_until);
|
||||
pthread_mutex_unlock(&dummy_throttle_mut);
|
||||
// The next tick measures elapsed time from the deadline just used.
memcpy(&last_throttle, &sleep_until, sizeof(sleep_until));
|
||||
} else {
|
||||
// Already slower than the target: restart the measurement window at `now`.
double_to_ts(&last_throttle, tv_to_double(&now));
|
||||
}
|
||||
} else {
|
||||
printf("badness is negative\n");
|
||||
double_to_ts(&last_throttle, tv_to_double(&now));
|
||||
}
|
||||
}
|
||||
}
|
||||
// Accounting step, shared by all merge levels. Note that the threshold is
// c0_queueSize / 100 regardless of s->merge_level.
if(done || s->bytes_in_small_delta > c0_queueSize / 100) {
|
||||
struct timeval now;
|
||||
gettimeofday(&now,0);
|
||||
// Microseconds since the previous accounting step (long_tv yields microseconds).
unsigned long long elapsed = long_tv(now) - long_tv(s->last);
|
||||
|
||||
pthread_mutex_lock(&mut);
|
||||
if(s->merge_level == 0) {
|
||||
c0_totalConsumed += s->bytes_in_small_delta;
|
||||
c0_totalWorktime += elapsed;
|
||||
c0_totalCollapsed += s->bytes_collapsed_delta;
|
||||
} else if(s->merge_level == 1) {
|
||||
c1_totalConsumed += s->bytes_in_small_delta;
|
||||
c1_totalWorktime += elapsed;
|
||||
c1_totalCollapsed += s->bytes_collapsed_delta;
|
||||
} else if(s->merge_level == 2) {
|
||||
c2_totalConsumed += s->bytes_in_small_delta;
|
||||
c2_totalWorktime += elapsed;
|
||||
c2_totalCollapsed += s->bytes_collapsed_delta;
|
||||
} else { abort(); }
|
||||
pthread_mutex_unlock(&mut);
|
||||
|
||||
// The deltas have been folded into the totals; start a fresh tick window.
s->bytes_in_small_delta = 0;
|
||||
s->bytes_collapsed_delta = 0;
|
||||
|
||||
memcpy(&s->last, &now, sizeof(now));
|
||||
// Printed after `mut` has been released.
pretty_print(stdout);
|
||||
}
|
||||
// Matches the pthread_mutex_lock(&throttle_mut) at the top of the function.
if(s->merge_level == 0) {
|
||||
pthread_mutex_unlock(&throttle_mut);
|
||||
}
|
||||
}
|
||||
// Flattens a timeval into a single count of microseconds.
uint64_t long_tv(struct timeval& tv) {
  const uint64_t seconds_as_usec = 1000000ULL * (uint64_t)tv.tv_sec;
  const uint64_t usec = (uint64_t)tv.tv_usec;
  return seconds_as_usec + usec;
}
|
||||
// Writes a one-line status report to `out`:
//   - RUN / --- for each of the three merge levels, from mergeStats::active;
//   - for C0 and C1: twice the queue size, bytes collapsed, and "resident"
//     bytes (consumed - (collapsed + bytes consumed by the next level)),
//     each divided by 1024 * 1024;
//   - for C2: twice the queue size and bytes collapsed, divided likewise;
//   - C1 and C2 throughput as total bytes consumed over total work time
//     (work time is accumulated from long_tv() differences, i.e. microseconds).
//
// The stale, unterminated pre-change fprintf ("C1 queue size ...") that was
// fused into this body has been dropped. Integer arguments are cast to
// long long so that they match the %lld conversions whatever pageid_t is.
void pretty_print(FILE * out) {
  pageid_t mb = 1024 * 1024;
  fprintf(out,"%s %s %s ", c0->active ? "RUN" : "---", c1->active ? "RUN" : "---", c2->active ? "RUN" : "---");
  fprintf(out, "C0 size %lld collapsed %lld resident %lld ",
          (long long)(2*c0_queueSize/mb),
          (long long)(c0_totalCollapsed/mb),
          (long long)((c0_totalConsumed - (c0_totalCollapsed + c1_totalConsumed))/mb));
  fprintf(out, "C1 size %lld collapsed %lld resident %lld ",
          (long long)(2*c1_queueSize/mb),
          (long long)(c1_totalCollapsed/mb),
          (long long)((c1_totalConsumed - (c1_totalCollapsed + c2_totalConsumed))/mb));
  fprintf(out, "C2 size %lld collapsed %lld ",
          (long long)(2*c2_queueSize/mb), (long long)(c2_totalCollapsed/mb));
  // Floating-point division: a zero work time prints inf/nan rather than trapping.
  fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\n",
          ((double)c1_totalConsumed)/((double)c1_totalWorktime),
          ((double)c2_totalConsumed)/((double)c2_totalWorktime));
}
|
||||
class mergeStats {
|
||||
public:
|
||||
|
@ -64,10 +164,15 @@ public:
|
|||
num_tuples_out(0),
|
||||
num_datapages_out(0),
|
||||
bytes_in_small(0),
|
||||
bytes_in_small_delta(0),
|
||||
bytes_collapsed(0),
|
||||
bytes_collapsed_delta(0),
|
||||
num_tuples_in_small(0),
|
||||
bytes_in_large(0),
|
||||
num_tuples_in_large(0) {
|
||||
num_tuples_in_large(0),
|
||||
active(false) {
|
||||
gettimeofday(&sleep,0);
|
||||
gettimeofday(&last,0);
|
||||
}
|
||||
void new_merge() {
|
||||
merge_mgr->new_merge(this);
|
||||
|
@ -76,15 +181,22 @@ public:
|
|||
num_tuples_out = 0;
|
||||
num_datapages_out = 0;
|
||||
bytes_in_small = 0;
|
||||
bytes_in_small_delta = 0;
|
||||
bytes_collapsed = 0;
|
||||
bytes_collapsed_delta = 0;
|
||||
num_tuples_in_small = 0;
|
||||
bytes_in_large = 0;
|
||||
num_tuples_in_large = 0;
|
||||
gettimeofday(&sleep,0);
|
||||
}
|
||||
void starting_merge() {
|
||||
active = true;
|
||||
gettimeofday(&start, 0);
|
||||
gettimeofday(&last, 0);
|
||||
}
|
||||
// Flags this merge as no longer running, asks the merge manager for a final
// tick with done = true, and only then stamps `done` with the current time.
void finished_merge() {
  this->active = false;
  const bool final_tick = true;
  merge_mgr->tick(this, final_tick);
  gettimeofday(&this->done, 0);
}
|
||||
void read_tuple_from_large_component(datatuple * tup) {
|
||||
|
@ -96,9 +208,16 @@ public:
|
|||
// Accounts for one tuple read from the small input. A null tuple is ignored.
// Otherwise the tuple count is bumped, the tuple's byte_length() is added to
// the per-tick delta and to the per-merge total, and the merge manager is
// given a chance to tick.
void read_tuple_from_small_component(datatuple * tup) {
  if(!tup) {
    return;
  }
  num_tuples_in_small++;
  bytes_in_small_delta += tup->byte_length();
  bytes_in_small += tup->byte_length();
  merge_mgr->tick(this);
}
|
||||
// Records that `small` and `large` were combined into `merged`.
// The number of bytes that disappeared is (small + large) - merged. It is
// added to bytes_collapsed (per merge) and bytes_collapsed_delta (per tick).
//
// The previous code computed merged - (small + large), i.e. the negated
// value. That contradicts the documented meaning of bytes_collapsed and the
// way the merge manager subtracts collapsed bytes from consumed bytes.
void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
  // Each length is converted to pageid_t before subtracting so the result
  // does not depend on byte_length()'s own (possibly unsigned) type.
  pageid_t bytes_in = (pageid_t)small->byte_length() + (pageid_t)large->byte_length();
  pageid_t bytes_after = (pageid_t)merged->byte_length();
  pageid_t d = bytes_in - bytes_after;
  bytes_collapsed += d;
  bytes_collapsed_delta += d;
}
|
||||
void wrote_tuple(datatuple * tup) {
|
||||
num_tuples_out++;
|
||||
bytes_out_tuples += tup->byte_length();
|
||||
|
@ -118,6 +237,7 @@ public:
|
|||
struct timeval sleep; // When did we go to sleep waiting for input?
|
||||
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
|
||||
struct timeval done; // When did we finish merging?
|
||||
struct timeval last;
|
||||
|
||||
double float_tv(struct timeval& tv) {
|
||||
return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0;
|
||||
|
@ -129,9 +249,13 @@ public:
|
|||
pageid_t num_tuples_out; // How many tuples did we write?
|
||||
pageid_t num_datapages_out; // How many datapages?
|
||||
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
|
||||
pageid_t bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
|
||||
pageid_t bytes_collapsed; // How many bytes disappeared due to tuple merges?
|
||||
pageid_t bytes_collapsed_delta;
|
||||
pageid_t num_tuples_in_small; // Tuples from the small input?
|
||||
pageid_t bytes_in_large; // Bytes from the large input?
|
||||
pageid_t num_tuples_in_large; // Tuples from large input?
|
||||
bool active;
|
||||
public:
|
||||
|
||||
void pretty_print(FILE* fd) {
|
||||
|
@ -177,7 +301,38 @@ public:
|
|||
|
||||
|
||||
// Returns the manager's shared statistics object for the given merge level
// (0, 1 or 2); aborts on any other level. Despite the name, no object is
// allocated here: the three objects are created in the constructor.
//
// The stale pre-change line `return new mergeStats(this, mergeLevel);` has
// been dropped. It returned unconditionally, so the per-level dispatch below
// was unreachable and each call allocated a fresh object.
mergeStats* newMergeStats(int mergeLevel) {
  if (mergeLevel == 0) {
    return c0;
  } else if (mergeLevel == 1) {
    return c1;
  } else if(mergeLevel == 2) {
    return c2;
  } else {
    abort();
  }
}
|
||||
|
||||
private:
|
||||
pthread_mutex_t mut;
|
||||
pageid_t c0_queueSize;
|
||||
pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
|
||||
pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size)
|
||||
pageid_t c0_totalConsumed;
|
||||
pageid_t c0_totalCollapsed;
|
||||
pageid_t c0_totalWorktime;
|
||||
pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times)
|
||||
pageid_t c1_totalCollapsed;
|
||||
pageid_t c1_totalWorktime;
|
||||
pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times)
|
||||
pageid_t c2_totalCollapsed;
|
||||
pageid_t c2_totalWorktime;
|
||||
mergeStats * c0;
|
||||
mergeStats * c1;
|
||||
mergeStats * c2;
|
||||
struct timespec last_throttle;
|
||||
pthread_mutex_t throttle_mut;
|
||||
pthread_mutex_t dummy_throttle_mut;
|
||||
pthread_cond_t dummy_throttle_cond;
|
||||
|
||||
};
|
||||
#endif /* MERGESTATS_H_ */
|
||||
|
|
|
@ -132,7 +132,7 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
|
|||
|
||||
//disk merger args
|
||||
|
||||
ltable->max_c0_size = MAX_C0_SIZE;
|
||||
ltable->set_max_c0_size(MAX_C0_SIZE);
|
||||
|
||||
diskTreeComponent ** block1_scratch = new diskTreeComponent*;
|
||||
*block1_scratch=0;
|
||||
|
@ -523,6 +523,7 @@ void merge_iterators(int xid,
|
|||
if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
|
||||
{
|
||||
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
|
||||
stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right.
|
||||
|
||||
//insert merged tuple, drop deletes
|
||||
if(dropDeletes && !mtuple->isDelete()) {
|
||||
|
|
Loading…
Reference in a new issue