Clean up initialization / marshalling code. Changes some of the public mergeStats fields to be protected.

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1490 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-12-14 00:06:32 +00:00
parent eb4446f409
commit 43425128ba
12 changed files with 157 additions and 100 deletions

View file

@ -20,8 +20,11 @@ static inline double tv_to_double(struct timeval tv)
/////////////////////////////////////////////////////////////////
template<class TUPLE>
logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
{
this->max_c0_size = max_c0_size;
this->mean_c0_effective_size = max_c0_size;
this->num_c0_mergers = 0;
r_val = 3.0; // MIN_R
tree_c0 = NULL;
@ -35,7 +38,7 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
this->accepting_new_requests = true;
this->shutting_down_ = false;
flushing = false;
this->merge_mgr = new mergeManager(this);
this->merge_mgr = 0;
tmerger = new tuplemerger(&replace_merger);
header_mut = rwlc_initlock();
@ -108,15 +111,14 @@ recordid logtable<TUPLE>::allocTable(int xid)
//create the small tree
tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats);
c0_stats = merge_mgr->get_merge_stats(0);
merge_mgr = new mergeManager(this);
merge_mgr->set_c0_size(max_c0_size);
merge_mgr->new_merge(0);
c0_stats->starting_merge();
merge_mgr->get_merge_stats(0)->starting_merge();
tree_c0 = new memTreeComponent<datatuple>::rbtree_t;
update_persistent_header(xid, 1);
update_persistent_header(xid, 2);
tbl_header.merge_manager = merge_mgr->talloc(xid);
update_persistent_header(xid);
return table_rec;
}
@ -128,19 +130,15 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0);
tree_c0 = new memTreeComponent<datatuple>::rbtree_t;
merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size;
merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size;
merge_mgr->get_merge_stats(1)->mergeable_size = tbl_header.c1_mergeable_size;
merge_mgr->get_merge_stats(2)->base_size = tbl_header.c2_base_size;
merge_mgr->get_merge_stats(2)->bytes_out = tbl_header.c2_base_size;
merge_mgr = new mergeManager(this, xid, tbl_header.merge_manager);
merge_mgr->set_c0_size(max_c0_size);
c0_stats = merge_mgr->get_merge_stats(0);
merge_mgr->new_merge(0);
c0_stats->starting_merge();
merge_mgr->get_merge_stats(0)->starting_merge();
}
template<class TUPLE>
void logtable<TUPLE>::update_persistent_header(int xid, int merge_level) {
void logtable<TUPLE>::update_persistent_header(int xid) {
tbl_header.c2_root = tree_c2->get_root_rid();
tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid();
@ -149,16 +147,7 @@ void logtable<TUPLE>::update_persistent_header(int xid, int merge_level) {
tbl_header.c1_dp_state = tree_c1->get_datapage_allocator_rid();
tbl_header.c1_state = tree_c1->get_internal_node_allocator_rid();
if(merge_level == 1) {
tbl_header.c1_base_size = merge_mgr->get_merge_stats(1)->bytes_out;
tbl_header.c1_mergeable_size = merge_mgr->get_merge_stats(1)->mergeable_size;
} else if(merge_level == 2) {
tbl_header.c1_mergeable_size = 0;
tbl_header.c2_base_size = merge_mgr->get_merge_stats(2)->bytes_out;
} else {
assert(merge_level == 1 || merge_level == 2);
abort();
}
merge_mgr->marshal(xid, tbl_header.merge_manager);
Tset(xid, table_rec, &tbl_header);
}
@ -193,7 +182,7 @@ void logtable<TUPLE>::flushTable()
}
set_c0_is_merging(true);
c0_stats->handed_off_tree();
merge_mgr->get_merge_stats(0)->handed_off_tree();
merge_mgr->new_merge(0);
gettimeofday(&stop_tv,0);
@ -202,7 +191,7 @@ void logtable<TUPLE>::flushTable()
DEBUG("Signaled c0-c1 merge thread\n");
merge_count ++;
c0_stats->starting_merge();
merge_mgr->get_merge_stats(0)->starting_merge();
tsize = 0;
tree_bytes = 0;
@ -503,7 +492,7 @@ datatuple * logtable<TUPLE>::insertTupleHelper(datatuple *tuple)
pre_t = *rbitr;
//do the merging
datatuple *new_t = tmerger->merge(pre_t, tuple);
c0_stats->merged_tuples(new_t, tuple, pre_t);
merge_mgr->get_merge_stats(0)->merged_tuples(new_t, tuple, pre_t);
t = new_t;
tree_c0->erase(pre_t); //remove the previous tuple

View file

@ -34,7 +34,7 @@ public:
// 6GB ~= 100B * 500 GB / (datapage_size * 4KB)
// (100B * 500GB) / (6GB * 4KB) = 2.035
// RCS: Set this to 1 so that we do (on average) one seek per b-tree read.
logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1);
logtable(pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1);
~logtable();
@ -88,17 +88,11 @@ public:
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0(){return tree_c0;}
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); }
void set_max_c0_size(int64_t max_c0_size) {
this->max_c0_size = max_c0_size;
this->mean_c0_effective_size = max_c0_size;
this->num_c0_mergers = 0;
merge_mgr->set_c0_size(max_c0_size);
merge_mgr->get_merge_stats(1);
}
bool get_c0_is_merging() { return c0_is_merging; }
void set_c0_is_merging(bool is_merging) { c0_is_merging = is_merging; }
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
void update_persistent_header(int xid, int merge_level);
void update_persistent_header(int xid);
inline tuplemerger * gettuplemerger(){return tmerger;}
@ -111,9 +105,7 @@ public:
recordid c1_root;
recordid c1_state;
recordid c1_dp_state;
pageid_t c2_base_size;
pageid_t c1_mergeable_size;
pageid_t c1_base_size;
recordid merge_manager;
};
rwlc * header_mut;
pthread_mutex_t tick_mut;
@ -165,8 +157,6 @@ private:
std::vector<iterator *> its;
mergeStats * c0_stats;
public:
bool shutting_down_;

View file

@ -10,6 +10,10 @@
#include "logstore.h"
#include "math.h"
#include "time.h"
#include <stasis/transactional.h>
#undef try
#undef end
mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
if (mergeLevel == 0) {
return c0;
@ -46,6 +50,7 @@ void mergeManager::new_merge(int mergeLevel) {
s->new_merge2();
}
/**
 * Record the target (maximum) size for the C0 component.
 * A zero size would be meaningless here, so treat it as a caller bug.
 */
void mergeManager::set_c0_size(int64_t size) {
  assert(size != 0);
  c0->target_size = size;
}
void mergeManager::update_progress(mergeStats * s, int delta) {
@ -241,18 +246,14 @@ void * merge_manager_pretty_print_thread(void * arg) {
return m->pretty_print_thread();
}
mergeManager::mergeManager(logtable<datatuple> *ltable):
UPDATE_PROGRESS_PERIOD(0.005),
cur_c1_c2_progress_delta(0.0),
ltable(ltable),
c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)),
c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),
c2(new mergeStats(2, 0)) {
void mergeManager::init_helper(void) {
struct timeval tv;
gettimeofday(&tv, 0);
sleeping[0] = false;
sleeping[1] = false;
sleeping[2] = false;
cur_c1_c2_progress_delta = c2->in_progress - c1->out_progress;
#if EXTENDED_STATS
double_to_ts(&c0->stats_last_tick, tv_to_double(&tv));
double_to_ts(&c1->stats_last_tick, tv_to_double(&tv));
@ -263,6 +264,41 @@ mergeManager::mergeManager(logtable<datatuple> *ltable):
pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this);
}
/**
 * Construct a fresh merge manager with no pre-existing persistent state.
 *
 * Component stats are sized from the logtable when one is provided;
 * otherwise hard-coded fallbacks are used (callers that pass NULL, e.g.
 * standalone tests).
 *
 * Fix: the old constructor initialized cur_c1_c2_progress_delta(0.0) in
 * its init list; that initialization was dropped in the refactor, leaving
 * the double potentially uninitialized until the first progress update.
 * Restore the defensive zero-init.
 */
mergeManager::mergeManager(logtable<datatuple> *ltable):
  UPDATE_PROGRESS_PERIOD(0.005),
  cur_c1_c2_progress_delta(0.0),
  ltable(ltable) {
  c0 = new mergeStats(0, ltable ? ltable->max_c0_size : 10000000);
  // C1's target is R times C0's target; R is read through the table.
  c1 = new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) );
  c2 = new mergeStats(2, 0); // C2 has no size target; 0 means "unbounded".
  init_helper();
}
/**
 * Reconstruct a merge manager from persistent state previously written by
 * talloc() / marshal().
 *
 * @param xid transaction to read under
 * @param rid record holding the marshalled_header written by talloc();
 *            it contains one recordid per component.
 *
 * Fix: restore the cur_c1_c2_progress_delta(0.0) initialization that the
 * old (single) constructor performed; without it the member is read
 * uninitialized until the first progress update.
 */
mergeManager::mergeManager(logtable<datatuple> *ltable, int xid, recordid rid):
  UPDATE_PROGRESS_PERIOD(0.005),
  cur_c1_c2_progress_delta(0.0),
  ltable(ltable) {
  marshalled_header h;
  Tread(xid, rid, &h);
  // Each component unmarshals itself from its own record.
  c0 = new mergeStats(xid, h.c0);
  c1 = new mergeStats(xid, h.c1);
  c2 = new mergeStats(xid, h.c2);
  init_helper();
}
/**
 * Allocate and initialize the persistent representation of this
 * mergeManager: a header record pointing at one freshly allocated record
 * per component (C0, C1, C2).
 *
 * @return recordid of the header; hand it to marshal() and to the
 *         unmarshalling constructor at recovery time.
 */
recordid mergeManager::talloc(int xid) {
  marshalled_header h;
  recordid header_rid = Talloc(xid, sizeof(h));
  h.c0 = c0->talloc(xid);
  h.c1 = c1->talloc(xid);
  h.c2 = c2->talloc(xid);
  Tset(xid, header_rid, &h);
  return header_rid;
}
void mergeManager::marshal(int xid, recordid rid) {
marshalled_header h;
Tread(xid, rid, &h);
c0->marshal(xid, h.c0);
c1->marshal(xid, h.c1);
c2->marshal(xid, h.c2);
}
void mergeManager::pretty_print(FILE * out) {
#if EXTENDED_STATS

View file

@ -38,7 +38,9 @@ public:
return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec);
}
mergeManager(logtable<datatuple> *ltable);
mergeManager(logtable<datatuple> *ltable, int xid, recordid rid);
void marshal(int xid, recordid rid);
recordid talloc(int xid);
~mergeManager();
void new_merge(int mergelevel);
@ -60,6 +62,12 @@ public:
double cur_c1_c2_progress_delta;
private:
void init_helper(void);
struct marshalled_header {
recordid c0;
recordid c1;
recordid c2;
};
logtable<datatuple>* ltable;
double throttle_seconds;
double last_throttle_seconds;

View file

@ -21,6 +21,10 @@
#include <mergeManager.h> // XXX for double_to_ts, etc... create a util class.
#include <stasis/transactional.h>
#undef try
#undef end
class mergeStats {
public:
mergeStats(int merge_level, int64_t target_size) :
@ -59,6 +63,35 @@ class mergeStats {
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
/**
 * Unmarshalling constructor: rebuild a mergeStats from the record written
 * by marshal().  Durable fields (merge_level, base_size, mergeable_size,
 * target_size) come from disk; all transient merge-progress counters are
 * reset, since no merge is in flight at recovery time.
 *
 * Fix: C2 is legitimately constructed with target_size == 0 (see the
 * mergeManager constructor), so the out_progress division must guard
 * against dividing by zero; otherwise it produced inf (or NaN when
 * base_size is also 0), poisoning later progress comparisons.
 */
mergeStats(int xid, recordid rid) {
  marshalled_header h;
  Tread(xid, rid, &h);
  merge_level = h.merge_level;
  base_size = h.base_size;
  mergeable_size = h.mergeable_size;
  target_size = h.target_size;
  current_size = 0;
  bytes_out = base_size;  // resume accounting from the persisted base size
  bytes_in_small = 0;
  bytes_in_large = 0;
  just_handed_off= false;
  delta = 0;
  need_tick = 0;
  in_progress = 0;
  out_progress = target_size
      ? ((double)base_size) / (double)target_size
      : 0.0;  // no target (C2) => report zero progress, not inf/NaN
  active = false;
}
/// Allocate a persistent record big enough for this component's
/// marshalled_header; marshal() fills it in later.
recordid talloc(int xid) {
  const size_t header_len = sizeof(marshalled_header);
  return Talloc(xid, header_len);
}
void marshal(int xid, recordid rid) {
marshalled_header h;
h.merge_level = merge_level;
h.base_size = base_size;
h.mergeable_size = mergeable_size;
h.target_size = h.target_size;
Tset(xid, rid, &h);
}
~mergeStats() { }
void new_merge2() {
if(just_handed_off) {
@ -110,7 +143,7 @@ class mergeStats {
pageid_t output_size() {
return bytes_out;
}
const int merge_level; // 1 => C0->C1, 2 => C1->C2
int merge_level; // 1 => C0->C1, 2 => C1->C2
protected:
double float_tv(struct timeval& tv) {
@ -118,12 +151,23 @@ class mergeStats {
}
friend class mergeManager;
public: // XXX only accessed during initialization.
protected: // XXX only accessed during initialization.
struct marshalled_header {
int merge_level;
pageid_t base_size;
pageid_t mergeable_size;
pageid_t target_size; // Needed?
};
public:
pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size)
protected:
pageid_t mergeable_size; // protected by mutex.
public:
pageid_t target_size;
protected:
pageid_t current_size;
pageid_t bytes_out; // How many bytes worth of tuples did we write?
public:
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
protected:
pageid_t bytes_in_large; // Bytes from the large input?

View file

@ -139,27 +139,28 @@ void * merge_scheduler::memMergeThread() {
double new_c1_size = stats->output_size();
pthread_cond_signal(&ltable_->c0_needed);
ltable_->update_persistent_header(xid, 1);
ltable_->update_persistent_header(xid);
Tcommit(xid);
//TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger
// update c0 effective size.
double frac = 1.0/(double)merge_count;
ltable_->num_c0_mergers = merge_count;
ltable_->mean_c0_effective_size =
(int64_t) (
((double)ltable_->mean_c0_effective_size)*(1-frac) +
((double)stats->bytes_in_small*frac));
ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size;
double target_R = *ltable_->R();
if(stats->bytes_in_small) {
// update c0 effective size.
double frac = 1.0/(double)merge_count;
ltable_->num_c0_mergers = merge_count;
ltable_->mean_c0_effective_size =
(int64_t) (
((double)ltable_->mean_c0_effective_size)*(1-frac) +
((double)stats->bytes_in_small*frac));
ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size;
}
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size);
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size);
assert(target_R >= MIN_R);
bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > target_R);
DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R);
assert(*ltable_->R() >= MIN_R);
bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > *ltable_->R());
DEBUG("\nc1 size %f R %f\n", new_c1_size, *ltable_->R());
if( signal_c2 )
{
DEBUG("mmt:\tsignaling C2 for merge\n");
@ -183,10 +184,7 @@ void * merge_scheduler::memMergeThread() {
ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats));
pthread_cond_signal(&ltable_->c1_ready);
pageid_t old_bytes_out = stats->bytes_out;
stats->bytes_out = 0; // XXX HACK
ltable_->update_persistent_header(xid, 1);
stats->bytes_out = old_bytes_out;
ltable_->update_persistent_header(xid);
Tcommit(xid);
}
@ -302,7 +300,7 @@ void * merge_scheduler::diskMergeThread()
DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
// 13
ltable_->update_persistent_header(xid, 2);
ltable_->update_persistent_header(xid);
Tcommit(xid);
rwlc_unlock(ltable_->header_mut);

View file

@ -22,8 +22,6 @@ int main(int argc, char *argv[])
int xid = Tbegin();
logtable<datatuple> ltable;
recordid table_root = ROOT_RECORD;
int64_t c0_size = 1024 * 1024 * 512 * 1;
@ -38,6 +36,8 @@ int main(int argc, char *argv[])
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
}
logtable<datatuple> ltable(c0_size);
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
printf("Creating empty logstore\n");
table_root = ltable.allocTable(xid);
@ -50,7 +50,6 @@ int main(int argc, char *argv[])
}
Tcommit(xid);
ltable.set_max_c0_size(c0_size);
merge_scheduler * mscheduler = new merge_scheduler(&ltable);
mscheduler->start();

View file

@ -54,8 +54,21 @@ int main(int argc, char *argv[])
int xid = Tbegin();
int64_t c0_size = 1024 * 1024 * 512 * 1;
logtable<datatuple> ltable;
if(argc == 2 && !strcmp(argv[1], "--test")) {
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server?
}
if(argc == 2 && !strcmp(argv[1], "--benchmark")) {
c0_size = 1024 * 1024 * 1024 * 1;
printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server?
}
logtable<datatuple> ltable(c0_size);
recordid table_root = ROOT_RECORD;
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
@ -71,22 +84,6 @@ int main(int argc, char *argv[])
Tcommit(xid);
int64_t c0_size = 1024 * 1024 * 512 * 1;
if(argc == 2 && !strcmp(argv[1], "--test")) {
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server?
}
if(argc == 2 && !strcmp(argv[1], "--benchmark")) {
c0_size = 1024 * 1024 * 1024 * 1;
printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server?
}
ltable.set_max_c0_size(c0_size);
mscheduler = new merge_scheduler(&ltable);
mscheduler->start();

View file

@ -50,8 +50,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
logtable<datatuple> * ltable = new logtable<datatuple>(1000, 10000, 5);
ltable->set_max_c0_size(10 * 1024 * 1024);
logtable<datatuple> * ltable = new logtable<datatuple>(10 * 1024 * 1024, 1000, 10000, 5);
merge_scheduler mscheduler(ltable);
recordid table_root = ltable->allocTable(xid);

View file

@ -44,8 +44,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
logtable<datatuple> *ltable = new logtable<datatuple>(1000, 10000, 100);
ltable->set_max_c0_size(10*1024*1024);
logtable<datatuple> *ltable = new logtable<datatuple>(10*1024*1024, 1000, 10000, 100);
merge_scheduler mscheduler(ltable);
recordid table_root = ltable->allocTable(xid);

View file

@ -102,8 +102,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
logtable<datatuple> *ltable = new logtable<datatuple>(1000, 1000, 40);
ltable->set_max_c0_size(10 * 1024 * 1024);
logtable<datatuple> *ltable = new logtable<datatuple>(10 * 1024 * 1024, 1000, 1000, 40);
merge_scheduler mscheduler(ltable);
recordid table_root = ltable->allocTable(xid);

View file

@ -61,8 +61,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
logtable<datatuple>::init_stasis();
int xid = Tbegin();
ltable = new logtable<datatuple>(1000, 10000, 5);
ltable->set_max_c0_size(10*1024*1024);
ltable = new logtable<datatuple>(10 * 1024 * 1024, 1000, 10000, 5);
merge_scheduler mscheduler(ltable);