From 2a1157602a3718f73036a9e44071ae21ff180d37 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 19 May 2010 23:42:06 +0000 Subject: [PATCH] Rewrite merge thread synchronization code, update status messages, implement preliminary (and ineffective) admission control, and force write merged data every megabyte (so that progress can be tracked by admission control). Also, refactor quite a few things. git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@805 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- CMakeLists.txt | 2 +- diskTreeComponent.cpp | 7 +- diskTreeComponent.h | 7 +- logserver.cpp | 8 +- logstore.cpp | 129 ++++++++++------------ logstore.h | 45 ++++---- memTreeComponent.h | 12 +-- mergeManager.cpp | 194 +++++++++++++++++++++++++++++++++ mergeManager.h | 73 +++++++++++++ mergeStats.cpp | 12 +++ mergeStats.h | 188 ++------------------------------ merger.cpp | 220 +++++++++++--------------------------- merger.h | 32 +----- server.cpp | 4 - test/check_logtable.cpp | 4 +- test/check_merge.cpp | 3 +- test/check_mergelarge.cpp | 2 - test/check_mergetuple.cpp | 2 - 18 files changed, 455 insertions(+), 489 deletions(-) create mode 100644 mergeManager.cpp create mode 100644 mergeManager.h create mode 100644 mergeStats.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d208a34..bd3fc1e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,7 +76,7 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" ) #CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) IF ( HAVE_STASIS ) - ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp) + ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp) CREATE_EXECUTABLE(server) ENDIF ( HAVE_STASIS ) ADD_LIBRARY(logstore_client tcpclient.cpp) diff --git a/diskTreeComponent.cpp 
b/diskTreeComponent.cpp index 659efaf..c763a08 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -14,6 +14,7 @@ #include "diskTreeComponent.h" #include "regionAllocator.h" +#include "mergeStats.h" #include #include #include @@ -54,7 +55,7 @@ void diskTreeComponent::list_regions(int xid, pageid_t *internal_node_region_len void diskTreeComponent::writes_done() { if(dp) { - stats->wrote_datapage(dp); + ((mergeStats*)stats)->wrote_datapage(dp); dp->writes_done(); delete dp; dp = 0; @@ -69,7 +70,7 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t) // stats->num_datapages_out++; } else if(!dp->append(t)) { // stats->bytes_out += (PAGE_SIZE * dp->get_page_count()); - stats->wrote_datapage(dp); + ((mergeStats*)stats)->wrote_datapage(dp); dp->writes_done(); delete dp; dp = insertDataPage(xid, t); @@ -91,7 +92,7 @@ DataPage* diskTreeComponent::insertDataPage(int xid, datatuple *tuple if(!dp->append(tuple)) { // the last datapage must have not wanted the tuple, and then this datapage figured out the region is full. 
- stats->wrote_datapage(dp); + ((mergeStats*)stats)->wrote_datapage(dp); dp->writes_done(); delete dp; dp = 0; diff --git a/diskTreeComponent.h b/diskTreeComponent.h index 471ae1d..2b0d092 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -11,14 +11,13 @@ #include "datapage.h" #include "datatuple.h" #include "mergeStats.h" - class diskTreeComponent { public: class internalNodes; class iterator; diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size, - mergeManager::mergeStats* stats) : + mergeStats* stats) : ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)), dp(0), datapage_size(datapage_size), @@ -26,7 +25,7 @@ class diskTreeComponent { diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state, - mergeManager::mergeStats* stats) : + mergeStats* stats) : ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)), dp(0), datapage_size(-1), @@ -74,7 +73,7 @@ class diskTreeComponent { internalNodes * ltree; DataPage* dp; pageid_t datapage_size; - mergeManager::mergeStats *stats; + /*mergeManager::mergeStats*/ void *stats; // XXX hack to work around circular includes. 
public: class internalNodes{ diff --git a/logserver.cpp b/logserver.cpp index 47cd663..732d283 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -514,7 +514,7 @@ int op_stat_space_usage(pthread_data* data) { int xid = Tbegin(); - readlock(data->ltable->header_lock, 0); + pthread_mutex_lock(&data->ltable->header_mut); /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; @@ -587,7 +587,7 @@ int op_stat_space_usage(pthread_data* data) { ; } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/)); - unlock(data->ltable->header_lock); + pthread_mutex_unlock(&data->ltable->header_mut); Tcommit(xid); @@ -672,7 +672,7 @@ int op_dbg_blockmap(pthread_data* data) { // produce a list of stasis regions int xid = Tbegin(); - readlock(data->ltable->header_lock, 0); + pthread_mutex_lock(&data->ltable->header_mut); // produce a list of regions used by current tree components /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; @@ -714,7 +714,7 @@ int op_dbg_blockmap(pthread_data* data) { &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions, &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions); - unlock(data->ltable->header_lock); + pthread_mutex_unlock(&data->ltable->header_mut); Tcommit(xid); diff --git a/logstore.cpp b/logstore.cpp index ea919dd..d7a4cd0 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -4,6 +4,7 @@ #include #include #include +#include "mergeStats.h" #undef try #undef end @@ -28,12 +29,16 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio tree_c1_mergeable = NULL; tree_c2 = NULL; this->still_running_ = true; - this->merge_mgr = new mergeManager(); + this->merge_mgr = new mergeManager(this); this->mergedata = 0; //tmerger = new tuplemerger(&append_merger); tmerger = 
new tuplemerger(&replace_merger); - header_lock = initlock(); + pthread_mutex_init(&header_mut, 0); + pthread_cond_init(&c0_needed, 0); + pthread_cond_init(&c0_ready, 0); + pthread_cond_init(&c1_needed, 0); + pthread_cond_init(&c1_ready, 0); tsize = 0; tree_bytes = 0; @@ -62,7 +67,11 @@ logtable::~logtable() memTreeComponent::tearDownTree(tree_c0); } - deletelock(header_lock); + pthread_mutex_destroy(&header_mut); + pthread_cond_destroy(&c0_needed); + pthread_cond_destroy(&c0_ready); + pthread_cond_destroy(&c1_needed); + pthread_cond_destroy(&c1_ready); delete tmerger; } @@ -83,7 +92,7 @@ recordid logtable::allocTable(int xid) { table_rec = Talloc(xid, sizeof(tbl_header)); - mergeManager::mergeStats * stats = 0; + mergeStats * stats = 0; //create the big tree tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); @@ -104,7 +113,7 @@ void logtable::openTable(int xid, recordid rid) { template void logtable::update_persistent_header(int xid) { - tbl_header.c2_root = tree_c2->get_root_rid(); + tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); tbl_header.c2_state = tree_c2->get_internal_node_allocator_rid(); tbl_header.c1_root = tree_c1->get_root_rid(); @@ -117,10 +126,6 @@ void logtable::update_persistent_header(int xid) { template void logtable::setMergeData(logtable_mergedata * mdata){ this->mergedata = mdata; - mdata->internal_region_size = internal_region_size; - mdata->datapage_region_size = datapage_region_size; - mdata->datapage_size = datapage_size; - bump_epoch(); } @@ -140,38 +145,22 @@ void logtable::flushTable() c0_stats->finished_merge(); c0_stats->new_merge(); - writelock(header_lock,0); - pthread_mutex_lock(mergedata->rbtree_mut); - - int expmcount = merge_count; + pthread_mutex_lock(&header_mut); + int expmcount = merge_count; //this is for waiting the previous merger of the mem-tree //hopefullly this wont happen - while(get_tree_c0_mergeable()) 
{ - unlock(header_lock); - if(tree_bytes >= max_c0_size) - pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut); - else - { - pthread_mutex_unlock(mergedata->rbtree_mut); - return; - } + bool blocked = false; - - pthread_mutex_unlock(mergedata->rbtree_mut); - - writelock(header_lock,0); - pthread_mutex_lock(mergedata->rbtree_mut); - - if(expmcount != merge_count) - { - unlock(header_lock); - pthread_mutex_unlock(mergedata->rbtree_mut); - return; - } - + while(get_tree_c0_mergeable()) { + pthread_cond_wait(&c0_needed, &header_mut); + blocked = true; + if(expmcount != merge_count) { + pthread_mutex_unlock(&header_mut); + return; + } } gettimeofday(&stop_tv,0); @@ -179,28 +168,30 @@ void logtable::flushTable() set_tree_c0_mergeable(get_tree_c0()); - pthread_cond_broadcast(mergedata->input_ready_cond); + pthread_cond_signal(&c0_ready); merge_count ++; set_tree_c0(new memTreeComponent::rbtree_t); + c0_stats->starting_merge(); tsize = 0; tree_bytes = 0; - pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(header_lock); - c0_stats->starting_merge(); - if(first) - { - printf("Blocked writes for %f sec\n", stop-start); - first = 0; + pthread_mutex_unlock(&header_mut); + + if(blocked) { + if(first) + { + printf("\nBlocked writes for %f sec\n", stop-start); + first = 0; + } + else + { + printf("\nBlocked writes for %f sec (serviced writes for %f sec)\n", + stop-start, start-last_start); + } + last_start = stop; } - else - { - printf("Blocked writes for %f sec (serviced writes for %f sec)\n", - stop-start, start-last_start); - } - last_start = stop; } @@ -208,10 +199,9 @@ template datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize) { //prepare a search tuple - datatuple *search_tuple = datatuple::create(key, keySize); + datatuple *search_tuple = datatuple::create(key, keySize); - readlock(header_lock,0); - pthread_mutex_lock(mergedata->rbtree_mut); + pthread_mutex_lock(&header_mut); datatuple *ret_tuple=0; @@ -243,14 
+233,13 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size } else //key first found in old mem tree { - ret_tuple = tuple->create_copy(); + ret_tuple = tuple->create_copy(); } //we cannot free tuple from old-tree 'cos it is not a copy } } - //release the memtree lock - pthread_mutex_unlock(mergedata->rbtree_mut); + //TODO: Arange to only hold read latches while hitting disk. //step 3: check c1 if(!done) @@ -340,8 +329,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size } } - //pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(header_lock); + pthread_mutex_unlock(&header_mut); // XXX release this each time we could block on disk... datatuple::freetuple(search_tuple); return ret_tuple; @@ -357,7 +345,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size //prepare a search tuple datatuple * search_tuple = datatuple::create(key, keySize); - pthread_mutex_lock(mergedata->rbtree_mut); + pthread_mutex_lock(&header_mut); datatuple *ret_tuple=0; //step 1: look in tree_c0 @@ -413,7 +401,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size } } - pthread_mutex_unlock(mergedata->rbtree_mut); + pthread_mutex_unlock(&header_mut); datatuple::freetuple(search_tuple); return ret_tuple; @@ -424,9 +412,8 @@ template void logtable::insertTuple(datatuple *tuple) { //lock the red-black tree - readlock(header_lock,0); - pthread_mutex_lock(mergedata->rbtree_mut); c0_stats->read_tuple_from_small_component(tuple); + pthread_mutex_lock(&header_mut); //find the previous tuple with same key in the memtree if exists memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); if(rbitr != tree_c0->end()) @@ -459,19 +446,13 @@ void logtable::insertTuple(datatuple *tuple) //flushing logic if(tree_bytes >= max_c0_size ) { - DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); - pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(header_lock); - flushTable(); - - 
readlock(header_lock,0); - pthread_mutex_lock(mergedata->rbtree_mut); + DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); + pthread_mutex_unlock(&header_mut); + flushTable(); + } else { + //unlock + pthread_mutex_unlock(&header_mut); } - - //unlock - pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(header_lock); - DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); } @@ -491,7 +472,7 @@ void logtable::forgetIterator(iterator * it) { } template void logtable::bump_epoch() { - assert(!trywritelock(header_lock,0)); +// assert(!trywritelock(header_lock,0)); epoch++; for(unsigned int i = 0; i < its.size(); i++) { its[i]->invalidate(); diff --git a/logstore.h b/logstore.h index ff6986c..1badbbf 100644 --- a/logstore.h +++ b/logstore.h @@ -12,9 +12,10 @@ #include "tuplemerger.h" +#include "mergeManager.h" #include "mergeStats.h" -struct logtable_mergedata; +class logtable_mergedata; template class logtable { @@ -64,7 +65,11 @@ public: inline void set_tree_c1(diskTreeComponent *t){tree_c1=t; bump_epoch(); } inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t; bump_epoch(); } inline void set_tree_c2(diskTreeComponent *t){tree_c2=t; bump_epoch(); } - + pthread_cond_t c0_needed; + pthread_cond_t c0_ready; + pthread_cond_t c1_needed; + pthread_cond_t c1_ready; + inline memTreeComponent::rbtree_ptr_t get_tree_c0(){return tree_c0;} inline memTreeComponent::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} void set_tree_c0(memTreeComponent::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); } @@ -93,14 +98,15 @@ public: }; logtable_mergedata * mergedata; - rwl * header_lock; + pthread_mutex_t header_mut; int64_t max_c0_size; mergeManager * merge_mgr; inline bool is_still_running() { return still_running_; } inline void stop() { - still_running_ = false; - // XXX must need to do other things! + still_running_ = false; + flushTable(); + // XXX must need to do other things! 
} private: @@ -116,16 +122,17 @@ private: int tsize; //number of tuples int64_t tree_bytes; //number of bytes +public: //DATA PAGE SETTINGS pageid_t internal_region_size; // in number of pages pageid_t datapage_region_size; // " pageid_t datapage_size; // " - +private: tuplemerger *tmerger; std::vector its; - mergeManager::mergeStats * c0_stats; + mergeStats * c0_stats; bool still_running_; public: @@ -241,10 +248,10 @@ public: last_returned(NULL), key(NULL), valid(false) { - writelock(ltable->header_lock, 0); + pthread_mutex_lock(&ltable->header_mut); ltable->registerIterator(this); validate(); - unlock(ltable->header_lock); + pthread_mutex_unlock(&ltable->header_mut); } explicit iterator(logtable* ltable,TUPLE *key) @@ -255,18 +262,18 @@ public: key(key), valid(false) { - writelock(ltable->header_lock, 0); + pthread_mutex_lock(&ltable->header_mut); ltable->registerIterator(this); validate(); - unlock(ltable->header_lock); + pthread_mutex_unlock(&ltable->header_mut); } ~iterator() { - writelock(ltable->header_lock,0); + pthread_mutex_lock(&ltable->header_mut); ltable->forgetIterator(this); invalidate(); if(last_returned) TUPLE::freetuple(last_returned); - unlock(ltable->header_lock); + pthread_mutex_unlock(&ltable->header_mut); } private: TUPLE * getnextHelper() { @@ -280,24 +287,24 @@ public: } public: TUPLE * getnextIncludingTombstones() { - readlock(ltable->header_lock, 0); + pthread_mutex_lock(&ltable->header_mut); revalidate(); TUPLE * ret = getnextHelper(); - unlock(ltable->header_lock); + pthread_mutex_unlock(&ltable->header_mut); return ret ? ret->create_copy() : NULL; } TUPLE * getnext() { - readlock(ltable->header_lock, 0); + pthread_mutex_lock(&ltable->header_mut); revalidate(); TUPLE * ret; while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory. - unlock(ltable->header_lock); + pthread_mutex_unlock(&ltable->header_mut); return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. 
} void invalidate() { - assert(!trywritelock(ltable->header_lock,0)); +// assert(!trywritelock(ltable->header_lock,0)); if(valid) { delete merge_it_; merge_it_ = NULL; @@ -354,7 +361,7 @@ public: t = NULL; } - c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, t); + c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), NULL/*need something that is not &ltable->header_mut*/, t); c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), t); disk_it[0] = ltable->get_tree_c1()->open_iterator(t); if(ltable->get_tree_c1_mergeable()) { diff --git a/memTreeComponent.h b/memTreeComponent.h index f7b3d45..ff63cb2 100644 --- a/memTreeComponent.h +++ b/memTreeComponent.h @@ -85,16 +85,16 @@ public: public: revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut ) : s_(s), mut_(rb_mut) { - pthread_mutex_lock(mut_); + if(mut_) pthread_mutex_lock(mut_); if(s_->begin() == s_->end()) { next_ret_ = NULL; } else { next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_... } - pthread_mutex_unlock(mut_); + if(mut_) pthread_mutex_unlock(mut_); } revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mut_(rb_mut) { - pthread_mutex_lock(mut_); + if(mut_) pthread_mutex_lock(mut_); if(key) { if(s_->find(key) != s_->end()) { next_ret_ = (*(s_->find(key)))->create_copy(); @@ -111,7 +111,7 @@ public: } } // DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? 
(const char*)key->key() : "NULL"); - pthread_mutex_unlock(mut_); + if(mut_) pthread_mutex_unlock(mut_); } ~revalidatingIterator() { @@ -119,7 +119,7 @@ public: } TUPLE* getnext() { - pthread_mutex_lock(mut_); + if(mut_) pthread_mutex_lock(mut_); TUPLE * ret = next_ret_; if(next_ret_) { if(s_->upper_bound(next_ret_) == s_->end()) { @@ -128,7 +128,7 @@ public: next_ret_ = (*s_->upper_bound(next_ret_))->create_copy(); } } - pthread_mutex_unlock(mut_); + if(mut_) pthread_mutex_unlock(mut_); return ret; } diff --git a/mergeManager.cpp b/mergeManager.cpp new file mode 100644 index 0000000..dfa2980 --- /dev/null +++ b/mergeManager.cpp @@ -0,0 +1,194 @@ +/* + * mergeManager.cpp + * + * Created on: May 19, 2010 + * Author: sears + */ + +#include "mergeManager.h" +#include "mergeStats.h" +#include "logstore.h" +#include "math.h" +mergeStats* mergeManager:: newMergeStats(int mergeLevel) { + if (mergeLevel == 0) { + return c0; + } else if (mergeLevel == 1) { + return c1; + } else if(mergeLevel == 2) { + return c2; + } else { + abort(); + } +} + +mergeManager::~mergeManager() { + pthread_mutex_destroy(&mut); + pthread_mutex_destroy(&throttle_mut); + pthread_mutex_destroy(&dummy_throttle_mut); + pthread_cond_destroy(&dummy_throttle_cond); + delete c0; + delete c1; + delete c2; +} + +void mergeManager::new_merge(mergeStats * s) { + pthread_mutex_lock(&mut); + if(s->merge_count) { + if(s->merge_level == 0) { + // queueSize was set during startup + } else if(s->merge_level == 1) { + c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out; + } else if(s->merge_level == 2) { + c2_queueSize = s->bytes_in_small; + } else { abort(); } + pretty_print(stdout); + } + pthread_mutex_unlock(&mut); +} +void mergeManager::set_c0_size(int64_t size) { + c0_queueSize = size; +} +void mergeManager::tick(mergeStats * s, bool done) { + if(s->merge_level == 0) { + pthread_mutex_lock(&throttle_mut); + } + // throttle? 
+ if(s->bytes_in_small_delta > c0_queueSize / 10000) { + if(s->merge_level == 0) { + struct timeval now; + gettimeofday(&now, 0); + double elapsed_delta = tv_to_double(&now) - ts_to_double(&last_throttle); + pageid_t bytes_written_delta = (s->bytes_in_small_delta - s->bytes_collapsed_delta); + double min_throughput = 0.0; // don't throttle below 100 kilobytes / sec + double max_throughput = 10.0 * 1024.0 * 1024.0; + double c0_badness = (double)((c0_totalConsumed + bytes_written_delta - c1_totalConsumed)- c0_queueSize) / ((double)c0_queueSize); + double raw_throughput = ((double)bytes_written_delta)/elapsed_delta; + if(raw_throughput > max_throughput || c0_badness > 0) { + //double target_throughput = min_throughput / (c0_badness); // * c0_badness * c0_badness); + double target_throughput; + if(c0_badness > 0) { + target_throughput = (max_throughput - min_throughput) * (1.0-sqrt(sqrt(c0_badness))) + min_throughput; + } else { + target_throughput = max_throughput; + } + double target_elapsed = ((double)bytes_written_delta)/target_throughput; + //printf("Worked %6.1f (target %6.1f)\n", elapsed_delta, target_elapsed); + if(target_elapsed > elapsed_delta) { + struct timespec sleep_until; + double_to_ts(&sleep_until, ts_to_double(&last_throttle) + target_elapsed); + //fprintf(stdout, "Throttling for %6.1f seconds\n", target_elapsed - (double)elapsed_delta); + last_throttle_seconds = target_elapsed - (double)elapsed_delta; + last_elapsed_seconds = target_elapsed; + throttle_seconds += last_throttle_seconds; + elapsed_seconds += last_elapsed_seconds; + pthread_mutex_lock(&dummy_throttle_mut); + pthread_cond_timedwait(&dummy_throttle_cond, &dummy_throttle_mut, &sleep_until); + pthread_mutex_unlock(&dummy_throttle_mut); + memcpy(&last_throttle, &sleep_until, sizeof(sleep_until)); + } else { + double_to_ts(&last_throttle, tv_to_double(&now)); + } + } else { + double_to_ts(&last_throttle, tv_to_double(&now)); + } + } + struct timeval now; + gettimeofday(&now,0); + unsigned 
long long elapsed = long_tv(now) - long_tv(s->last); + + pthread_mutex_lock(&mut); + if(s->merge_level == 0) { + c0_totalConsumed += s->bytes_in_small_delta; + c0_totalWorktime += elapsed; + c0_totalCollapsed += s->bytes_collapsed_delta; + } else if(s->merge_level == 1) { + c1_totalConsumed += s->bytes_in_small_delta; + c1_totalWorktime += elapsed; + c1_totalCollapsed += s->bytes_collapsed_delta; + } else if(s->merge_level == 2) { + c2_totalConsumed += s->bytes_in_small_delta; + c2_totalWorktime += elapsed; + c2_totalCollapsed += s->bytes_collapsed_delta; + } else { abort(); } + pthread_mutex_unlock(&mut); + + s->bytes_in_small_delta = 0; + s->bytes_collapsed_delta = 0; + + memcpy(&s->last, &now, sizeof(now)); + + pretty_print(stdout); + } + if(s->merge_level == 0) { + pthread_mutex_unlock(&throttle_mut); + } +} + +mergeManager::mergeManager(void *ltable): + ltable(ltable), + c0_queueSize(0), + c1_queueSize(0), + c2_queueSize(0), + c0_totalConsumed(0), + c0_totalCollapsed(0), + c0_totalWorktime(0), + c1_totalConsumed(0), + c1_totalCollapsed(0), + c1_totalWorktime(0), + c2_totalConsumed(0), + c2_totalCollapsed(0), + c2_totalWorktime(0), + c0(new mergeStats(this, 0)), + c1(new mergeStats(this, 1)), + c2(new mergeStats(this, 2)) { + pthread_mutex_init(&mut, 0); + pthread_mutex_init(&throttle_mut, 0); + pthread_mutex_init(&dummy_throttle_mut, 0); + pthread_cond_init(&dummy_throttle_cond, 0); + struct timeval tv; + gettimeofday(&tv, 0); + double_to_ts(&last_throttle, tv_to_double(&tv)); +} + +void mergeManager::pretty_print(FILE * out) { + pageid_t mb = 1024 * 1024; + logtable * lt = (logtable*)ltable; + bool have_c0 = false; + bool have_c0m = false; + bool have_c1 = false; + bool have_c1m = false; + bool have_c2 = false; + if(lt) { + pthread_mutex_lock(&lt->header_mut); + have_c0 = NULL != lt->get_tree_c0(); + have_c0m = NULL != lt->get_tree_c0_mergeable(); + have_c1 = NULL != lt->get_tree_c1(); + have_c1m = NULL != lt->get_tree_c1_mergeable() ; + have_c2 = NULL != 
lt->get_tree_c2(); + pthread_mutex_unlock(&lt->header_mut); + } + fprintf(out,"[%s] %s %s [%s] %s %s [%s] %s ", + c0->active ? "RUN" : "---", + have_c0 ? "C0" : "..", + have_c0m ? "C0'" : "...", + c1->active ? "RUN" : "---", + have_c1 ? "C1" : "..", + have_c1m ? "C1'" : "...", + c2->active ? "RUN" : "---", + have_c2 ? "C2" : ".."); + fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds))); + fprintf(out, "C0 size %4lld collapsed %4lld resident %4lld ", + 2*c0_queueSize/mb, + c0_totalCollapsed/mb, + (c0_totalConsumed - (c0_totalCollapsed + c1_totalConsumed))/mb); + fprintf(out, "C1 size %4lld collapsed %4lld resident %4lld ", + 2*c1_queueSize/mb, + c1_totalCollapsed/mb, + (c1_totalConsumed - (c1_totalCollapsed + c2_totalConsumed))/mb); + fprintf(out, "C2 size %4lld collapsed %4lld ", + 2*c2_queueSize/mb, c2_totalCollapsed/mb); + fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\r", + ((double)c1_totalConsumed)/((double)c1_totalWorktime), + ((double)c2_totalConsumed)/((double)c2_totalWorktime)); + fflush(out); +} diff --git a/mergeManager.h b/mergeManager.h new file mode 100644 index 0000000..dc46019 --- /dev/null +++ b/mergeManager.h @@ -0,0 +1,73 @@ +/* + * mergeManager.h + * + * Created on: May 19, 2010 + * Author: sears + */ + +#ifndef MERGEMANAGER_H_ +#define MERGEMANAGER_H_ + +#include +#undef try +#undef end +#include +#include + +class mergeStats; + +class mergeManager { +private: + double tv_to_double(struct timeval * tv) { + return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; + } + double ts_to_double(struct timespec * ts) { + return (double)ts->tv_sec + ((double)ts->tv_nsec)/1000000000.0; + } + void double_to_ts(struct timespec *ts, double time) { + ts->tv_sec = (time_t)(time); + ts->tv_nsec = (long)((time - (double)ts->tv_sec) * 1000000000.0); + } + uint64_t long_tv(struct timeval& tv) { + return (1000000ULL * (uint64_t)tv.tv_sec) + 
((uint64_t)tv.tv_usec); + } +public: + mergeManager(void *ltable); + + ~mergeManager(); + + void new_merge(mergeStats * s); + void set_c0_size(int64_t size); + void tick(mergeStats * s, bool done = false); + void pretty_print(FILE * out); + mergeStats* newMergeStats(int mergeLevel); + +private: + pthread_mutex_t mut; + void* ltable; + pageid_t c0_queueSize; + pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size) + pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size) + pageid_t c0_totalConsumed; + pageid_t c0_totalCollapsed; + pageid_t c0_totalWorktime; + pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times) + pageid_t c1_totalCollapsed; + pageid_t c1_totalWorktime; + pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times) + pageid_t c2_totalCollapsed; + pageid_t c2_totalWorktime; + double throttle_seconds; + double elapsed_seconds; + double last_throttle_seconds; + double last_elapsed_seconds; + mergeStats * c0; + mergeStats * c1; + mergeStats * c2; + struct timespec last_throttle; + pthread_mutex_t throttle_mut; + pthread_mutex_t dummy_throttle_mut; + pthread_cond_t dummy_throttle_cond; + +}; +#endif /* MERGEMANAGER_H_ */ diff --git a/mergeStats.cpp b/mergeStats.cpp new file mode 100644 index 0000000..00d31ba --- /dev/null +++ b/mergeStats.cpp @@ -0,0 +1,12 @@ +/* + * mergeStats.cpp + * + * Created on: May 18, 2010 + * Author: sears + */ + +#include "mergeStats.h" +#include "logstore.h" +#include "datatuple.h" + + diff --git a/mergeStats.h b/mergeStats.h index 6b1457e..5f71143 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -9,152 +9,16 @@ #define MERGESTATS_H_ #include +#undef try +#undef end -class mergeManager { -private: - double tv_to_double(struct timeval * tv) { - return (double)tv->tv_sec + 
((double)tv->tv_usec)/1000000.0; - } - double ts_to_double(struct timespec * ts) { - return (double)ts->tv_sec + ((double)ts->tv_nsec)/1000000000.0; - } - void double_to_ts(struct timespec *ts, double time) { - ts->tv_sec = (time_t)(time); - ts->tv_nsec = (long)((time - (double)ts->tv_sec) * 1000000000.0); - } -public: - mergeManager() : - c0_queueSize(0), - c1_queueSize(0), - c2_queueSize(0), - c0_totalConsumed(0), - c0_totalCollapsed(0), - c0_totalWorktime(0), - c1_totalConsumed(0), - c1_totalCollapsed(0), - c1_totalWorktime(0), - c2_totalConsumed(0), - c2_totalCollapsed(0), - c2_totalWorktime(0), - c0(new mergeStats(this, 0)), - c1(new mergeStats(this, 1)), - c2(new mergeStats(this, 2)) { - pthread_mutex_init(&mut, 0); - pthread_mutex_init(&throttle_mut, 0); - pthread_mutex_init(&dummy_throttle_mut, 0); - pthread_cond_init(&dummy_throttle_cond, 0); - struct timeval tv; - gettimeofday(&tv, 0); - double_to_ts(&last_throttle, tv_to_double(&tv)); +#include +#include +#include "datatuple.h" +#include "datapage.h" +#include "mergeManager.h" - } - ~mergeManager() { - pthread_mutex_destroy(&mut); - } - class mergeStats; - - void new_merge(mergeStats * s) { - pthread_mutex_lock(&mut); - if(s->merge_count) { - if(s->merge_level == 0) { - // queueSize was set during startup - } else if(s->merge_level == 1) { - c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out; - } else if(s->merge_level == 2) { - c2_queueSize = s->bytes_in_small; - } else { abort(); } - pretty_print(stdout); - } - pthread_mutex_unlock(&mut); - } - void set_c0_size(int64_t size) { - c0_queueSize = size; - } - void tick(mergeStats * s, bool done = false) { - if(s->merge_level == 0) { - pthread_mutex_lock(&throttle_mut); - // throttle? 
- if(s->bytes_in_small_delta > c0_queueSize / 100) { - struct timeval now; - gettimeofday(&now, 0); - double elapsed_delta = tv_to_double(&now) - ts_to_double(&last_throttle); - pageid_t bytes_written_delta = (s->bytes_in_small_delta - s->bytes_collapsed_delta); - double min_throughput = 100 * 1024; // don't throttle below 100 kilobytes / sec - double c0_badness = (double)((c0_totalConsumed + bytes_written_delta - c1_totalConsumed) - c0_queueSize)/ (double)c0_queueSize; - if(c0_badness > 0) { - double target_throughput = min_throughput / (c0_badness * c0_badness); - //double raw_throughput = ((double)bytes_written_delta)/elapsed_delta; - double target_elapsed = ((double)bytes_written_delta)/target_throughput; - printf("Worked %6.1f (target %6.1f)\n", elapsed_delta, target_elapsed); - if(target_elapsed > elapsed_delta) { - struct timespec sleep_until; - double_to_ts(&sleep_until, ts_to_double(&last_throttle) + target_elapsed); - fprintf(stdout, "Throttling for %6.1f seconds\n", target_elapsed - (double)elapsed_delta); - pthread_mutex_lock(&dummy_throttle_mut); - pthread_cond_timedwait(&dummy_throttle_cond, &dummy_throttle_mut, &sleep_until); - pthread_mutex_unlock(&dummy_throttle_mut); - memcpy(&last_throttle, &sleep_until, sizeof(sleep_until)); - } else { - double_to_ts(&last_throttle, tv_to_double(&now)); - } - } else { - printf("badness is negative\n"); - double_to_ts(&last_throttle, tv_to_double(&now)); - } - } - } - if(done || s->bytes_in_small_delta > c0_queueSize / 100) { - struct timeval now; - gettimeofday(&now,0); - unsigned long long elapsed = long_tv(now) - long_tv(s->last); - - pthread_mutex_lock(&mut); - if(s->merge_level == 0) { - c0_totalConsumed += s->bytes_in_small_delta; - c0_totalWorktime += elapsed; - c0_totalCollapsed += s->bytes_collapsed_delta; - } else if(s->merge_level == 1) { - c1_totalConsumed += s->bytes_in_small_delta; - c1_totalWorktime += elapsed; - c1_totalCollapsed += s->bytes_collapsed_delta; - } else if(s->merge_level == 2) { - 
c2_totalConsumed += s->bytes_in_small_delta; - c2_totalWorktime += elapsed; - c2_totalCollapsed += s->bytes_collapsed_delta; - } else { abort(); } - pthread_mutex_unlock(&mut); - - s->bytes_in_small_delta = 0; - s->bytes_collapsed_delta = 0; - - memcpy(&s->last, &now, sizeof(now)); - pretty_print(stdout); - } - if(s->merge_level == 0) { - pthread_mutex_unlock(&throttle_mut); - } - } - uint64_t long_tv(struct timeval& tv) { - return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec); - } - void pretty_print(FILE * out) { - pageid_t mb = 1024 * 1024; - fprintf(out,"%s %s %s ", c0->active ? "RUN" : "---", c1->active ? "RUN" : "---", c2->active ? "RUN" : "---"); - fprintf(out, "C0 size %lld collapsed %lld resident %lld ", - 2*c0_queueSize/mb, - c0_totalCollapsed/mb, - (c0_totalConsumed - (c0_totalCollapsed + c1_totalConsumed))/mb); - fprintf(out, "C1 size %lld collapsed %lld resident %lld ", - 2*c1_queueSize/mb, - c1_totalCollapsed/mb, - (c1_totalConsumed - (c1_totalCollapsed + c2_totalConsumed))/mb); - fprintf(out, "C2 size %lld collapsed %lld ", - 2*c2_queueSize/mb, c2_totalCollapsed/mb); - fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\n", - ((double)c1_totalConsumed)/((double)c1_totalWorktime), - ((double)c2_totalConsumed)/((double)c2_totalWorktime)); - } - class mergeStats { +class mergeStats { public: mergeStats(mergeManager* merge_mgr, int merge_level) : merge_mgr(merge_mgr), @@ -299,40 +163,4 @@ public: } }; - - mergeStats* newMergeStats(int mergeLevel) { - if (mergeLevel == 0) { - return c0; - } else if (mergeLevel == 1) { - return c1; - } else if(mergeLevel == 2) { - return c2; - } else { - abort(); - } - } - -private: - pthread_mutex_t mut; - pageid_t c0_queueSize; - pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size) - pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? 
( = previous c1 size) - pageid_t c0_totalConsumed; - pageid_t c0_totalCollapsed; - pageid_t c0_totalWorktime; - pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times) - pageid_t c1_totalCollapsed; - pageid_t c1_totalWorktime; - pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times) - pageid_t c2_totalCollapsed; - pageid_t c2_totalWorktime; - mergeStats * c0; - mergeStats * c1; - mergeStats * c2; - struct timespec last_throttle; - pthread_mutex_t throttle_mut; - pthread_mutex_t dummy_throttle_mut; - pthread_cond_t dummy_throttle_cond; - -}; #endif /* MERGESTATS_H_ */ diff --git a/merger.cpp b/merger.cpp index 1e62194..794a4eb 100644 --- a/merger.cpp +++ b/merger.cpp @@ -13,19 +13,7 @@ int merge_scheduler::addlogtable(logtable *ltable) struct logtable_mergedata * mdata = new logtable_mergedata; // initialize merge data - mdata->rbtree_mut = new pthread_mutex_t; - pthread_mutex_init(mdata->rbtree_mut,0); ltable->set_tree_c0_mergeable(NULL); - - mdata->input_needed = new bool(false); - - mdata->input_ready_cond = new pthread_cond_t; - pthread_cond_init(mdata->input_ready_cond,0); - - mdata->input_needed_cond = new pthread_cond_t; - pthread_cond_init(mdata->input_needed_cond,0); - - mdata->input_size = new int64_t(100); mdata->diskmerge_args = new merger_args; mdata->memmerge_args = new merger_args; @@ -42,27 +30,6 @@ merge_scheduler::~merge_scheduler() logtable *ltable = mergedata[i].first; logtable_mergedata *mdata = mergedata[i].second; - //delete the mergedata fields - delete mdata->rbtree_mut; - delete mdata->input_needed; - delete mdata->input_ready_cond; - delete mdata->input_needed_cond; - delete mdata->input_size; - - //delete the merge thread structure variables - pthread_cond_destroy(mdata->diskmerge_args->in_block_needed_cond); - delete mdata->diskmerge_args->in_block_needed_cond; - delete mdata->diskmerge_args->in_block_needed; - - 
pthread_cond_destroy(mdata->diskmerge_args->out_block_needed_cond); - delete mdata->diskmerge_args->out_block_needed_cond; - delete mdata->diskmerge_args->out_block_needed; - - pthread_cond_destroy(mdata->diskmerge_args->in_block_ready_cond); - delete mdata->diskmerge_args->in_block_ready_cond; - pthread_cond_destroy(mdata->diskmerge_args->out_block_ready_cond); - delete mdata->diskmerge_args->out_block_ready_cond; - delete mdata->diskmerge_args; delete mdata->memmerge_args; } @@ -76,18 +43,8 @@ void merge_scheduler::shutdown() for(size_t i=0; i *ltable = mergedata[i].first; - logtable_mergedata *mdata = mergedata[i].second; - //flush the in memory table to write any tuples still in memory - ltable->flushTable(); - - pthread_mutex_lock(mdata->rbtree_mut); ltable->stop(); - pthread_cond_signal(mdata->input_ready_cond); - - //*(mdata->diskmerge_args->still_open)=false;//same pointer so no need - - pthread_mutex_unlock(mdata->rbtree_mut); } @@ -107,25 +64,8 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) logtable * ltable = mergedata[index].first; struct logtable_mergedata *mdata = mergedata[index].second; - - pthread_cond_t * block1_needed_cond = new pthread_cond_t; - pthread_cond_init(block1_needed_cond,0); - pthread_cond_t * block2_needed_cond = new pthread_cond_t; - pthread_cond_init(block2_needed_cond,0); - - pthread_cond_t * block1_ready_cond = new pthread_cond_t; - pthread_cond_init(block1_ready_cond,0); - pthread_cond_t * block2_ready_cond = new pthread_cond_t; - pthread_cond_init(block2_ready_cond,0); - - bool *block1_needed = new bool(false); - bool *block2_needed = new bool(false); - //wait to merge the next block until we have merged block FUDGE times. 
- static const int FUDGE = 1; static double R = MIN_R; - int64_t * block1_size = new int64_t; - *block1_size = FUDGE * ((int)R) * (*(mdata->input_size)); //initialize rb-tree ltable->set_tree_c0(new memTreeComponent::rbtree_t); @@ -142,17 +82,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) struct merger_args diskmerge_args= { ltable, - 1, //worker id - mdata->rbtree_mut, //block_ready_mutex - block1_needed_cond, //in_block_needed_cond - block1_needed, //in_block_needed - block2_needed_cond, //out_block_needed_cond - block2_needed, //out_block_needed - block1_ready_cond, //in_block_ready_cond - block2_ready_cond, //out_block_ready_cond - mdata->internal_region_size, - mdata->datapage_region_size, - mdata->datapage_size, 0, //max_tree_size No max size for biggest component &R, //r_i }; @@ -162,17 +91,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) struct merger_args memmerge_args = { ltable, - 2, - mdata->rbtree_mut, - mdata->input_needed_cond, - mdata->input_needed, - block1_needed_cond, - block1_needed, - mdata->input_ready_cond, - block1_ready_cond, - mdata->internal_region_size, // TODO different region / datapage sizes for C1? 
- mdata->datapage_region_size, - mdata->datapage_size, (int64_t)(R * R * MAX_C0_SIZE), &R, }; @@ -188,12 +106,12 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) } template -void merge_iterators(int xid, +void merge_iterators(int xid, diskTreeComponent * forceMe, ITA *itrA, ITB *itrB, logtable *ltable, diskTreeComponent *scratch_tree, - mergeManager::mergeStats *stats, + mergeStats *stats, bool dropDeletes); @@ -229,45 +147,35 @@ void* memMergeThread(void*arg) assert(ltable->get_tree_c1()); int merge_count =0; - mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1); + mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1); while(true) // 1 { - writelock(ltable->header_lock,0); stats->new_merge(); + pthread_mutex_lock(<able->header_mut); int done = 0; // 2: wait for c0_mergable while(!ltable->get_tree_c0_mergeable()) { - pthread_mutex_lock(a->block_ready_mut); - *a->in_block_needed = true; - //pthread_cond_signal(a->in_block_needed_cond); - pthread_cond_broadcast(a->in_block_needed_cond); + pthread_cond_signal(<able->c0_needed); if(!ltable->is_still_running()){ done = 1; - pthread_mutex_unlock(a->block_ready_mut); break; } DEBUG("mmt:\twaiting for block ready cond\n"); - unlock(ltable->header_lock); - pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); - pthread_mutex_unlock(a->block_ready_mut); - - writelock(ltable->header_lock,0); + pthread_cond_wait(<able->c0_ready, <able->header_mut); + DEBUG("mmt:\tblock ready\n"); } - *a->in_block_needed = false; if(done==1) { - pthread_mutex_lock(a->block_ready_mut); - pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. - pthread_mutex_unlock(a->block_ready_mut); - unlock(ltable->header_lock); + pthread_cond_signal(<able->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. 
+ pthread_mutex_unlock(<able->header_mut); break; } @@ -285,15 +193,14 @@ void* memMergeThread(void*arg) //create a new tree - diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats); + diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); - //pthread_mutex_unlock(a->block_ready_mut); - unlock(ltable->header_lock); + pthread_mutex_unlock(<able->header_mut); //: do the merge DEBUG("mmt:\tMerging:\n"); - merge_iterators(xid, itrA, itrB, ltable, c1_prime, stats, false); + merge_iterators(xid, c1_prime, itrA, itrB, ltable, c1_prime, stats, false); delete itrA; delete itrB; @@ -306,8 +213,27 @@ void* memMergeThread(void*arg) merge_count++; DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size()); - writelock(ltable->header_lock,0); + pthread_mutex_lock(<able->header_mut); + // Immediately clean out c0 mergeable so that writers may continue. + + // first, we need to move the c1' into c1. + + // 12: delete old c1 + ltable->get_tree_c1()->dealloc(xid); + delete ltable->get_tree_c1(); + + // 10: c1 = c1' + ltable->set_tree_c1(c1_prime); + + // 11.5: delete old c0_mergeable + memTreeComponent::tearDownTree(ltable->get_tree_c0_mergeable()); + // 11: c0_mergeable = NULL + ltable->set_tree_c0_mergeable(NULL); + pthread_cond_signal(<able->c0_needed); + + ltable->update_persistent_header(xid); + Tcommit(xid); //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger @@ -322,50 +248,35 @@ void* memMergeThread(void*arg) DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size, ltable->max_c0_size, a->max_size, target_R); - // XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure. 
We could break this into two transactions; replace c1 with the new c1, then wait for backpressure, then move c1 into c1_mergeable, and zerou out c1 + // XXX need to report backpressure here! while(ltable->get_tree_c1_mergeable()) { - pthread_mutex_lock(a->block_ready_mut); - unlock(ltable->header_lock); - - pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); - pthread_mutex_unlock(a->block_ready_mut); - writelock(ltable->header_lock,0); + pthread_cond_wait(<able->c1_needed, <able->header_mut); } - } - // 12: delete old c1 - ltable->get_tree_c1()->dealloc(xid); - delete ltable->get_tree_c1(); + xid = Tbegin(); - // 11.5: delete old c0_mergeable - memTreeComponent::tearDownTree(ltable->get_tree_c0_mergeable()); - // 11: c0_mergeable = NULL - ltable->set_tree_c0_mergeable(NULL); - - if( signal_c2 ) { + // we just set c1 = c1'. Want to move c1 -> c1 mergeable, clean out c1. // 7: and perhaps c1_mergeable - ltable->set_tree_c1_mergeable(c1_prime); + ltable->set_tree_c1_mergeable(c1_prime); // c1_prime == c1. // 8: c1 = new empty. 
- ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats)); + ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats)); - pthread_cond_signal(a->out_block_ready_cond); + pthread_cond_signal(<able->c1_ready); + + ltable->update_persistent_header(xid); + Tcommit(xid); - } else { - // 10: c1 = c1' - ltable->set_tree_c1(c1_prime); } - DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page); +// DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page); // 13 - ltable->update_persistent_header(xid); - Tcommit(xid); - unlock(ltable->header_lock); + pthread_mutex_unlock(<able->header_mut); stats->finished_merge(); - stats->pretty_print(stdout); +// stats->pretty_print(stdout); //TODO: get the freeing outside of the lock } @@ -386,42 +297,34 @@ void *diskMergeThread(void*arg) int merge_count =0; - mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2); + mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2); while(true) { // 2: wait for input - writelock(ltable->header_lock,0); stats->new_merge(); + pthread_mutex_lock(<able->header_mut); int done = 0; // get a new input for merge while(!ltable->get_tree_c1_mergeable()) { - pthread_mutex_lock(a->block_ready_mut); - *a->in_block_needed = true; - pthread_cond_signal(a->in_block_needed_cond); + pthread_cond_signal(<able->c1_needed); if(!ltable->is_still_running()){ done = 1; - pthread_mutex_unlock(a->block_ready_mut); break; } DEBUG("dmt:\twaiting for block ready cond\n"); - unlock(ltable->header_lock); - pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); - pthread_mutex_unlock(a->block_ready_mut); + pthread_cond_wait(<able->c1_ready, <able->header_mut); DEBUG("dmt:\tblock ready\n"); - writelock(ltable->header_lock,0); } - *a->in_block_needed = false; if(done==1) { 
- pthread_cond_signal(a->out_block_ready_cond); - unlock(ltable->header_lock); + pthread_mutex_unlock(<able->header_mut); break; } @@ -436,14 +339,14 @@ void *diskMergeThread(void*arg) diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); //create a new tree - diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats); + diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); - unlock(ltable->header_lock); + pthread_mutex_unlock(<able->header_mut); //do the merge DEBUG("dmt:\tMerging:\n"); - merge_iterators(xid, itrA, itrB, ltable, c2_prime, stats, true); + merge_iterators(xid, NULL, itrA, itrB, ltable, c2_prime, stats, true); delete itrA; delete itrB; @@ -453,7 +356,7 @@ void *diskMergeThread(void*arg) // (skip 6, 7, 8, 8.5, 9)) - writelock(ltable->header_lock,0); + pthread_mutex_lock(<able->header_mut); //12 ltable->get_tree_c2()->dealloc(xid); delete ltable->get_tree_c2(); @@ -479,11 +382,11 @@ void *diskMergeThread(void*arg) // 13 ltable->update_persistent_header(xid); Tcommit(xid); - - unlock(ltable->header_lock); + + pthread_mutex_unlock(<able->header_mut); stats->finished_merge(); - stats->pretty_print(stdout); +// stats->pretty_print(stdout); } return 0; @@ -491,10 +394,11 @@ void *diskMergeThread(void*arg) template void merge_iterators(int xid, + diskTreeComponent * forceMe, ITA *itrA, //iterator on c1 or c2 ITB *itrB, //iterator on c0 or c1, respectively logtable *ltable, - diskTreeComponent *scratch_tree, mergeManager::mergeStats *stats, + diskTreeComponent *scratch_tree, mergeStats *stats, bool dropDeletes // should be true iff this is biggest component ) { @@ -502,6 +406,8 @@ void merge_iterators(int xid, stats->read_tuple_from_large_component(t1); datatuple *t2 = 0; + int i = 0; + while( (t2=itrB->next_callerFrees()) != 0) { 
stats->read_tuple_from_small_component(t2); @@ -513,6 +419,7 @@ void merge_iterators(int xid, { //insert t1 scratch_tree->insertTuple(xid, t1); + i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } stats->wrote_tuple(t1); datatuple::freetuple(t1); //advance itrA @@ -543,12 +450,15 @@ void merge_iterators(int xid, // cannot free any tuples here; they may still be read through a lookup } + i+= t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } stats->wrote_tuple(t2); + datatuple::freetuple(t2); } while(t1 != 0) {// t1 is less than t2 scratch_tree->insertTuple(xid, t1); + i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; } stats->wrote_tuple(t1); datatuple::freetuple(t1); diff --git a/merger.h b/merger.h index b27c08c..9a85543 100644 --- a/merger.h +++ b/merger.h @@ -15,46 +15,16 @@ static const double MIN_R = 3.0; struct merger_args { logtable * ltable; - int worker_id; - - pthread_mutex_t * block_ready_mut; - - pthread_cond_t * in_block_needed_cond; - bool * in_block_needed; - - pthread_cond_t * out_block_needed_cond; - bool * out_block_needed; - - pthread_cond_t * in_block_ready_cond; - pthread_cond_t * out_block_ready_cond; - - pageid_t internal_region_size; - pageid_t datapage_region_size; - pageid_t datapage_size; - int64_t max_size; double * r_i; }; - struct logtable_mergedata { //merge threads pthread_t diskmerge_thread; pthread_t memmerge_thread; - pthread_mutex_t * rbtree_mut; - - bool *input_needed; // memmerge-input needed - - pthread_cond_t * input_ready_cond; - pthread_cond_t * input_needed_cond; - int64_t * input_size; - - pageid_t internal_region_size; - pageid_t datapage_region_size; - pageid_t datapage_size; - //merge args 1 struct merger_args *diskmerge_args; //merge args 2 @@ -68,7 +38,7 @@ class merge_scheduler public: ~merge_scheduler(); - + int addlogtable(logtable * ltable); void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024); diff --git 
a/server.cpp b/server.cpp index 55bd95d..13e75b3 100644 --- a/server.cpp +++ b/server.cpp @@ -72,8 +72,6 @@ int main(int argc, char *argv[]) Tcommit(xid); - writelock(ltable.header_lock,0); - int lindex = mscheduler->addlogtable(<able); ltable.setMergeData(mscheduler->getMergeData(lindex)); @@ -93,8 +91,6 @@ int main(int argc, char *argv[]) mscheduler->startlogtable(lindex, c0_size); - unlock(ltable.header_lock); - lserver = new logserver(100, 32432); lserver->startserver(<able); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index ff55c9e..50f8588 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -34,8 +34,8 @@ void insertProbeIter(size_t NUM_ENTRIES) xid = Tbegin(); - mergeManager merge_mgr; - mergeManager::mergeStats * stats = merge_mgr.newMergeStats(1); + mergeManager merge_mgr(0); + mergeStats * stats = merge_mgr.newMergeStats(1); diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats); diff --git a/test/check_merge.cpp b/test/check_merge.cpp index a3904e2..26e8d1a 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -53,12 +53,11 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - writelock(ltable.header_lock,0); int lindex = mscheduler.addlogtable(<able); ltable.setMergeData(mscheduler.getMergeData(lindex)); mscheduler.startlogtable(lindex, 10 * 1024 * 1024); - unlock(ltable.header_lock); + printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); struct timeval start_tv, stop_tv, ti_st, ti_end; diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index e17c57f..250c6c8 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -52,12 +52,10 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - writelock(ltable.header_lock,0); int lindex = mscheduler.addlogtable(<able); ltable.setMergeData(mscheduler.getMergeData(lindex)); mscheduler.startlogtable(lindex, 10 * 1024 * 1024); - unlock(ltable.header_lock); printf("Stage 1: Writing %d keys\n", 
NUM_ENTRIES); diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index 3e35080..5d71e11 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -107,12 +107,10 @@ void insertProbeIter(size_t NUM_ENTRIES) recordid table_root = ltable.allocTable(xid); Tcommit(xid); - writelock(ltable.header_lock,0); int lindex = mscheduler.addlogtable(<able); ltable.setMergeData(mscheduler.getMergeData(lindex)); mscheduler.startlogtable(lindex, 10 * 1024 * 1024); - unlock(ltable.header_lock); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);