Rewrite merge thread synchronization code, update status messages, implement preliminary (and ineffective) admission control, and force-write merged data every megabyte (so that progress can be tracked by admission control). Also, refactor quite a few things.

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@805 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2010-05-19 23:42:06 +00:00
parent 1adb980555
commit 2a1157602a
18 changed files with 455 additions and 489 deletions
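For context on the admission control mentioned in the commit message: the new mergeManager::tick() (mergeManager.cpp, below) throttles writers into C0 whenever the C0-C1 merger falls behind. The following is a minimal standalone sketch of that damping calculation, reusing the names that appear in the diff (c0_totalConsumed, c1_totalConsumed, c0_queueSize); the constants match the committed values, but the zero-target guard is an addition for this sketch and is not in the commit.

#include <algorithm>
#include <cmath>
#include <cstdint>

// Sketch of the throttle decision made in mergeManager::tick() for merge_level == 0.
// Returns how long the C0 writer should pause, in seconds.
double throttle_sleep_seconds(int64_t c0_totalConsumed, int64_t c1_totalConsumed,
                              int64_t c0_queueSize, int64_t bytes_written_delta,
                              double elapsed_delta) {
  const double min_throughput = 0.0;                    // committed value; the adjacent comment in the diff still says 100 KB/s
  const double max_throughput = 10.0 * 1024.0 * 1024.0; // 10 MB/s ceiling
  // How far C0 input has outrun the C0->C1 merger, relative to the target C0 size.
  double c0_badness = (double)((c0_totalConsumed + bytes_written_delta
                                - c1_totalConsumed) - c0_queueSize)
                      / (double)c0_queueSize;
  double raw_throughput = (double)bytes_written_delta / elapsed_delta;
  if (raw_throughput <= max_throughput && c0_badness <= 0.0) {
    return 0.0;  // merger is keeping up; do not throttle
  }
  double target_throughput = (c0_badness > 0.0)
      ? (max_throughput - min_throughput) * (1.0 - std::sqrt(std::sqrt(c0_badness))) + min_throughput
      : max_throughput;
  if (target_throughput <= 0.0) target_throughput = 1.0;  // guard added for this sketch only
  double target_elapsed = (double)bytes_written_delta / target_throughput;
  return std::max(0.0, target_elapsed - elapsed_delta);
}

In the committed code the writer does not literally sleep; it blocks in pthread_cond_timedwait() on dummy_throttle_cond until last_throttle plus target_elapsed. The every-megabyte force() calls added to merge_iterators() are what the commit message refers to as letting admission control track merge progress.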


@ -76,7 +76,7 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) #CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
IF ( HAVE_STASIS ) IF ( HAVE_STASIS )
ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp) ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp)
CREATE_EXECUTABLE(server) CREATE_EXECUTABLE(server)
ENDIF ( HAVE_STASIS ) ENDIF ( HAVE_STASIS )
ADD_LIBRARY(logstore_client tcpclient.cpp) ADD_LIBRARY(logstore_client tcpclient.cpp)


@ -14,6 +14,7 @@
#include "diskTreeComponent.h" #include "diskTreeComponent.h"
#include "regionAllocator.h" #include "regionAllocator.h"
#include "mergeStats.h"
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/slotted.h> #include <stasis/page/slotted.h>
@ -54,7 +55,7 @@ void diskTreeComponent::list_regions(int xid, pageid_t *internal_node_region_len
void diskTreeComponent::writes_done() { void diskTreeComponent::writes_done() {
if(dp) { if(dp) {
stats->wrote_datapage(dp); ((mergeStats*)stats)->wrote_datapage(dp);
dp->writes_done(); dp->writes_done();
delete dp; delete dp;
dp = 0; dp = 0;
@ -69,7 +70,7 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t)
// stats->num_datapages_out++; // stats->num_datapages_out++;
} else if(!dp->append(t)) { } else if(!dp->append(t)) {
// stats->bytes_out += (PAGE_SIZE * dp->get_page_count()); // stats->bytes_out += (PAGE_SIZE * dp->get_page_count());
stats->wrote_datapage(dp); ((mergeStats*)stats)->wrote_datapage(dp);
dp->writes_done(); dp->writes_done();
delete dp; delete dp;
dp = insertDataPage(xid, t); dp = insertDataPage(xid, t);
@ -91,7 +92,7 @@ DataPage<datatuple>* diskTreeComponent::insertDataPage(int xid, datatuple *tuple
if(!dp->append(tuple)) if(!dp->append(tuple))
{ {
// the last datapage must have not wanted the tuple, and then this datapage figured out the region is full. // the last datapage must have not wanted the tuple, and then this datapage figured out the region is full.
stats->wrote_datapage(dp); ((mergeStats*)stats)->wrote_datapage(dp);
dp->writes_done(); dp->writes_done();
delete dp; delete dp;
dp = 0; dp = 0;


@ -11,14 +11,13 @@
#include "datapage.h" #include "datapage.h"
#include "datatuple.h" #include "datatuple.h"
#include "mergeStats.h" #include "mergeStats.h"
class diskTreeComponent { class diskTreeComponent {
public: public:
class internalNodes; class internalNodes;
class iterator; class iterator;
diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size, diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size,
mergeManager::mergeStats* stats) : mergeStats* stats) :
ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)), ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)),
dp(0), dp(0),
datapage_size(datapage_size), datapage_size(datapage_size),
@ -26,7 +25,7 @@ class diskTreeComponent {
diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state, diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state,
mergeManager::mergeStats* stats) : mergeStats* stats) :
ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)), ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)),
dp(0), dp(0),
datapage_size(-1), datapage_size(-1),
@ -74,7 +73,7 @@ class diskTreeComponent {
internalNodes * ltree; internalNodes * ltree;
DataPage<datatuple>* dp; DataPage<datatuple>* dp;
pageid_t datapage_size; pageid_t datapage_size;
mergeManager::mergeStats *stats; /*mergeManager::mergeStats*/ void *stats; // XXX hack to work around circular includes.
public: public:
class internalNodes{ class internalNodes{


@ -514,7 +514,7 @@ int op_stat_space_usage(pthread_data* data) {
int xid = Tbegin(); int xid = Tbegin();
readlock(data->ltable->header_lock, 0); pthread_mutex_lock(&data->ltable->header_mut);
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
@ -587,7 +587,7 @@ int op_stat_space_usage(pthread_data* data) {
; ;
} while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/)); } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));
unlock(data->ltable->header_lock); pthread_mutex_unlock(&data->ltable->header_mut);
Tcommit(xid); Tcommit(xid);
@ -672,7 +672,7 @@ int op_dbg_blockmap(pthread_data* data) {
// produce a list of stasis regions // produce a list of stasis regions
int xid = Tbegin(); int xid = Tbegin();
readlock(data->ltable->header_lock, 0); pthread_mutex_lock(&data->ltable->header_mut);
// produce a list of regions used by current tree components // produce a list of regions used by current tree components
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
@ -714,7 +714,7 @@ int op_dbg_blockmap(pthread_data* data) {
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions, &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions); &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
unlock(data->ltable->header_lock); pthread_mutex_unlock(&data->ltable->header_mut);
Tcommit(xid); Tcommit(xid);


@ -4,6 +4,7 @@
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/bufferManager/bufferHash.h> #include <stasis/bufferManager/bufferHash.h>
#include "mergeStats.h"
#undef try #undef try
#undef end #undef end
@ -28,12 +29,16 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
tree_c1_mergeable = NULL; tree_c1_mergeable = NULL;
tree_c2 = NULL; tree_c2 = NULL;
this->still_running_ = true; this->still_running_ = true;
this->merge_mgr = new mergeManager(); this->merge_mgr = new mergeManager(this);
this->mergedata = 0; this->mergedata = 0;
//tmerger = new tuplemerger(&append_merger); //tmerger = new tuplemerger(&append_merger);
tmerger = new tuplemerger(&replace_merger); tmerger = new tuplemerger(&replace_merger);
header_lock = initlock(); pthread_mutex_init(&header_mut, 0);
pthread_cond_init(&c0_needed, 0);
pthread_cond_init(&c0_ready, 0);
pthread_cond_init(&c1_needed, 0);
pthread_cond_init(&c1_ready, 0);
tsize = 0; tsize = 0;
tree_bytes = 0; tree_bytes = 0;
@ -62,7 +67,11 @@ logtable<TUPLE>::~logtable()
memTreeComponent<datatuple>::tearDownTree(tree_c0); memTreeComponent<datatuple>::tearDownTree(tree_c0);
} }
deletelock(header_lock); pthread_mutex_destroy(&header_mut);
pthread_cond_destroy(&c0_needed);
pthread_cond_destroy(&c0_ready);
pthread_cond_destroy(&c1_needed);
pthread_cond_destroy(&c1_ready);
delete tmerger; delete tmerger;
} }
@ -83,7 +92,7 @@ recordid logtable<TUPLE>::allocTable(int xid)
{ {
table_rec = Talloc(xid, sizeof(tbl_header)); table_rec = Talloc(xid, sizeof(tbl_header));
mergeManager::mergeStats * stats = 0; mergeStats * stats = 0;
//create the big tree //create the big tree
tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats);
@ -104,7 +113,7 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
template<class TUPLE> template<class TUPLE>
void logtable<TUPLE>::update_persistent_header(int xid) { void logtable<TUPLE>::update_persistent_header(int xid) {
tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_root = tree_c2->get_root_rid();
tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid();
tbl_header.c2_state = tree_c2->get_internal_node_allocator_rid(); tbl_header.c2_state = tree_c2->get_internal_node_allocator_rid();
tbl_header.c1_root = tree_c1->get_root_rid(); tbl_header.c1_root = tree_c1->get_root_rid();
@ -117,10 +126,6 @@ void logtable<TUPLE>::update_persistent_header(int xid) {
template<class TUPLE> template<class TUPLE>
void logtable<TUPLE>::setMergeData(logtable_mergedata * mdata){ void logtable<TUPLE>::setMergeData(logtable_mergedata * mdata){
this->mergedata = mdata; this->mergedata = mdata;
mdata->internal_region_size = internal_region_size;
mdata->datapage_region_size = datapage_region_size;
mdata->datapage_size = datapage_size;
bump_epoch(); bump_epoch();
} }
@ -140,38 +145,22 @@ void logtable<TUPLE>::flushTable()
c0_stats->finished_merge(); c0_stats->finished_merge();
c0_stats->new_merge(); c0_stats->new_merge();
writelock(header_lock,0); pthread_mutex_lock(&header_mut);
pthread_mutex_lock(mergedata->rbtree_mut);
int expmcount = merge_count;
int expmcount = merge_count;
//this is for waiting the previous merger of the mem-tree //this is for waiting the previous merger of the mem-tree
//hopefullly this wont happen //hopefullly this wont happen
while(get_tree_c0_mergeable()) { bool blocked = false;
unlock(header_lock);
if(tree_bytes >= max_c0_size)
pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut);
else
{
pthread_mutex_unlock(mergedata->rbtree_mut);
return;
}
while(get_tree_c0_mergeable()) {
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_cond_wait(&c0_needed, &header_mut);
blocked = true;
writelock(header_lock,0); if(expmcount != merge_count) {
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_unlock(&header_mut);
return;
if(expmcount != merge_count) }
{
unlock(header_lock);
pthread_mutex_unlock(mergedata->rbtree_mut);
return;
}
} }
gettimeofday(&stop_tv,0); gettimeofday(&stop_tv,0);
@ -179,28 +168,30 @@ void logtable<TUPLE>::flushTable()
set_tree_c0_mergeable(get_tree_c0()); set_tree_c0_mergeable(get_tree_c0());
pthread_cond_broadcast(mergedata->input_ready_cond); pthread_cond_signal(&c0_ready);
merge_count ++; merge_count ++;
set_tree_c0(new memTreeComponent<datatuple>::rbtree_t); set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
c0_stats->starting_merge();
tsize = 0; tsize = 0;
tree_bytes = 0; tree_bytes = 0;
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(&header_mut);
unlock(header_lock);
c0_stats->starting_merge(); if(blocked) {
if(first) if(first)
{ {
printf("Blocked writes for %f sec\n", stop-start); printf("\nBlocked writes for %f sec\n", stop-start);
first = 0; first = 0;
}
else
{
printf("\nBlocked writes for %f sec (serviced writes for %f sec)\n",
stop-start, start-last_start);
}
last_start = stop;
} }
else
{
printf("Blocked writes for %f sec (serviced writes for %f sec)\n",
stop-start, start-last_start);
}
last_start = stop;
} }
@ -208,10 +199,9 @@ template<class TUPLE>
datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size_t keySize) datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size_t keySize)
{ {
//prepare a search tuple //prepare a search tuple
datatuple *search_tuple = datatuple::create(key, keySize); datatuple *search_tuple = datatuple::create(key, keySize);
readlock(header_lock,0); pthread_mutex_lock(&header_mut);
pthread_mutex_lock(mergedata->rbtree_mut);
datatuple *ret_tuple=0; datatuple *ret_tuple=0;
@ -243,14 +233,13 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
} }
else //key first found in old mem tree else //key first found in old mem tree
{ {
ret_tuple = tuple->create_copy(); ret_tuple = tuple->create_copy();
} }
//we cannot free tuple from old-tree 'cos it is not a copy //we cannot free tuple from old-tree 'cos it is not a copy
} }
} }
//release the memtree lock //TODO: Arange to only hold read latches while hitting disk.
pthread_mutex_unlock(mergedata->rbtree_mut);
//step 3: check c1 //step 3: check c1
if(!done) if(!done)
@ -340,8 +329,7 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
} }
} }
//pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(&header_mut); // XXX release this each time we could block on disk...
unlock(header_lock);
datatuple::freetuple(search_tuple); datatuple::freetuple(search_tuple);
return ret_tuple; return ret_tuple;
@ -357,7 +345,7 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
//prepare a search tuple //prepare a search tuple
datatuple * search_tuple = datatuple::create(key, keySize); datatuple * search_tuple = datatuple::create(key, keySize);
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_lock(&header_mut);
datatuple *ret_tuple=0; datatuple *ret_tuple=0;
//step 1: look in tree_c0 //step 1: look in tree_c0
@ -413,7 +401,7 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
} }
} }
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(&header_mut);
datatuple::freetuple(search_tuple); datatuple::freetuple(search_tuple);
return ret_tuple; return ret_tuple;
@ -424,9 +412,8 @@ template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple) void logtable<TUPLE>::insertTuple(datatuple *tuple)
{ {
//lock the red-black tree //lock the red-black tree
readlock(header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut);
c0_stats->read_tuple_from_small_component(tuple); c0_stats->read_tuple_from_small_component(tuple);
pthread_mutex_lock(&header_mut);
//find the previous tuple with same key in the memtree if exists //find the previous tuple with same key in the memtree if exists
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple); memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
if(rbitr != tree_c0->end()) if(rbitr != tree_c0->end())
@ -459,19 +446,13 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
//flushing logic //flushing logic
if(tree_bytes >= max_c0_size ) if(tree_bytes >= max_c0_size )
{ {
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(&header_mut);
unlock(header_lock); flushTable();
flushTable(); } else {
//unlock
readlock(header_lock,0); pthread_mutex_unlock(&header_mut);
pthread_mutex_lock(mergedata->rbtree_mut);
} }
//unlock
pthread_mutex_unlock(mergedata->rbtree_mut);
unlock(header_lock);
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
} }
@ -491,7 +472,7 @@ void logtable<TUPLE>::forgetIterator(iterator * it) {
} }
template<class TUPLE> template<class TUPLE>
void logtable<TUPLE>::bump_epoch() { void logtable<TUPLE>::bump_epoch() {
assert(!trywritelock(header_lock,0)); // assert(!trywritelock(header_lock,0));
epoch++; epoch++;
for(unsigned int i = 0; i < its.size(); i++) { for(unsigned int i = 0; i < its.size(); i++) {
its[i]->invalidate(); its[i]->invalidate();


@ -12,9 +12,10 @@
#include "tuplemerger.h" #include "tuplemerger.h"
#include "mergeManager.h"
#include "mergeStats.h" #include "mergeStats.h"
struct logtable_mergedata; class logtable_mergedata;
template<class TUPLE> template<class TUPLE>
class logtable { class logtable {
@ -64,7 +65,11 @@ public:
inline void set_tree_c1(diskTreeComponent *t){tree_c1=t; bump_epoch(); } inline void set_tree_c1(diskTreeComponent *t){tree_c1=t; bump_epoch(); }
inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t; bump_epoch(); } inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t; bump_epoch(); }
inline void set_tree_c2(diskTreeComponent *t){tree_c2=t; bump_epoch(); } inline void set_tree_c2(diskTreeComponent *t){tree_c2=t; bump_epoch(); }
pthread_cond_t c0_needed;
pthread_cond_t c0_ready;
pthread_cond_t c1_needed;
pthread_cond_t c1_ready;
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0(){return tree_c0;} inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0(){return tree_c0;}
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); } void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); }
@ -93,14 +98,15 @@ public:
}; };
logtable_mergedata * mergedata; logtable_mergedata * mergedata;
rwl * header_lock; pthread_mutex_t header_mut;
int64_t max_c0_size; int64_t max_c0_size;
mergeManager * merge_mgr; mergeManager * merge_mgr;
inline bool is_still_running() { return still_running_; } inline bool is_still_running() { return still_running_; }
inline void stop() { inline void stop() {
still_running_ = false; still_running_ = false;
// XXX must need to do other things! flushTable();
// XXX must need to do other things!
} }
private: private:
@ -116,16 +122,17 @@ private:
int tsize; //number of tuples int tsize; //number of tuples
int64_t tree_bytes; //number of bytes int64_t tree_bytes; //number of bytes
public:
//DATA PAGE SETTINGS //DATA PAGE SETTINGS
pageid_t internal_region_size; // in number of pages pageid_t internal_region_size; // in number of pages
pageid_t datapage_region_size; // " pageid_t datapage_region_size; // "
pageid_t datapage_size; // " pageid_t datapage_size; // "
private:
tuplemerger *tmerger; tuplemerger *tmerger;
std::vector<iterator *> its; std::vector<iterator *> its;
mergeManager::mergeStats * c0_stats; mergeStats * c0_stats;
bool still_running_; bool still_running_;
public: public:
@ -241,10 +248,10 @@ public:
last_returned(NULL), last_returned(NULL),
key(NULL), key(NULL),
valid(false) { valid(false) {
writelock(ltable->header_lock, 0); pthread_mutex_lock(&ltable->header_mut);
ltable->registerIterator(this); ltable->registerIterator(this);
validate(); validate();
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
} }
explicit iterator(logtable* ltable,TUPLE *key) explicit iterator(logtable* ltable,TUPLE *key)
@ -255,18 +262,18 @@ public:
key(key), key(key),
valid(false) valid(false)
{ {
writelock(ltable->header_lock, 0); pthread_mutex_lock(&ltable->header_mut);
ltable->registerIterator(this); ltable->registerIterator(this);
validate(); validate();
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
} }
~iterator() { ~iterator() {
writelock(ltable->header_lock,0); pthread_mutex_lock(&ltable->header_mut);
ltable->forgetIterator(this); ltable->forgetIterator(this);
invalidate(); invalidate();
if(last_returned) TUPLE::freetuple(last_returned); if(last_returned) TUPLE::freetuple(last_returned);
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
} }
private: private:
TUPLE * getnextHelper() { TUPLE * getnextHelper() {
@ -280,24 +287,24 @@ public:
} }
public: public:
TUPLE * getnextIncludingTombstones() { TUPLE * getnextIncludingTombstones() {
readlock(ltable->header_lock, 0); pthread_mutex_lock(&ltable->header_mut);
revalidate(); revalidate();
TUPLE * ret = getnextHelper(); TUPLE * ret = getnextHelper();
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
return ret ? ret->create_copy() : NULL; return ret ? ret->create_copy() : NULL;
} }
TUPLE * getnext() { TUPLE * getnext() {
readlock(ltable->header_lock, 0); pthread_mutex_lock(&ltable->header_mut);
revalidate(); revalidate();
TUPLE * ret; TUPLE * ret;
while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory. while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory.
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory.
} }
void invalidate() { void invalidate() {
assert(!trywritelock(ltable->header_lock,0)); // assert(!trywritelock(ltable->header_lock,0));
if(valid) { if(valid) {
delete merge_it_; delete merge_it_;
merge_it_ = NULL; merge_it_ = NULL;
@ -354,7 +361,7 @@ public:
t = NULL; t = NULL;
} }
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, t); c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), NULL/*need something that is not &ltable->header_mut*/, t);
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), t); c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), t);
disk_it[0] = ltable->get_tree_c1()->open_iterator(t); disk_it[0] = ltable->get_tree_c1()->open_iterator(t);
if(ltable->get_tree_c1_mergeable()) { if(ltable->get_tree_c1_mergeable()) {


@ -85,16 +85,16 @@ public:
public: public:
revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut ) : s_(s), mut_(rb_mut) { revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut ) : s_(s), mut_(rb_mut) {
pthread_mutex_lock(mut_); if(mut_) pthread_mutex_lock(mut_);
if(s_->begin() == s_->end()) { if(s_->begin() == s_->end()) {
next_ret_ = NULL; next_ret_ = NULL;
} else { } else {
next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_... next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_...
} }
pthread_mutex_unlock(mut_); if(mut_) pthread_mutex_unlock(mut_);
} }
revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mut_(rb_mut) { revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mut_(rb_mut) {
pthread_mutex_lock(mut_); if(mut_) pthread_mutex_lock(mut_);
if(key) { if(key) {
if(s_->find(key) != s_->end()) { if(s_->find(key) != s_->end()) {
next_ret_ = (*(s_->find(key)))->create_copy(); next_ret_ = (*(s_->find(key)))->create_copy();
@ -111,7 +111,7 @@ public:
} }
} }
// DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? (const char*)key->key() : "NULL"); // DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? (const char*)key->key() : "NULL");
pthread_mutex_unlock(mut_); if(mut_) pthread_mutex_unlock(mut_);
} }
~revalidatingIterator() { ~revalidatingIterator() {
@ -119,7 +119,7 @@ public:
} }
TUPLE* getnext() { TUPLE* getnext() {
pthread_mutex_lock(mut_); if(mut_) pthread_mutex_lock(mut_);
TUPLE * ret = next_ret_; TUPLE * ret = next_ret_;
if(next_ret_) { if(next_ret_) {
if(s_->upper_bound(next_ret_) == s_->end()) { if(s_->upper_bound(next_ret_) == s_->end()) {
@ -128,7 +128,7 @@ public:
next_ret_ = (*s_->upper_bound(next_ret_))->create_copy(); next_ret_ = (*s_->upper_bound(next_ret_))->create_copy();
} }
} }
pthread_mutex_unlock(mut_); if(mut_) pthread_mutex_unlock(mut_);
return ret; return ret;
} }

mergeManager.cpp (new file, 194 lines)

@ -0,0 +1,194 @@
/*
* mergeManager.cpp
*
* Created on: May 19, 2010
* Author: sears
*/
#include "mergeManager.h"
#include "mergeStats.h"
#include "logstore.h"
#include "math.h"
mergeStats* mergeManager:: newMergeStats(int mergeLevel) {
if (mergeLevel == 0) {
return c0;
} else if (mergeLevel == 1) {
return c1;
} else if(mergeLevel == 2) {
return c2;
} else {
abort();
}
}
mergeManager::~mergeManager() {
pthread_mutex_destroy(&mut);
pthread_mutex_destroy(&throttle_mut);
pthread_mutex_destroy(&dummy_throttle_mut);
pthread_cond_destroy(&dummy_throttle_cond);
delete c0;
delete c1;
delete c2;
}
void mergeManager::new_merge(mergeStats * s) {
pthread_mutex_lock(&mut);
if(s->merge_count) {
if(s->merge_level == 0) {
// queueSize was set during startup
} else if(s->merge_level == 1) {
c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
} else if(s->merge_level == 2) {
c2_queueSize = s->bytes_in_small;
} else { abort(); }
pretty_print(stdout);
}
pthread_mutex_unlock(&mut);
}
void mergeManager::set_c0_size(int64_t size) {
c0_queueSize = size;
}
void mergeManager::tick(mergeStats * s, bool done) {
if(s->merge_level == 0) {
pthread_mutex_lock(&throttle_mut);
}
// throttle?
if(s->bytes_in_small_delta > c0_queueSize / 10000) {
if(s->merge_level == 0) {
struct timeval now;
gettimeofday(&now, 0);
double elapsed_delta = tv_to_double(&now) - ts_to_double(&last_throttle);
pageid_t bytes_written_delta = (s->bytes_in_small_delta - s->bytes_collapsed_delta);
double min_throughput = 0.0; // don't throttle below 100 kilobytes / sec
double max_throughput = 10.0 * 1024.0 * 1024.0;
double c0_badness = (double)((c0_totalConsumed + bytes_written_delta - c1_totalConsumed)- c0_queueSize) / ((double)c0_queueSize);
double raw_throughput = ((double)bytes_written_delta)/elapsed_delta;
if(raw_throughput > max_throughput || c0_badness > 0) {
//double target_throughput = min_throughput / (c0_badness); // * c0_badness * c0_badness);
double target_throughput;
if(c0_badness > 0) {
target_throughput = (max_throughput - min_throughput) * (1.0-sqrt(sqrt(c0_badness))) + min_throughput;
} else {
target_throughput = max_throughput;
}
double target_elapsed = ((double)bytes_written_delta)/target_throughput;
//printf("Worked %6.1f (target %6.1f)\n", elapsed_delta, target_elapsed);
if(target_elapsed > elapsed_delta) {
struct timespec sleep_until;
double_to_ts(&sleep_until, ts_to_double(&last_throttle) + target_elapsed);
//fprintf(stdout, "Throttling for %6.1f seconds\n", target_elapsed - (double)elapsed_delta);
last_throttle_seconds = target_elapsed - (double)elapsed_delta;
last_elapsed_seconds = target_elapsed;
throttle_seconds += last_throttle_seconds;
elapsed_seconds += last_elapsed_seconds;
pthread_mutex_lock(&dummy_throttle_mut);
pthread_cond_timedwait(&dummy_throttle_cond, &dummy_throttle_mut, &sleep_until);
pthread_mutex_unlock(&dummy_throttle_mut);
memcpy(&last_throttle, &sleep_until, sizeof(sleep_until));
} else {
double_to_ts(&last_throttle, tv_to_double(&now));
}
} else {
double_to_ts(&last_throttle, tv_to_double(&now));
}
}
struct timeval now;
gettimeofday(&now,0);
unsigned long long elapsed = long_tv(now) - long_tv(s->last);
pthread_mutex_lock(&mut);
if(s->merge_level == 0) {
c0_totalConsumed += s->bytes_in_small_delta;
c0_totalWorktime += elapsed;
c0_totalCollapsed += s->bytes_collapsed_delta;
} else if(s->merge_level == 1) {
c1_totalConsumed += s->bytes_in_small_delta;
c1_totalWorktime += elapsed;
c1_totalCollapsed += s->bytes_collapsed_delta;
} else if(s->merge_level == 2) {
c2_totalConsumed += s->bytes_in_small_delta;
c2_totalWorktime += elapsed;
c2_totalCollapsed += s->bytes_collapsed_delta;
} else { abort(); }
pthread_mutex_unlock(&mut);
s->bytes_in_small_delta = 0;
s->bytes_collapsed_delta = 0;
memcpy(&s->last, &now, sizeof(now));
pretty_print(stdout);
}
if(s->merge_level == 0) {
pthread_mutex_unlock(&throttle_mut);
}
}
mergeManager::mergeManager(void *ltable):
ltable(ltable),
c0_queueSize(0),
c1_queueSize(0),
c2_queueSize(0),
c0_totalConsumed(0),
c0_totalCollapsed(0),
c0_totalWorktime(0),
c1_totalConsumed(0),
c1_totalCollapsed(0),
c1_totalWorktime(0),
c2_totalConsumed(0),
c2_totalCollapsed(0),
c2_totalWorktime(0),
c0(new mergeStats(this, 0)),
c1(new mergeStats(this, 1)),
c2(new mergeStats(this, 2)) {
pthread_mutex_init(&mut, 0);
pthread_mutex_init(&throttle_mut, 0);
pthread_mutex_init(&dummy_throttle_mut, 0);
pthread_cond_init(&dummy_throttle_cond, 0);
struct timeval tv;
gettimeofday(&tv, 0);
double_to_ts(&last_throttle, tv_to_double(&tv));
}
void mergeManager::pretty_print(FILE * out) {
pageid_t mb = 1024 * 1024;
logtable<datatuple> * lt = (logtable<datatuple>*)ltable;
bool have_c0 = false;
bool have_c0m = false;
bool have_c1 = false;
bool have_c1m = false;
bool have_c2 = false;
if(lt) {
pthread_mutex_lock(&lt->header_mut);
have_c0 = NULL != lt->get_tree_c0();
have_c0m = NULL != lt->get_tree_c0_mergeable();
have_c1 = NULL != lt->get_tree_c1();
have_c1m = NULL != lt->get_tree_c1_mergeable() ;
have_c2 = NULL != lt->get_tree_c2();
pthread_mutex_unlock(&lt->header_mut);
}
fprintf(out,"[%s] %s %s [%s] %s %s [%s] %s ",
c0->active ? "RUN" : "---",
have_c0 ? "C0" : "..",
have_c0m ? "C0'" : "...",
c1->active ? "RUN" : "---",
have_c1 ? "C1" : "..",
have_c1m ? "C1'" : "...",
c2->active ? "RUN" : "---",
have_c2 ? "C2" : "..");
fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
fprintf(out, "C0 size %4lld collapsed %4lld resident %4lld ",
2*c0_queueSize/mb,
c0_totalCollapsed/mb,
(c0_totalConsumed - (c0_totalCollapsed + c1_totalConsumed))/mb);
fprintf(out, "C1 size %4lld collapsed %4lld resident %4lld ",
2*c1_queueSize/mb,
c1_totalCollapsed/mb,
(c1_totalConsumed - (c1_totalCollapsed + c2_totalConsumed))/mb);
fprintf(out, "C2 size %4lld collapsed %4lld ",
2*c2_queueSize/mb, c2_totalCollapsed/mb);
fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\r",
((double)c1_totalConsumed)/((double)c1_totalWorktime),
((double)c2_totalConsumed)/((double)c2_totalWorktime));
fflush(out);
}

mergeManager.h (new file, 73 lines)

@ -0,0 +1,73 @@
/*
* mergeManager.h
*
* Created on: May 19, 2010
* Author: sears
*/
#ifndef MERGEMANAGER_H_
#define MERGEMANAGER_H_
#include <stasis/common.h>
#undef try
#undef end
#include <sys/time.h>
#include <stdio.h>
class mergeStats;
class mergeManager {
private:
double tv_to_double(struct timeval * tv) {
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0;
}
double ts_to_double(struct timespec * ts) {
return (double)ts->tv_sec + ((double)ts->tv_nsec)/1000000000.0;
}
void double_to_ts(struct timespec *ts, double time) {
ts->tv_sec = (time_t)(time);
ts->tv_nsec = (long)((time - (double)ts->tv_sec) * 1000000000.0);
}
uint64_t long_tv(struct timeval& tv) {
return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec);
}
public:
mergeManager(void *ltable);
~mergeManager();
void new_merge(mergeStats * s);
void set_c0_size(int64_t size);
void tick(mergeStats * s, bool done = false);
void pretty_print(FILE * out);
mergeStats* newMergeStats(int mergeLevel);
private:
pthread_mutex_t mut;
void* ltable;
pageid_t c0_queueSize;
pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size)
pageid_t c0_totalConsumed;
pageid_t c0_totalCollapsed;
pageid_t c0_totalWorktime;
pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times)
pageid_t c1_totalCollapsed;
pageid_t c1_totalWorktime;
pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times)
pageid_t c2_totalCollapsed;
pageid_t c2_totalWorktime;
double throttle_seconds;
double elapsed_seconds;
double last_throttle_seconds;
double last_elapsed_seconds;
mergeStats * c0;
mergeStats * c1;
mergeStats * c2;
struct timespec last_throttle;
pthread_mutex_t throttle_mut;
pthread_mutex_t dummy_throttle_mut;
pthread_cond_t dummy_throttle_cond;
};
#endif /* MERGEMANAGER_H_ */
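
Taken together with the merger.cpp changes further down, each merge thread now owns one mergeStats object handed out by the table's mergeManager and walks it through a new_merge / starting_merge / finished_merge lifecycle while counting the tuples it moves. A hedged outline of that flow, based only on the calls visible in this diff (how the byte counters feed mergeManager::tick() is not shown in these hunks):

// Hypothetical outline of one C0->C1 merge pass; not the literal merger.cpp code.
mergeStats * stats = ltable->merge_mgr->newMergeStats(1);   // 0 = C0, 1 = C1, 2 = C2
stats->new_merge();        // per-merge reset of the stats counters
stats->starting_merge();   // marks the level active ("RUN" in the status line)
while (datatuple * t = itr->next_callerFrees()) {
  stats->read_tuple_from_small_component(t);  // input-side byte accounting
  scratch_tree->insertTuple(xid, t);
  stats->wrote_tuple(t);                      // output-side byte accounting
  datatuple::freetuple(t);
}
stats->finished_merge();   // totals become visible via mergeManager::pretty_print()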

mergeStats.cpp (new file, 12 lines)

@ -0,0 +1,12 @@
/*
* mergeStats.cpp
*
* Created on: May 18, 2010
* Author: sears
*/
#include "mergeStats.h"
#include "logstore.h"
#include "datatuple.h"


@ -9,152 +9,16 @@
#define MERGESTATS_H_ #define MERGESTATS_H_
#include <stasis/common.h> #include <stasis/common.h>
#undef try
#undef end
class mergeManager { #include <sys/time.h>
private: #include <stdio.h>
double tv_to_double(struct timeval * tv) { #include "datatuple.h"
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; #include "datapage.h"
} #include "mergeManager.h"
double ts_to_double(struct timespec * ts) {
return (double)ts->tv_sec + ((double)ts->tv_nsec)/1000000000.0;
}
void double_to_ts(struct timespec *ts, double time) {
ts->tv_sec = (time_t)(time);
ts->tv_nsec = (long)((time - (double)ts->tv_sec) * 1000000000.0);
}
public:
mergeManager() :
c0_queueSize(0),
c1_queueSize(0),
c2_queueSize(0),
c0_totalConsumed(0),
c0_totalCollapsed(0),
c0_totalWorktime(0),
c1_totalConsumed(0),
c1_totalCollapsed(0),
c1_totalWorktime(0),
c2_totalConsumed(0),
c2_totalCollapsed(0),
c2_totalWorktime(0),
c0(new mergeStats(this, 0)),
c1(new mergeStats(this, 1)),
c2(new mergeStats(this, 2)) {
pthread_mutex_init(&mut, 0);
pthread_mutex_init(&throttle_mut, 0);
pthread_mutex_init(&dummy_throttle_mut, 0);
pthread_cond_init(&dummy_throttle_cond, 0);
struct timeval tv;
gettimeofday(&tv, 0);
double_to_ts(&last_throttle, tv_to_double(&tv));
} class mergeStats {
~mergeManager() {
pthread_mutex_destroy(&mut);
}
class mergeStats;
void new_merge(mergeStats * s) {
pthread_mutex_lock(&mut);
if(s->merge_count) {
if(s->merge_level == 0) {
// queueSize was set during startup
} else if(s->merge_level == 1) {
c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
} else if(s->merge_level == 2) {
c2_queueSize = s->bytes_in_small;
} else { abort(); }
pretty_print(stdout);
}
pthread_mutex_unlock(&mut);
}
void set_c0_size(int64_t size) {
c0_queueSize = size;
}
void tick(mergeStats * s, bool done = false) {
if(s->merge_level == 0) {
pthread_mutex_lock(&throttle_mut);
// throttle?
if(s->bytes_in_small_delta > c0_queueSize / 100) {
struct timeval now;
gettimeofday(&now, 0);
double elapsed_delta = tv_to_double(&now) - ts_to_double(&last_throttle);
pageid_t bytes_written_delta = (s->bytes_in_small_delta - s->bytes_collapsed_delta);
double min_throughput = 100 * 1024; // don't throttle below 100 kilobytes / sec
double c0_badness = (double)((c0_totalConsumed + bytes_written_delta - c1_totalConsumed) - c0_queueSize)/ (double)c0_queueSize;
if(c0_badness > 0) {
double target_throughput = min_throughput / (c0_badness * c0_badness);
//double raw_throughput = ((double)bytes_written_delta)/elapsed_delta;
double target_elapsed = ((double)bytes_written_delta)/target_throughput;
printf("Worked %6.1f (target %6.1f)\n", elapsed_delta, target_elapsed);
if(target_elapsed > elapsed_delta) {
struct timespec sleep_until;
double_to_ts(&sleep_until, ts_to_double(&last_throttle) + target_elapsed);
fprintf(stdout, "Throttling for %6.1f seconds\n", target_elapsed - (double)elapsed_delta);
pthread_mutex_lock(&dummy_throttle_mut);
pthread_cond_timedwait(&dummy_throttle_cond, &dummy_throttle_mut, &sleep_until);
pthread_mutex_unlock(&dummy_throttle_mut);
memcpy(&last_throttle, &sleep_until, sizeof(sleep_until));
} else {
double_to_ts(&last_throttle, tv_to_double(&now));
}
} else {
printf("badness is negative\n");
double_to_ts(&last_throttle, tv_to_double(&now));
}
}
}
if(done || s->bytes_in_small_delta > c0_queueSize / 100) {
struct timeval now;
gettimeofday(&now,0);
unsigned long long elapsed = long_tv(now) - long_tv(s->last);
pthread_mutex_lock(&mut);
if(s->merge_level == 0) {
c0_totalConsumed += s->bytes_in_small_delta;
c0_totalWorktime += elapsed;
c0_totalCollapsed += s->bytes_collapsed_delta;
} else if(s->merge_level == 1) {
c1_totalConsumed += s->bytes_in_small_delta;
c1_totalWorktime += elapsed;
c1_totalCollapsed += s->bytes_collapsed_delta;
} else if(s->merge_level == 2) {
c2_totalConsumed += s->bytes_in_small_delta;
c2_totalWorktime += elapsed;
c2_totalCollapsed += s->bytes_collapsed_delta;
} else { abort(); }
pthread_mutex_unlock(&mut);
s->bytes_in_small_delta = 0;
s->bytes_collapsed_delta = 0;
memcpy(&s->last, &now, sizeof(now));
pretty_print(stdout);
}
if(s->merge_level == 0) {
pthread_mutex_unlock(&throttle_mut);
}
}
uint64_t long_tv(struct timeval& tv) {
return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec);
}
void pretty_print(FILE * out) {
pageid_t mb = 1024 * 1024;
fprintf(out,"%s %s %s ", c0->active ? "RUN" : "---", c1->active ? "RUN" : "---", c2->active ? "RUN" : "---");
fprintf(out, "C0 size %lld collapsed %lld resident %lld ",
2*c0_queueSize/mb,
c0_totalCollapsed/mb,
(c0_totalConsumed - (c0_totalCollapsed + c1_totalConsumed))/mb);
fprintf(out, "C1 size %lld collapsed %lld resident %lld ",
2*c1_queueSize/mb,
c1_totalCollapsed/mb,
(c1_totalConsumed - (c1_totalCollapsed + c2_totalConsumed))/mb);
fprintf(out, "C2 size %lld collapsed %lld ",
2*c2_queueSize/mb, c2_totalCollapsed/mb);
fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\n",
((double)c1_totalConsumed)/((double)c1_totalWorktime),
((double)c2_totalConsumed)/((double)c2_totalWorktime));
}
class mergeStats {
public: public:
mergeStats(mergeManager* merge_mgr, int merge_level) : mergeStats(mergeManager* merge_mgr, int merge_level) :
merge_mgr(merge_mgr), merge_mgr(merge_mgr),
@ -299,40 +163,4 @@ public:
} }
}; };
mergeStats* newMergeStats(int mergeLevel) {
if (mergeLevel == 0) {
return c0;
} else if (mergeLevel == 1) {
return c1;
} else if(mergeLevel == 2) {
return c2;
} else {
abort();
}
}
private:
pthread_mutex_t mut;
pageid_t c0_queueSize;
pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size)
pageid_t c0_totalConsumed;
pageid_t c0_totalCollapsed;
pageid_t c0_totalWorktime;
pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times)
pageid_t c1_totalCollapsed;
pageid_t c1_totalWorktime;
pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times)
pageid_t c2_totalCollapsed;
pageid_t c2_totalWorktime;
mergeStats * c0;
mergeStats * c1;
mergeStats * c2;
struct timespec last_throttle;
pthread_mutex_t throttle_mut;
pthread_mutex_t dummy_throttle_mut;
pthread_cond_t dummy_throttle_cond;
};
#endif /* MERGESTATS_H_ */ #endif /* MERGESTATS_H_ */


@ -13,19 +13,7 @@ int merge_scheduler::addlogtable(logtable<datatuple> *ltable)
struct logtable_mergedata * mdata = new logtable_mergedata; struct logtable_mergedata * mdata = new logtable_mergedata;
// initialize merge data // initialize merge data
mdata->rbtree_mut = new pthread_mutex_t;
pthread_mutex_init(mdata->rbtree_mut,0);
ltable->set_tree_c0_mergeable(NULL); ltable->set_tree_c0_mergeable(NULL);
mdata->input_needed = new bool(false);
mdata->input_ready_cond = new pthread_cond_t;
pthread_cond_init(mdata->input_ready_cond,0);
mdata->input_needed_cond = new pthread_cond_t;
pthread_cond_init(mdata->input_needed_cond,0);
mdata->input_size = new int64_t(100);
mdata->diskmerge_args = new merger_args; mdata->diskmerge_args = new merger_args;
mdata->memmerge_args = new merger_args; mdata->memmerge_args = new merger_args;
@ -42,27 +30,6 @@ merge_scheduler::~merge_scheduler()
logtable<datatuple> *ltable = mergedata[i].first; logtable<datatuple> *ltable = mergedata[i].first;
logtable_mergedata *mdata = mergedata[i].second; logtable_mergedata *mdata = mergedata[i].second;
//delete the mergedata fields
delete mdata->rbtree_mut;
delete mdata->input_needed;
delete mdata->input_ready_cond;
delete mdata->input_needed_cond;
delete mdata->input_size;
//delete the merge thread structure variables
pthread_cond_destroy(mdata->diskmerge_args->in_block_needed_cond);
delete mdata->diskmerge_args->in_block_needed_cond;
delete mdata->diskmerge_args->in_block_needed;
pthread_cond_destroy(mdata->diskmerge_args->out_block_needed_cond);
delete mdata->diskmerge_args->out_block_needed_cond;
delete mdata->diskmerge_args->out_block_needed;
pthread_cond_destroy(mdata->diskmerge_args->in_block_ready_cond);
delete mdata->diskmerge_args->in_block_ready_cond;
pthread_cond_destroy(mdata->diskmerge_args->out_block_ready_cond);
delete mdata->diskmerge_args->out_block_ready_cond;
delete mdata->diskmerge_args; delete mdata->diskmerge_args;
delete mdata->memmerge_args; delete mdata->memmerge_args;
} }
@ -76,18 +43,8 @@ void merge_scheduler::shutdown()
for(size_t i=0; i<mergedata.size(); i++) for(size_t i=0; i<mergedata.size(); i++)
{ {
logtable<datatuple> *ltable = mergedata[i].first; logtable<datatuple> *ltable = mergedata[i].first;
logtable_mergedata *mdata = mergedata[i].second;
//flush the in memory table to write any tuples still in memory
ltable->flushTable();
pthread_mutex_lock(mdata->rbtree_mut);
ltable->stop(); ltable->stop();
pthread_cond_signal(mdata->input_ready_cond);
//*(mdata->diskmerge_args->still_open)=false;//same pointer so no need
pthread_mutex_unlock(mdata->rbtree_mut);
} }
@ -107,25 +64,8 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
logtable<datatuple> * ltable = mergedata[index].first; logtable<datatuple> * ltable = mergedata[index].first;
struct logtable_mergedata *mdata = mergedata[index].second; struct logtable_mergedata *mdata = mergedata[index].second;
pthread_cond_t * block1_needed_cond = new pthread_cond_t;
pthread_cond_init(block1_needed_cond,0);
pthread_cond_t * block2_needed_cond = new pthread_cond_t;
pthread_cond_init(block2_needed_cond,0);
pthread_cond_t * block1_ready_cond = new pthread_cond_t;
pthread_cond_init(block1_ready_cond,0);
pthread_cond_t * block2_ready_cond = new pthread_cond_t;
pthread_cond_init(block2_ready_cond,0);
bool *block1_needed = new bool(false);
bool *block2_needed = new bool(false);
//wait to merge the next block until we have merged block FUDGE times.
static const int FUDGE = 1;
static double R = MIN_R; static double R = MIN_R;
int64_t * block1_size = new int64_t;
*block1_size = FUDGE * ((int)R) * (*(mdata->input_size));
//initialize rb-tree //initialize rb-tree
ltable->set_tree_c0(new memTreeComponent<datatuple>::rbtree_t); ltable->set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
@ -142,17 +82,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
struct merger_args diskmerge_args= { struct merger_args diskmerge_args= {
ltable, ltable,
1, //worker id
mdata->rbtree_mut, //block_ready_mutex
block1_needed_cond, //in_block_needed_cond
block1_needed, //in_block_needed
block2_needed_cond, //out_block_needed_cond
block2_needed, //out_block_needed
block1_ready_cond, //in_block_ready_cond
block2_ready_cond, //out_block_ready_cond
mdata->internal_region_size,
mdata->datapage_region_size,
mdata->datapage_size,
0, //max_tree_size No max size for biggest component 0, //max_tree_size No max size for biggest component
&R, //r_i &R, //r_i
}; };
@ -162,17 +91,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
struct merger_args memmerge_args = struct merger_args memmerge_args =
{ {
ltable, ltable,
2,
mdata->rbtree_mut,
mdata->input_needed_cond,
mdata->input_needed,
block1_needed_cond,
block1_needed,
mdata->input_ready_cond,
block1_ready_cond,
mdata->internal_region_size, // TODO different region / datapage sizes for C1?
mdata->datapage_region_size,
mdata->datapage_size,
(int64_t)(R * R * MAX_C0_SIZE), (int64_t)(R * R * MAX_C0_SIZE),
&R, &R,
}; };
@ -188,12 +106,12 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
} }
template <class ITA, class ITB> template <class ITA, class ITB>
void merge_iterators(int xid, void merge_iterators(int xid, diskTreeComponent * forceMe,
ITA *itrA, ITA *itrA,
ITB *itrB, ITB *itrB,
logtable<datatuple> *ltable, logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, diskTreeComponent *scratch_tree,
mergeManager::mergeStats *stats, mergeStats *stats,
bool dropDeletes); bool dropDeletes);
@ -229,45 +147,35 @@ void* memMergeThread(void*arg)
assert(ltable->get_tree_c1()); assert(ltable->get_tree_c1());
int merge_count =0; int merge_count =0;
mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1); mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1);
while(true) // 1 while(true) // 1
{ {
writelock(ltable->header_lock,0);
stats->new_merge(); stats->new_merge();
pthread_mutex_lock(&ltable->header_mut);
int done = 0; int done = 0;
// 2: wait for c0_mergable // 2: wait for c0_mergable
while(!ltable->get_tree_c0_mergeable()) while(!ltable->get_tree_c0_mergeable())
{ {
pthread_mutex_lock(a->block_ready_mut); pthread_cond_signal(&ltable->c0_needed);
*a->in_block_needed = true;
//pthread_cond_signal(a->in_block_needed_cond);
pthread_cond_broadcast(a->in_block_needed_cond);
if(!ltable->is_still_running()){ if(!ltable->is_still_running()){
done = 1; done = 1;
pthread_mutex_unlock(a->block_ready_mut);
break; break;
} }
DEBUG("mmt:\twaiting for block ready cond\n"); DEBUG("mmt:\twaiting for block ready cond\n");
unlock(ltable->header_lock);
pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); pthread_cond_wait(&ltable->c0_ready, &ltable->header_mut);
pthread_mutex_unlock(a->block_ready_mut);
writelock(ltable->header_lock,0);
DEBUG("mmt:\tblock ready\n"); DEBUG("mmt:\tblock ready\n");
} }
*a->in_block_needed = false;
if(done==1) if(done==1)
{ {
pthread_mutex_lock(a->block_ready_mut); pthread_cond_signal(&ltable->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. pthread_mutex_unlock(&ltable->header_mut);
pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->header_lock);
break; break;
} }
@ -285,15 +193,14 @@ void* memMergeThread(void*arg)
//create a new tree //create a new tree
diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats); diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats);
//pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(&ltable->header_mut);
unlock(ltable->header_lock);
//: do the merge //: do the merge
DEBUG("mmt:\tMerging:\n"); DEBUG("mmt:\tMerging:\n");
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c1_prime, stats, false); merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, c1_prime, itrA, itrB, ltable, c1_prime, stats, false);
delete itrA; delete itrA;
delete itrB; delete itrB;
@ -306,8 +213,27 @@ void* memMergeThread(void*arg)
merge_count++; merge_count++;
DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size()); DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size());
writelock(ltable->header_lock,0); pthread_mutex_lock(&ltable->header_mut);
// Immediately clean out c0 mergeable so that writers may continue.
// first, we need to move the c1' into c1.
// 12: delete old c1
ltable->get_tree_c1()->dealloc(xid);
delete ltable->get_tree_c1();
// 10: c1 = c1'
ltable->set_tree_c1(c1_prime);
// 11.5: delete old c0_mergeable
memTreeComponent<datatuple>::tearDownTree(ltable->get_tree_c0_mergeable());
// 11: c0_mergeable = NULL
ltable->set_tree_c0_mergeable(NULL);
pthread_cond_signal(&ltable->c0_needed);
ltable->update_persistent_header(xid);
Tcommit(xid);
//TODO: this is simplistic for now //TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger //6: if c1' is too big, signal the other merger
@ -322,50 +248,35 @@ void* memMergeThread(void*arg)
DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size, DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
ltable->max_c0_size, a->max_size, target_R); ltable->max_c0_size, a->max_size, target_R);
// XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure. We could break this into two transactions; replace c1 with the new c1, then wait for backpressure, then move c1 into c1_mergeable, and zerou out c1 // XXX need to report backpressure here!
while(ltable->get_tree_c1_mergeable()) { while(ltable->get_tree_c1_mergeable()) {
pthread_mutex_lock(a->block_ready_mut); pthread_cond_wait(&ltable->c1_needed, &ltable->header_mut);
unlock(ltable->header_lock);
pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut);
pthread_mutex_unlock(a->block_ready_mut);
writelock(ltable->header_lock,0);
} }
}
// 12: delete old c1 xid = Tbegin();
ltable->get_tree_c1()->dealloc(xid);
delete ltable->get_tree_c1();
// 11.5: delete old c0_mergeable // we just set c1 = c1'. Want to move c1 -> c1 mergeable, clean out c1.
memTreeComponent<datatuple>::tearDownTree(ltable->get_tree_c0_mergeable());
// 11: c0_mergeable = NULL
ltable->set_tree_c0_mergeable(NULL);
if( signal_c2 ) {
// 7: and perhaps c1_mergeable // 7: and perhaps c1_mergeable
ltable->set_tree_c1_mergeable(c1_prime); ltable->set_tree_c1_mergeable(c1_prime); // c1_prime == c1.
// 8: c1 = new empty. // 8: c1 = new empty.
ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats)); ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats));
pthread_cond_signal(a->out_block_ready_cond); pthread_cond_signal(&ltable->c1_ready);
ltable->update_persistent_header(xid);
Tcommit(xid);
} else {
// 10: c1 = c1'
ltable->set_tree_c1(c1_prime);
} }
DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page); // DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page);
// 13 // 13
ltable->update_persistent_header(xid);
Tcommit(xid);
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
stats->finished_merge(); stats->finished_merge();
stats->pretty_print(stdout); // stats->pretty_print(stdout);
//TODO: get the freeing outside of the lock //TODO: get the freeing outside of the lock
} }
@ -386,42 +297,34 @@ void *diskMergeThread(void*arg)
int merge_count =0; int merge_count =0;
mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2); mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2);
while(true) while(true)
{ {
// 2: wait for input // 2: wait for input
writelock(ltable->header_lock,0);
stats->new_merge(); stats->new_merge();
pthread_mutex_lock(&ltable->header_mut);
int done = 0; int done = 0;
// get a new input for merge // get a new input for merge
while(!ltable->get_tree_c1_mergeable()) while(!ltable->get_tree_c1_mergeable())
{ {
pthread_mutex_lock(a->block_ready_mut); pthread_cond_signal(&ltable->c1_needed);
*a->in_block_needed = true;
pthread_cond_signal(a->in_block_needed_cond);
if(!ltable->is_still_running()){ if(!ltable->is_still_running()){
done = 1; done = 1;
pthread_mutex_unlock(a->block_ready_mut);
break; break;
} }
DEBUG("dmt:\twaiting for block ready cond\n"); DEBUG("dmt:\twaiting for block ready cond\n");
unlock(ltable->header_lock);
pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); pthread_cond_wait(&ltable->c1_ready, &ltable->header_mut);
pthread_mutex_unlock(a->block_ready_mut);
DEBUG("dmt:\tblock ready\n"); DEBUG("dmt:\tblock ready\n");
writelock(ltable->header_lock,0);
} }
*a->in_block_needed = false;
if(done==1) if(done==1)
{ {
pthread_cond_signal(a->out_block_ready_cond); pthread_mutex_unlock(&ltable->header_mut);
unlock(ltable->header_lock);
break; break;
} }
@ -436,14 +339,14 @@ void *diskMergeThread(void*arg)
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator();
//create a new tree //create a new tree
diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats); diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats);
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
//do the merge //do the merge
DEBUG("dmt:\tMerging:\n"); DEBUG("dmt:\tMerging:\n");
merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c2_prime, stats, true); merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, NULL, itrA, itrB, ltable, c2_prime, stats, true);
delete itrA; delete itrA;
delete itrB; delete itrB;
@ -453,7 +356,7 @@ void *diskMergeThread(void*arg)
// (skip 6, 7, 8, 8.5, 9)) // (skip 6, 7, 8, 8.5, 9))
writelock(ltable->header_lock,0); pthread_mutex_lock(&ltable->header_mut);
//12 //12
ltable->get_tree_c2()->dealloc(xid); ltable->get_tree_c2()->dealloc(xid);
delete ltable->get_tree_c2(); delete ltable->get_tree_c2();
@ -479,11 +382,11 @@ void *diskMergeThread(void*arg)
// 13 // 13
ltable->update_persistent_header(xid); ltable->update_persistent_header(xid);
Tcommit(xid); Tcommit(xid);
unlock(ltable->header_lock); pthread_mutex_unlock(&ltable->header_mut);
stats->finished_merge(); stats->finished_merge();
stats->pretty_print(stdout); // stats->pretty_print(stdout);
} }
return 0; return 0;
@ -491,10 +394,11 @@ void *diskMergeThread(void*arg)
template <class ITA, class ITB> template <class ITA, class ITB>
void merge_iterators(int xid, void merge_iterators(int xid,
diskTreeComponent * forceMe,
ITA *itrA, //iterator on c1 or c2 ITA *itrA, //iterator on c1 or c2
ITB *itrB, //iterator on c0 or c1, respectively ITB *itrB, //iterator on c0 or c1, respectively
logtable<datatuple> *ltable, logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, mergeManager::mergeStats *stats, diskTreeComponent *scratch_tree, mergeStats *stats,
bool dropDeletes // should be true iff this is biggest component bool dropDeletes // should be true iff this is biggest component
) )
{ {
@ -502,6 +406,8 @@ void merge_iterators(int xid,
stats->read_tuple_from_large_component(t1); stats->read_tuple_from_large_component(t1);
datatuple *t2 = 0; datatuple *t2 = 0;
int i = 0;
while( (t2=itrB->next_callerFrees()) != 0) while( (t2=itrB->next_callerFrees()) != 0)
{ {
stats->read_tuple_from_small_component(t2); stats->read_tuple_from_small_component(t2);
@ -513,6 +419,7 @@ void merge_iterators(int xid,
{ {
//insert t1 //insert t1
scratch_tree->insertTuple(xid, t1); scratch_tree->insertTuple(xid, t1);
i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; }
stats->wrote_tuple(t1); stats->wrote_tuple(t1);
datatuple::freetuple(t1); datatuple::freetuple(t1);
//advance itrA //advance itrA
@ -543,12 +450,15 @@ void merge_iterators(int xid,
// cannot free any tuples here; they may still be read through a lookup // cannot free any tuples here; they may still be read through a lookup
} }
i+= t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; }
stats->wrote_tuple(t2); stats->wrote_tuple(t2);
datatuple::freetuple(t2); datatuple::freetuple(t2);
} }
while(t1 != 0) {// t1 is less than t2 while(t1 != 0) {// t1 is less than t2
scratch_tree->insertTuple(xid, t1); scratch_tree->insertTuple(xid, t1);
i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); i = 0; }
stats->wrote_tuple(t1); stats->wrote_tuple(t1);
datatuple::freetuple(t1); datatuple::freetuple(t1);


@ -15,46 +15,16 @@ static const double MIN_R = 3.0;
struct merger_args struct merger_args
{ {
logtable<datatuple> * ltable; logtable<datatuple> * ltable;
int worker_id;
pthread_mutex_t * block_ready_mut;
pthread_cond_t * in_block_needed_cond;
bool * in_block_needed;
pthread_cond_t * out_block_needed_cond;
bool * out_block_needed;
pthread_cond_t * in_block_ready_cond;
pthread_cond_t * out_block_ready_cond;
pageid_t internal_region_size;
pageid_t datapage_region_size;
pageid_t datapage_size;
int64_t max_size; int64_t max_size;
double * r_i; double * r_i;
}; };
struct logtable_mergedata struct logtable_mergedata
{ {
//merge threads //merge threads
pthread_t diskmerge_thread; pthread_t diskmerge_thread;
pthread_t memmerge_thread; pthread_t memmerge_thread;
pthread_mutex_t * rbtree_mut;
bool *input_needed; // memmerge-input needed
pthread_cond_t * input_ready_cond;
pthread_cond_t * input_needed_cond;
int64_t * input_size;
pageid_t internal_region_size;
pageid_t datapage_region_size;
pageid_t datapage_size;
//merge args 1 //merge args 1
struct merger_args *diskmerge_args; struct merger_args *diskmerge_args;
//merge args 2 //merge args 2
@ -68,7 +38,7 @@ class merge_scheduler
public: public:
~merge_scheduler(); ~merge_scheduler();
int addlogtable(logtable<datatuple> * ltable); int addlogtable(logtable<datatuple> * ltable);
void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024); void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024);


@ -72,8 +72,6 @@ int main(int argc, char *argv[])
Tcommit(xid); Tcommit(xid);
writelock(ltable.header_lock,0);
int lindex = mscheduler->addlogtable(&ltable); int lindex = mscheduler->addlogtable(&ltable);
ltable.setMergeData(mscheduler->getMergeData(lindex)); ltable.setMergeData(mscheduler->getMergeData(lindex));
@ -93,8 +91,6 @@ int main(int argc, char *argv[])
mscheduler->startlogtable(lindex, c0_size); mscheduler->startlogtable(lindex, c0_size);
unlock(ltable.header_lock);
lserver = new logserver(100, 32432); lserver = new logserver(100, 32432);
lserver->startserver(&ltable); lserver->startserver(&ltable);


@ -34,8 +34,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
xid = Tbegin(); xid = Tbegin();
mergeManager merge_mgr; mergeManager merge_mgr(0);
mergeManager::mergeStats * stats = merge_mgr.newMergeStats(1); mergeStats * stats = merge_mgr.newMergeStats(1);
diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats); diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats);


@ -53,12 +53,11 @@ void insertProbeIter(size_t NUM_ENTRIES)
Tcommit(xid); Tcommit(xid);
writelock(ltable.header_lock,0);
int lindex = mscheduler.addlogtable(&ltable); int lindex = mscheduler.addlogtable(&ltable);
ltable.setMergeData(mscheduler.getMergeData(lindex)); ltable.setMergeData(mscheduler.getMergeData(lindex));
mscheduler.startlogtable(lindex, 10 * 1024 * 1024); mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
unlock(ltable.header_lock);
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);
struct timeval start_tv, stop_tv, ti_st, ti_end; struct timeval start_tv, stop_tv, ti_st, ti_end;


@ -52,12 +52,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
Tcommit(xid); Tcommit(xid);
writelock(ltable.header_lock,0);
int lindex = mscheduler.addlogtable(&ltable); int lindex = mscheduler.addlogtable(&ltable);
ltable.setMergeData(mscheduler.getMergeData(lindex)); ltable.setMergeData(mscheduler.getMergeData(lindex));
mscheduler.startlogtable(lindex, 10 * 1024 * 1024); mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
unlock(ltable.header_lock);
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);


@ -107,12 +107,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
recordid table_root = ltable.allocTable(xid); recordid table_root = ltable.allocTable(xid);
Tcommit(xid); Tcommit(xid);
writelock(ltable.header_lock,0);
int lindex = mscheduler.addlogtable(&ltable); int lindex = mscheduler.addlogtable(&ltable);
ltable.setMergeData(mscheduler.getMergeData(lindex)); ltable.setMergeData(mscheduler.getMergeData(lindex));
mscheduler.startlogtable(lindex, 10 * 1024 * 1024); mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
unlock(ltable.header_lock);
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);