diff --git a/logstore.cpp b/logstore.cpp index 67965ed..f684819 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -82,6 +82,8 @@ template void logtable::init_stasis() { DataPage::register_stasis_page_impl(); + // XXX Workaround Stasis' (still broken) default concurrent buffer manager + stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; Tinit(); @@ -146,8 +148,6 @@ void logtable::flushTable() start = tv_to_double(start_tv); - rwlc_writelock(header_mut); - int expmcount = merge_count; c0_stats->finished_merge(); @@ -160,7 +160,6 @@ void logtable::flushTable() rwlc_cond_wait(&c0_needed, header_mut); blocked = true; if(expmcount != merge_count) { - rwlc_writeunlock(header_mut); return; } } @@ -182,8 +181,6 @@ void logtable::flushTable() tsize = 0; tree_bytes = 0; - - rwlc_writeunlock(header_mut); if(blocked && stop - start > 0.1) { if(first) @@ -429,7 +426,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size template void logtable::insertTuple(datatuple *tuple) { - rwlc_readlock(header_mut); + rwlc_writelock(header_mut); // XXX want this to be a readlock, but tick, and the stats need it to be a writelock for now... //lock the red-black tree pthread_mutex_lock(&rb_mut); c0_stats->read_tuple_from_small_component(tuple); @@ -474,12 +471,14 @@ void logtable::insertTuple(datatuple *tuple) if(tree_bytes >= max_c0_size ) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); - rwlc_unlock(header_mut); - flushTable(); - } else { - //unlock - rwlc_unlock(header_mut); +// rwlc_unlock(header_mut); +// rwlc_writelock(header_mut); + // the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock. + if(tree_bytes >= max_c0_size) { + flushTable(); + } } + rwlc_unlock(header_mut); DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); } @@ -499,7 +498,6 @@ void logtable::forgetIterator(iterator * it) { } template void logtable::bump_epoch() { -// assert(!trywritelock(header_lock,0)); epoch++; for(unsigned int i = 0; i < its.size(); i++) { its[i]->invalidate(); diff --git a/logstore.h b/logstore.h index 982c22b..9d01e6b 100644 --- a/logstore.h +++ b/logstore.h @@ -107,9 +107,13 @@ public: inline bool is_still_running() { return still_running_; } inline void stop() { - still_running_ = false; - flushTable(); - // XXX must need to do other things! + rwlc_writelock(header_mut); + if(still_running_) { + still_running_ = false; + flushTable(); + } + rwlc_unlock(header_mut); + // XXX must need to do other things! (join the threads?) } private: diff --git a/mergeManager.cpp b/mergeManager.cpp index 063fb77..2bfda43 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -26,6 +26,7 @@ mergeManager::~mergeManager() { pthread_mutex_destroy(&throttle_mut); pthread_mutex_destroy(&dummy_throttle_mut); pthread_cond_destroy(&dummy_throttle_cond); + pthread_cond_destroy(&throttle_wokeup_cond); delete c0; delete c1; delete c2; @@ -77,6 +78,9 @@ void mergeManager::tick(mergeStats * s, bool block) { s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; if(block) { + while(sleeping[s->merge_level]) { + rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut); + } // pthread_mutex_lock(&mut); struct timeval now; gettimeofday(&now, 0); @@ -127,9 +131,14 @@ void mergeManager::tick(mergeStats * s, bool block) { //#define PP_THREAD_INFO #ifdef PP_THREAD_INFO - printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, s->current_size); + printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ",s->merge_level, bps / (1024.0*1024.0), overshoot, s->current_size); #endif + if(print_skipped == 10000) { pretty_print(stdout); + print_skipped = 0; + } else { + print_skipped++; + } if(overshoot > 0) { // throttle // it took "elapsed" seconds to process "tick_length_bytes" mb @@ -154,12 +163,20 @@ void mergeManager::tick(mergeStats * s, bool block) { } double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); + sleeping[s->merge_level] = true; rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until); + sleeping[s->merge_level] = false; + pthread_cond_broadcast(&throttle_wokeup_cond); gettimeofday(&now, 0); } } while((overshoot > 0) && (raw_overshoot > 0)); } else { - pretty_print(stdout); + if(print_skipped == 10000) { + pretty_print(stdout); + print_skipped = 0; + } else { + print_skipped++; + } } // pthread_mutex_unlock(&mut); } @@ -174,8 +191,13 @@ mergeManager::mergeManager(logtable *ltable): pthread_mutex_init(&throttle_mut, 0); pthread_mutex_init(&dummy_throttle_mut, 0); pthread_cond_init(&dummy_throttle_cond, 0); + pthread_cond_init(&throttle_wokeup_cond, 0); struct timeval tv; gettimeofday(&tv, 0); + sleeping[0] = false; + sleeping[1] = false; + sleeping[2] = false; + print_skipped = 0; double_to_ts(&c0->last_tick, tv_to_double(&tv)); double_to_ts(&c1->last_tick, tv_to_double(&tv)); double_to_ts(&c2->last_tick, tv_to_double(&tv)); diff --git a/mergeManager.h b/mergeManager.h index 1664379..220f515 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -58,6 +58,9 @@ private: pthread_mutex_t throttle_mut; pthread_mutex_t dummy_throttle_mut; pthread_cond_t dummy_throttle_cond; + pthread_cond_t throttle_wokeup_cond; + bool sleeping[3]; + int print_skipped; }; #endif /* MERGEMANAGER_H_ */ diff --git a/merger.cpp b/merger.cpp index 95b423c..606a623 100644 --- a/merger.cpp +++ b/merger.cpp @@ -390,6 +390,16 @@ void *diskMergeThread(void*arg) return 0; } +#define FORCE_INTERVAL 1000000 // XXX do not hardcode FORCE_INTERVAL + +static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) { + if(*i > FORCE_INTERVAL) { + if(forceMe) forceMe->force(xid); + log->force_tail(log, LOG_FORCE_WAL); + *i = 0; + } +} + template void merge_iterators(int xid, diskTreeComponent * forceMe, @@ -410,9 +420,9 @@ void merge_iterators(int xid, int i = 0; + rwlc_writelock(ltable->header_mut); // XXX slow while( (t2=itrB->next_callerFrees()) != 0) { - rwlc_writelock(ltable->header_mut); // XXX slow stats->read_tuple_from_small_component(t2); rwlc_unlock(ltable->header_mut); // XXX slow @@ -421,10 +431,10 @@ void merge_iterators(int xid, while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2 { - rwlc_writelock(ltable->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow //insert t1 scratch_tree->insertTuple(xid, t1); - i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } + i+=t1->byte_length(); stats->wrote_tuple(t1); datatuple::freetuple(t1); //advance itrA @@ -433,6 +443,8 @@ void merge_iterators(int xid, stats->read_tuple_from_large_component(t1); } rwlc_unlock(ltable->header_mut); // XXX slow + + periodically_force(xid, &i, forceMe, log); } if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) @@ -444,7 +456,7 @@ void merge_iterators(int xid, //insert merged tuple, drop deletes if(dropDeletes && !mtuple->isDelete()) { scratch_tree->insertTuple(xid, mtuple); - i+=mtuple->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } + i+=mtuple->byte_length(); } datatuple::freetuple(t1); stats->wrote_tuple(mtuple); @@ -457,33 +469,36 @@ void merge_iterators(int xid, } else { + rwlc_writelock(ltable->header_mut); // XXX slow //insert t2 scratch_tree->insertTuple(xid, t2); - i+=t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } - rwlc_writelock(ltable->header_mut); // XXX slow + i+=t2->byte_length(); stats->wrote_tuple(t2); rwlc_unlock(ltable->header_mut); // XXX slow // cannot free any tuples here; they may still be read through a lookup } - + periodically_force(xid, &i, forceMe, log); datatuple::freetuple(t2); + rwlc_writelock(ltable->header_mut); // XXX slow } while(t1 != 0) {// t1 is less than t2 - rwlc_writelock(ltable->header_mut); // XXX slow scratch_tree->insertTuple(xid, t1); stats->wrote_tuple(t1); - i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } + i += t1->byte_length(); datatuple::freetuple(t1); //advance itrA t1 = itrA->next_callerFrees(); stats->read_tuple_from_large_component(t1); rwlc_unlock(ltable->header_mut); // XXX slow + periodically_force(xid, &i, forceMe, log); + rwlc_writelock(ltable->header_mut); // XXX slow } DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples); scratch_tree->writes_done(); + rwlc_writeunlock(ltable->header_mut); } diff --git a/regionAllocator.h b/regionAllocator.h index 031dafc..043e3ac 100644 --- a/regionAllocator.h +++ b/regionAllocator.h @@ -48,6 +48,9 @@ public: rid_.page = INVALID_PAGE; regionCount_ = -1; } + ~RegionAllocator() { + bm_->closeHandleImpl(bm_, bmh_); + } Page * load_page(int xid, pageid_t p) { return bm_->loadPageImpl(bm_, bmh_, xid, p, UNKNOWN_TYPE_PAGE); } // XXX handle disk full? @@ -83,7 +86,6 @@ public: Tread(xid, list_entry, &pid); TregionForce(xid, bm_, bmh_, pid); } - bm_->closeHandleImpl(bm_, bmh_); } void dealloc_regions(int xid) { pageid_t regionCount = TarrayListLength(xid, header_.region_list); diff --git a/server.cpp b/server.cpp index 13e75b3..a2fa2f5 100644 --- a/server.cpp +++ b/server.cpp @@ -91,7 +91,7 @@ int main(int argc, char *argv[]) mscheduler->startlogtable(lindex, c0_size); - lserver = new logserver(100, 32432); + lserver = new logserver(5, 32432); lserver->startserver(<able);