numerous bugfixes; rate-limit the progress meter display

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@816 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2010-05-27 23:15:24 +00:00
parent 6eecf1557a
commit d981d91dfa
7 changed files with 72 additions and 28 deletions

View file

@ -82,6 +82,8 @@ template<class TUPLE>
void logtable<TUPLE>::init_stasis() {
DataPage<datatuple>::register_stasis_page_impl();
// XXX Workaround Stasis' (still broken) default concurrent buffer manager
stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
Tinit();
@ -146,8 +148,6 @@ void logtable<TUPLE>::flushTable()
start = tv_to_double(start_tv);
rwlc_writelock(header_mut);
int expmcount = merge_count;
c0_stats->finished_merge();
@ -160,7 +160,6 @@ void logtable<TUPLE>::flushTable()
rwlc_cond_wait(&c0_needed, header_mut);
blocked = true;
if(expmcount != merge_count) {
rwlc_writeunlock(header_mut);
return;
}
}
@ -182,8 +181,6 @@ void logtable<TUPLE>::flushTable()
tsize = 0;
tree_bytes = 0;
rwlc_writeunlock(header_mut);
if(blocked && stop - start > 0.1) {
if(first)
@ -429,7 +426,7 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple)
{
rwlc_readlock(header_mut);
rwlc_writelock(header_mut); // XXX want this to be a readlock, but tick() and the stats need it to be a writelock for now...
//lock the red-black tree
pthread_mutex_lock(&rb_mut);
c0_stats->read_tuple_from_small_component(tuple);
@ -474,12 +471,14 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
if(tree_bytes >= max_c0_size )
{
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
rwlc_unlock(header_mut);
flushTable();
} else {
//unlock
rwlc_unlock(header_mut);
// rwlc_unlock(header_mut);
// rwlc_writelock(header_mut);
// the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock.
if(tree_bytes >= max_c0_size) {
flushTable();
}
}
rwlc_unlock(header_mut);
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
}
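
The comment in this hunk carries the reasoning: the c0 size test and the flush must happen under one continuous hold of the header writelock, otherwise another writer can change tree_bytes between the check and the flush. Below is a minimal sketch of that discipline using a plain pthread_rwlock_t instead of the rwlc wrapper; MiniTable, flush(), and the constants are hypothetical stand-ins, not logstore API.

#include <pthread.h>
#include <stdint.h>

// Sketch only: the size test and the flush share one writelock hold,
// so no other writer can change tree_bytes in between.
struct MiniTable {
  pthread_rwlock_t header_lock;
  int64_t tree_bytes;
  int64_t max_c0_size;

  MiniTable(int64_t max) : tree_bytes(0), max_c0_size(max) {
    pthread_rwlock_init(&header_lock, 0);
  }

  void flush() {            // caller must hold the writelock
    tree_bytes = 0;         // stand-in for handing c0 off to the merger
  }

  void insert(int64_t tuple_bytes) {
    pthread_rwlock_wrlock(&header_lock);  // stats + size check need the writelock
    tree_bytes += tuple_bytes;
    if (tree_bytes >= max_c0_size) {      // atomic with the flush below
      flush();
    }
    pthread_rwlock_unlock(&header_lock);
  }
};
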
@ -499,7 +498,6 @@ void logtable<TUPLE>::forgetIterator(iterator * it) {
}
template<class TUPLE>
void logtable<TUPLE>::bump_epoch() {
// assert(!trywritelock(header_lock,0));
epoch++;
for(unsigned int i = 0; i < its.size(); i++) {
its[i]->invalidate();

View file

@ -107,9 +107,13 @@ public:
inline bool is_still_running() { return still_running_; }
inline void stop() {
still_running_ = false;
flushTable();
// XXX may need to do other things!
rwlc_writelock(header_mut);
if(still_running_) {
still_running_ = false;
flushTable();
}
rwlc_unlock(header_mut);
// XXX may need to do other things! (join the threads?)
}
private:
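
A minimal sketch of the shutdown pattern this hunk adopts: the running flag is re-checked under the lock, so a second stop() becomes a no-op instead of flushing twice. A plain pthread mutex stands in for the header rwlc lock; the names are illustrative only.

#include <pthread.h>

static pthread_mutex_t header_mut = PTHREAD_MUTEX_INITIALIZER;
static bool still_running = true;

static void stop_once() {
  pthread_mutex_lock(&header_mut);
  if (still_running) {     // re-checked under the lock: later calls do nothing
    still_running = false;
    // flushTable() happens here in the real code
  }
  pthread_mutex_unlock(&header_mut);
}
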

View file

@ -26,6 +26,7 @@ mergeManager::~mergeManager() {
pthread_mutex_destroy(&throttle_mut);
pthread_mutex_destroy(&dummy_throttle_mut);
pthread_cond_destroy(&dummy_throttle_cond);
pthread_cond_destroy(&throttle_wokeup_cond);
delete c0;
delete c1;
delete c2;
@ -77,6 +78,9 @@ void mergeManager::tick(mergeStats * s, bool block) {
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
if(block) {
while(sleeping[s->merge_level]) {
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
}
// pthread_mutex_lock(&mut);
struct timeval now;
gettimeofday(&now, 0);
@ -127,9 +131,14 @@ void mergeManager::tick(mergeStats * s, bool block) {
//#define PP_THREAD_INFO
#ifdef PP_THREAD_INFO
printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, s->current_size);
printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, s->current_size);
#endif
if(print_skipped == 10000) {
pretty_print(stdout);
print_skipped = 0;
} else {
print_skipped++;
}
if(overshoot > 0) {
// throttle
// it took "elapsed" seconds to process "tick_length_bytes" mb
@ -154,12 +163,20 @@ void mergeManager::tick(mergeStats * s, bool block) {
}
double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
sleeping[s->merge_level] = true;
rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until);
sleeping[s->merge_level] = false;
pthread_cond_broadcast(&throttle_wokeup_cond);
gettimeofday(&now, 0);
}
} while((overshoot > 0) && (raw_overshoot > 0));
} else {
pretty_print(stdout);
if(print_skipped == 10000) {
pretty_print(stdout);
print_skipped = 0;
} else {
print_skipped++;
}
}
// pthread_mutex_unlock(&mut);
}
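
The sleeping[] flags and throttle_wokeup_cond added here let one merge level stall itself for rate limiting without wedging the others: the sleeper sets its flag, does a timed wait, clears the flag, and broadcasts; any thread that must not run ahead of that level waits for the broadcast. A rough pthread-only sketch of that hand-off, with illustrative names rather than the real rwlc-based code:

#include <pthread.h>
#include <time.h>

static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t throttle_cond = PTHREAD_COND_INITIALIZER; // timed self-throttle
static pthread_cond_t wokeup_cond   = PTHREAD_COND_INITIALIZER; // wakeup fan-out
static bool sleeping[3] = { false, false, false };

// Called by the level that decided to throttle itself until 'deadline'.
static void throttled_sleep(int level, const struct timespec *deadline) {
  pthread_mutex_lock(&mut);
  sleeping[level] = true;
  pthread_cond_timedwait(&throttle_cond, &mut, deadline);
  sleeping[level] = false;
  pthread_cond_broadcast(&wokeup_cond);  // release anyone blocked on this level
  pthread_mutex_unlock(&mut);
}

// Called by a thread that must not proceed while 'level' is asleep.
static void wait_for_level(int level) {
  pthread_mutex_lock(&mut);
  while (sleeping[level]) {
    pthread_cond_wait(&wokeup_cond, &mut);
  }
  pthread_mutex_unlock(&mut);
}
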
@ -174,8 +191,13 @@ mergeManager::mergeManager(logtable<datatuple> *ltable):
pthread_mutex_init(&throttle_mut, 0);
pthread_mutex_init(&dummy_throttle_mut, 0);
pthread_cond_init(&dummy_throttle_cond, 0);
pthread_cond_init(&throttle_wokeup_cond, 0);
struct timeval tv;
gettimeofday(&tv, 0);
sleeping[0] = false;
sleeping[1] = false;
sleeping[2] = false;
print_skipped = 0;
double_to_ts(&c0->last_tick, tv_to_double(&tv));
double_to_ts(&c1->last_tick, tv_to_double(&tv));
double_to_ts(&c2->last_tick, tv_to_double(&tv));

View file

@ -58,6 +58,9 @@ private:
pthread_mutex_t throttle_mut;
pthread_mutex_t dummy_throttle_mut;
pthread_cond_t dummy_throttle_cond;
pthread_cond_t throttle_wokeup_cond;
bool sleeping[3];
int print_skipped;
};
#endif /* MERGEMANAGER_H_ */

View file

@ -390,6 +390,16 @@ void *diskMergeThread(void*arg)
return 0;
}
#define FORCE_INTERVAL 1000000 // XXX do not hardcode FORCE_INTERVAL
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
if(*i > FORCE_INTERVAL) {
if(forceMe) forceMe->force(xid);
log->force_tail(log, LOG_FORCE_WAL);
*i = 0;
}
}
template <class ITA, class ITB>
void merge_iterators(int xid,
diskTreeComponent * forceMe,
@ -410,9 +420,9 @@ void merge_iterators(int xid,
int i = 0;
rwlc_writelock(ltable->header_mut); // XXX slow
while( (t2=itrB->next_callerFrees()) != 0)
{
rwlc_writelock(ltable->header_mut); // XXX slow
stats->read_tuple_from_small_component(t2);
rwlc_unlock(ltable->header_mut); // XXX slow
@ -421,10 +431,10 @@ void merge_iterators(int xid,
while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2
{
rwlc_writelock(ltable->header_mut); // XXX slow
//insert t1
scratch_tree->insertTuple(xid, t1);
i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
i+=t1->byte_length();
stats->wrote_tuple(t1);
datatuple::freetuple(t1);
//advance itrA
@ -433,6 +443,8 @@ void merge_iterators(int xid,
stats->read_tuple_from_large_component(t1);
}
rwlc_unlock(ltable->header_mut); // XXX slow
periodically_force(xid, &i, forceMe, log);
}
if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
@ -444,7 +456,7 @@ void merge_iterators(int xid,
//insert merged tuple, drop deletes
if(dropDeletes && !mtuple->isDelete()) {
scratch_tree->insertTuple(xid, mtuple);
i+=mtuple->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
i+=mtuple->byte_length();
}
datatuple::freetuple(t1);
stats->wrote_tuple(mtuple);
@ -457,33 +469,36 @@ void merge_iterators(int xid,
}
else
{
rwlc_writelock(ltable->header_mut); // XXX slow
//insert t2
scratch_tree->insertTuple(xid, t2);
i+=t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
rwlc_writelock(ltable->header_mut); // XXX slow
i+=t2->byte_length();
stats->wrote_tuple(t2);
rwlc_unlock(ltable->header_mut); // XXX slow
// cannot free any tuples here; they may still be read through a lookup
}
periodically_force(xid, &i, forceMe, log);
datatuple::freetuple(t2);
rwlc_writelock(ltable->header_mut); // XXX slow
}
while(t1 != 0) {// t1 is less than t2
rwlc_writelock(ltable->header_mut); // XXX slow
scratch_tree->insertTuple(xid, t1);
stats->wrote_tuple(t1);
i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
i += t1->byte_length();
datatuple::freetuple(t1);
//advance itrA
t1 = itrA->next_callerFrees();
stats->read_tuple_from_large_component(t1);
rwlc_unlock(ltable->header_mut); // XXX slow
periodically_force(xid, &i, forceMe, log);
rwlc_writelock(ltable->header_mut); // XXX slow
}
DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);
scratch_tree->writes_done();
rwlc_writeunlock(ltable->header_mut);
}
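
Net effect of the merge_iterators() changes: each tuple is still inserted under the header writelock, but the lock is released before the periodic force, so forcing the tree pages and the log tail no longer happens with the header lock held. A toy, self-contained model of that cadence; everything except the idea of periodically_force() is made up for illustration.

#include <pthread.h>
#include <vector>
#include <cstdio>

static const int FORCE_INTERVAL_BYTES = 1000000;  // mirrors FORCE_INTERVAL

// Stand-in for periodically_force(): force only after enough bytes accumulate.
static void periodically_force_toy(int *bytes_since_force) {
  if (*bytes_since_force > FORCE_INTERVAL_BYTES) {
    std::printf("force tree pages and log tail here\n");
    *bytes_since_force = 0;
  }
}

int main() {
  pthread_rwlock_t header_lock = PTHREAD_RWLOCK_INITIALIZER;
  std::vector<int> input(100000, 64);   // pretend each tuple is 64 bytes
  std::vector<int> output;
  int bytes_since_force = 0;

  for (size_t n = 0; n < input.size(); n++) {
    pthread_rwlock_wrlock(&header_lock);          // insert under the writelock
    output.push_back(input[n]);
    bytes_since_force += input[n];
    pthread_rwlock_unlock(&header_lock);          // drop it before any I/O
    periodically_force_toy(&bytes_since_force);   // no lock held while forcing
  }
  return 0;
}
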

View file

@ -48,6 +48,9 @@ public:
rid_.page = INVALID_PAGE;
regionCount_ = -1;
}
~RegionAllocator() {
bm_->closeHandleImpl(bm_, bmh_);
}
Page * load_page(int xid, pageid_t p) { return bm_->loadPageImpl(bm_, bmh_, xid, p, UNKNOWN_TYPE_PAGE); }
// XXX handle disk full?
@ -83,7 +86,6 @@ public:
Tread(xid, list_entry, &pid);
TregionForce(xid, bm_, bmh_, pid);
}
bm_->closeHandleImpl(bm_, bmh_);
}
void dealloc_regions(int xid) {
pageid_t regionCount = TarrayListLength(xid, header_.region_list);
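
These last two hunks move bm_->closeHandleImpl() out of force_regions() and into the new ~RegionAllocator() destructor, so the buffer-manager handle lives as long as the allocator and force_regions() can be called more than once. A rough RAII sketch of that ownership change; Handle and the print statements are stand-ins for the stasis buffer manager API.

#include <cstdio>

struct Handle { bool open; };

struct RegionAllocatorSketch {
  Handle h;
  RegionAllocatorSketch() { h.open = true; }
  ~RegionAllocatorSketch() {            // handle is released exactly once, here
    if (h.open) { h.open = false; std::puts("handle closed"); }
  }
  void force_regions() {
    std::puts("regions forced");        // handle stays open, so repeat calls are fine
  }
};

int main() {
  RegionAllocatorSketch ra;
  ra.force_regions();
  ra.force_regions();   // safe now: nothing was closed by the first call
  return 0;
}
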

View file

@ -91,7 +91,7 @@ int main(int argc, char *argv[])
mscheduler->startlogtable(lindex, c0_size);
lserver = new logserver(100, 32432);
lserver = new logserver(5, 32432);
lserver->startserver(&ltable);