initial merge manager (multi-merge) statistics

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@797 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-04-29 23:13:04 +00:00
parent ce48882946
commit 7b2560d906
6 changed files with 176 additions and 104 deletions

View file

@ -18,14 +18,15 @@ class diskTreeComponent {
class iterator; class iterator;
diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size, diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size,
mergeStats* stats) : mergeManager::mergeStats* stats) :
ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)), ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)),
dp(0), dp(0),
datapage_size(datapage_size), datapage_size(datapage_size),
stats(stats) {} stats(stats) {}
diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state, mergeStats* stats) : diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state,
mergeManager::mergeStats* stats) :
ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)), ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)),
dp(0), dp(0),
datapage_size(-1), datapage_size(-1),
@ -73,7 +74,7 @@ class diskTreeComponent {
internalNodes * ltree; internalNodes * ltree;
DataPage<datatuple>* dp; DataPage<datatuple>* dp;
pageid_t datapage_size; pageid_t datapage_size;
mergeStats *stats; mergeManager::mergeStats *stats;
public: public:
class internalNodes{ class internalNodes{

View file

@ -28,6 +28,7 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
tree_c1_mergeable = NULL; tree_c1_mergeable = NULL;
tree_c2 = NULL; tree_c2 = NULL;
this->still_running_ = true; this->still_running_ = true;
this->merge_mgr = new mergeManager();
this->mergedata = 0; this->mergedata = 0;
//tmerger = new tuplemerger(&append_merger); //tmerger = new tuplemerger(&append_merger);
tmerger = new tuplemerger(&replace_merger); tmerger = new tuplemerger(&replace_merger);
@ -78,7 +79,7 @@ recordid logtable<TUPLE>::allocTable(int xid)
{ {
table_rec = Talloc(xid, sizeof(tbl_header)); table_rec = Talloc(xid, sizeof(tbl_header));
mergeStats * stats = 0; mergeManager::mergeStats * stats = 0;
//create the big tree //create the big tree
tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats);

View file

@ -91,6 +91,8 @@ public:
int64_t max_c0_size; int64_t max_c0_size;
mergeManager * merge_mgr;
inline bool is_still_running() { return still_running_; } inline bool is_still_running() { return still_running_; }
inline void stop() { inline void stop() {
still_running_ = false; still_running_ = false;

View file

@ -10,14 +10,56 @@
#include <stasis/common.h> #include <stasis/common.h>
class mergeStats { class mergeManager {
int merge_level; // 1 => C0->C1, 2 => C1->C2 pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
pageid_t merge_count; // This is the merge_count'th merge pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size)
struct timeval sleep; // When did we go to sleep waiting for input? pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times)
pageid_t c1_totalWorktime;
pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times)
pageid_t c2_totalWorktime;
public: public:
mergeStats(int merge_level, pageid_t merge_count) : mergeManager() :
c1_queueSize(0),
c2_queueSize(0),
c1_totalConsumed(0),
c1_totalWorktime(0),
c2_totalConsumed(0),
c2_totalWorktime(0) { }
class mergeStats;
void new_merge(mergeStats * s) {
if(s->merge_count) {
if(s->merge_level == 1) {
c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
c1_totalConsumed += s->bytes_in_small;
c1_totalWorktime += (long_tv(s->done) - long_tv(s->start));
} else if(s->merge_level == 2) {
c2_queueSize = s->bytes_in_small;
c2_totalConsumed += s->bytes_in_small;
c2_totalWorktime += (long_tv(s->done) - long_tv(s->start));
} else { abort(); }
pretty_print(stdout);
}
}
uint64_t long_tv(struct timeval& tv) {
return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec);
}
void pretty_print(FILE * out) {
fprintf(out,
"C1 queue size %lld C2 queue size %lld C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\n",
c1_queueSize, c2_queueSize,
((double)c1_totalConsumed)/((double)c1_totalWorktime),
((double)c2_totalConsumed)/((double)c2_totalWorktime)
);
}
class mergeStats {
public:
mergeStats(mergeManager* merge_mgr, int merge_level) :
merge_mgr(merge_mgr),
merge_level(merge_level), merge_level(merge_level),
merge_count(merge_count), merge_count(0),
bytes_out(0), bytes_out(0),
num_tuples_out(0), num_tuples_out(0),
num_datapages_out(0), num_datapages_out(0),
@ -27,6 +69,18 @@ public:
num_tuples_in_large(0) { num_tuples_in_large(0) {
gettimeofday(&sleep,0); gettimeofday(&sleep,0);
} }
void new_merge() {
merge_mgr->new_merge(this);
merge_count++;
bytes_out = 0;
num_tuples_out = 0;
num_datapages_out = 0;
bytes_in_small = 0;
num_tuples_in_small = 0;
bytes_in_large = 0;
num_tuples_in_large = 0;
gettimeofday(&sleep,0);
}
void starting_merge() { void starting_merge() {
gettimeofday(&start, 0); gettimeofday(&start, 0);
} }
@ -57,13 +111,18 @@ public:
pageid_t output_size() { pageid_t output_size() {
return bytes_out; return bytes_out;
} }
private: protected:
mergeManager* merge_mgr;
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging? struct timeval done; // When did we finish merging?
double float_tv(struct timeval& tv) { double float_tv(struct timeval& tv) {
return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0; return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0;
} }
friend class mergeManager;
pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)? pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)?
pageid_t bytes_out_tuples; // How many bytes worth of tuples did we write? pageid_t bytes_out_tuples; // How many bytes worth of tuples did we write?
@ -116,4 +175,9 @@ public:
} }
}; };
mergeStats* newMergeStats(int mergeLevel) {
return new mergeStats(this, mergeLevel);
}
};
#endif /* MERGESTATS_H_ */ #endif /* MERGESTATS_H_ */

View file

@ -193,7 +193,7 @@ void merge_iterators(int xid,
ITB *itrB, ITB *itrB,
logtable<datatuple> *ltable, logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, diskTreeComponent *scratch_tree,
mergeStats *stats, mergeManager::mergeStats *stats,
bool dropDeletes); bool dropDeletes);
@ -229,11 +229,12 @@ void* memMergeThread(void*arg)
assert(ltable->get_tree_c1()); assert(ltable->get_tree_c1());
int merge_count =0; int merge_count =0;
mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1);
while(true) // 1 while(true) // 1
{ {
mergeStats stats(1, merge_count);
writelock(ltable->header_lock,0); writelock(ltable->header_lock,0);
stats->new_merge();
int done = 0; int done = 0;
// 2: wait for c0_mergable // 2: wait for c0_mergable
while(!ltable->get_tree_c0_mergeable()) while(!ltable->get_tree_c0_mergeable())
@ -270,7 +271,7 @@ void* memMergeThread(void*arg)
break; break;
} }
stats.starting_merge(); stats->starting_merge();
// 3: Begin transaction // 3: Begin transaction
xid = Tbegin(); xid = Tbegin();
@ -284,7 +285,7 @@ void* memMergeThread(void*arg)
//create a new tree //create a new tree
diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, &stats); diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats);
//pthread_mutex_unlock(a->block_ready_mut); //pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->header_lock); unlock(ltable->header_lock);
@ -292,7 +293,7 @@ void* memMergeThread(void*arg)
//: do the merge //: do the merge
DEBUG("mmt:\tMerging:\n"); DEBUG("mmt:\tMerging:\n");
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c1_prime, &stats, false); merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c1_prime, stats, false);
delete itrA; delete itrA;
delete itrB; delete itrB;
@ -311,7 +312,7 @@ void* memMergeThread(void*arg)
//TODO: this is simplistic for now //TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger //6: if c1' is too big, signal the other merger
double target_R = *(a->r_i); double target_R = *(a->r_i);
double new_c1_size = stats.output_size(); double new_c1_size = stats->output_size();
assert(target_R >= MIN_R); assert(target_R >= MIN_R);
bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R) || bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R) ||
(a->max_size && new_c1_size > a->max_size ); (a->max_size && new_c1_size > a->max_size );
@ -347,7 +348,7 @@ void* memMergeThread(void*arg)
ltable->set_tree_c1_mergeable(c1_prime); ltable->set_tree_c1_mergeable(c1_prime);
// 8: c1 = new empty. // 8: c1 = new empty.
ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, &stats)); ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats));
pthread_cond_signal(a->out_block_ready_cond); pthread_cond_signal(a->out_block_ready_cond);
@ -363,8 +364,8 @@ void* memMergeThread(void*arg)
unlock(ltable->header_lock); unlock(ltable->header_lock);
stats.finished_merge(); stats->finished_merge();
stats.pretty_print(stdout); stats->pretty_print(stdout);
//TODO: get the freeing outside of the lock //TODO: get the freeing outside of the lock
} }
@ -385,13 +386,14 @@ void *diskMergeThread(void*arg)
int merge_count =0; int merge_count =0;
mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2);
while(true) while(true)
{ {
mergeStats stats(2, merge_count);
// 2: wait for input // 2: wait for input
writelock(ltable->header_lock,0); writelock(ltable->header_lock,0);
stats->new_merge();
int done = 0; int done = 0;
// get a new input for merge // get a new input for merge
while(!ltable->get_tree_c1_mergeable()) while(!ltable->get_tree_c1_mergeable())
@ -423,7 +425,7 @@ void *diskMergeThread(void*arg)
break; break;
} }
stats.starting_merge(); stats->starting_merge();
// 3: begin // 3: begin
xid = Tbegin(); xid = Tbegin();
@ -434,14 +436,14 @@ void *diskMergeThread(void*arg)
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator();
//create a new tree //create a new tree
diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, &stats); diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats);
unlock(ltable->header_lock); unlock(ltable->header_lock);
//do the merge //do the merge
DEBUG("dmt:\tMerging:\n"); DEBUG("dmt:\tMerging:\n");
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c2_prime, &stats, true); merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c2_prime, stats, true);
delete itrA; delete itrA;
delete itrB; delete itrB;
@ -467,7 +469,7 @@ void *diskMergeThread(void*arg)
merge_count++; merge_count++;
//update the current optimal R value //update the current optimal R value
*(a->r_i) = std::max(MIN_R, sqrt( (stats.output_size() * 1.0) / (ltable->max_c0_size) ) ); *(a->r_i) = std::max(MIN_R, sqrt( (stats->output_size() * 1.0) / (ltable->max_c0_size) ) );
DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i)); DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i));
// 10: C2 is never too big // 10: C2 is never too big
@ -480,8 +482,8 @@ void *diskMergeThread(void*arg)
unlock(ltable->header_lock); unlock(ltable->header_lock);
stats.finished_merge(); stats->finished_merge();
stats.pretty_print(stdout); stats->pretty_print(stdout);
} }
return 0; return 0;
@ -492,7 +494,7 @@ void merge_iterators(int xid,
ITA *itrA, //iterator on c1 or c2 ITA *itrA, //iterator on c1 or c2
ITB *itrB, //iterator on c0 or c1, respectively ITB *itrB, //iterator on c0 or c1, respectively
logtable<datatuple> *ltable, logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, mergeStats *stats, diskTreeComponent *scratch_tree, mergeManager::mergeStats *stats,
bool dropDeletes // should be true iff this is biggest component bool dropDeletes // should be true iff this is biggest component
) )
{ {

View file

@ -34,8 +34,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
xid = Tbegin(); xid = Tbegin();
mergeStats stats(1,0); mergeManager merge_mgr;
diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, &stats); mergeManager::mergeStats * stats = merge_mgr.newMergeStats(1);
diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats);
std::vector<std::string> data_arr; std::vector<std::string> data_arr;
std::vector<std::string> key_arr; std::vector<std::string> key_arr;