memTreeComponent is no longer a template

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2667 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2011-06-09 00:27:57 +00:00
parent 0923cd9d96
commit ef4d360b68
6 changed files with 47 additions and 50 deletions

View file

@ -80,7 +80,7 @@ logtable<TUPLE>::~logtable()
if(tree_c0 != NULL) if(tree_c0 != NULL)
{ {
memTreeComponent<datatuple>::tearDownTree(tree_c0); memTreeComponent::tearDownTree(tree_c0);
} }
log_file->close(log_file); log_file->close(log_file);
@ -121,7 +121,7 @@ recordid logtable<TUPLE>::allocTable(int xid)
merge_mgr->set_c0_size(max_c0_size); merge_mgr->set_c0_size(max_c0_size);
merge_mgr->new_merge(0); merge_mgr->new_merge(0);
tree_c0 = new memTreeComponent<datatuple>::rbtree_t; tree_c0 = new memTreeComponent::rbtree_t;
tbl_header.merge_manager = merge_mgr->talloc(xid); tbl_header.merge_manager = merge_mgr->talloc(xid);
tbl_header.log_trunc = 0; tbl_header.log_trunc = 0;
update_persistent_header(xid); update_persistent_header(xid);
@ -134,7 +134,7 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
Tread(xid, table_rec, &tbl_header); Tread(xid, table_rec, &tbl_header);
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0); tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0);
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0); tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0);
tree_c0 = new memTreeComponent<datatuple>::rbtree_t; tree_c0 = new memTreeComponent::rbtree_t;
merge_mgr = new mergeManager(this, xid, tbl_header.merge_manager); merge_mgr = new mergeManager(this, xid, tbl_header.merge_manager);
merge_mgr->set_c0_size(max_c0_size); merge_mgr->set_c0_size(max_c0_size);
@ -281,7 +281,7 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
datatuple *ret_tuple=0; datatuple *ret_tuple=0;
//step 1: look in tree_c0 //step 1: look in tree_c0
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple); memTreeComponent::rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple);
if(rbitr != get_tree_c0()->end()) if(rbitr != get_tree_c0()->end())
{ {
DEBUG("tree_c0 size %d\n", get_tree_c0()->size()); DEBUG("tree_c0 size %d\n", get_tree_c0()->size());
@ -462,7 +462,7 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
pthread_mutex_lock(&rb_mut); pthread_mutex_lock(&rb_mut);
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple); memTreeComponent::rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple);
if(rbitr != get_tree_c0()->end()) if(rbitr != get_tree_c0()->end())
{ {
DEBUG("tree_c0 size %d\n", tree_c0->size()); DEBUG("tree_c0 size %d\n", tree_c0->size());
@ -567,7 +567,7 @@ datatuple * logtable<TUPLE>::insertTupleHelper(datatuple *tuple)
free(newkey); free(newkey);
need_free = true; need_free = true;
} //find the previous tuple with same key in the memtree if exists } //find the previous tuple with same key in the memtree if exists
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple); memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple);
datatuple * t = 0; datatuple * t = 0;
datatuple * pre_t = 0; datatuple * pre_t = 0;
if(rbitr != tree_c0->end()) if(rbitr != tree_c0->end())

View file

@ -88,13 +88,13 @@ public:
pthread_cond_t c1_needed; pthread_cond_t c1_needed;
pthread_cond_t c1_ready; pthread_cond_t c1_ready;
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0(){return tree_c0;} inline memTreeComponent::rbtree_ptr_t get_tree_c0(){return tree_c0;}
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} inline memTreeComponent::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); } void set_tree_c0(memTreeComponent::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); }
bool get_c0_is_merging() { return c0_is_merging; } bool get_c0_is_merging() { return c0_is_merging; }
void set_c0_is_merging(bool is_merging) { c0_is_merging = is_merging; } void set_c0_is_merging(bool is_merging) { c0_is_merging = is_merging; }
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } void set_tree_c0_mergeable(memTreeComponent::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
lsn_t get_log_offset(); lsn_t get_log_offset();
void truncate_log(); void truncate_log();
@ -152,8 +152,8 @@ private:
diskTreeComponent *tree_c1; //small tree diskTreeComponent *tree_c1; //small tree
diskTreeComponent *tree_c1_mergeable; //small tree: ready to be merged with c2 diskTreeComponent *tree_c1_mergeable; //small tree: ready to be merged with c2
diskTreeComponent *tree_c1_prime; //small tree: ready to be merged with c2 diskTreeComponent *tree_c1_prime; //small tree: ready to be merged with c2
memTreeComponent<datatuple>::rbtree_ptr_t tree_c0; // in-mem red black tree memTreeComponent::rbtree_ptr_t tree_c0; // in-mem red black tree
memTreeComponent<datatuple>::rbtree_ptr_t tree_c0_mergeable; // in-mem red black tree: ready to be merged with c1. memTreeComponent::rbtree_ptr_t tree_c0_mergeable; // in-mem red black tree: ready to be merged with c1.
bool c0_is_merging; bool c0_is_merging;
public: public:
@ -415,8 +415,8 @@ public:
logtable * ltable; logtable * ltable;
uint64_t epoch; uint64_t epoch;
typedef mergeManyIterator< typedef mergeManyIterator<
typename memTreeComponent<TUPLE>::batchedRevalidatingIterator, typename memTreeComponent::batchedRevalidatingIterator,
typename memTreeComponent<TUPLE>::iterator> inner_merge_it_t; typename memTreeComponent::iterator> inner_merge_it_t;
typedef mergeManyIterator< typedef mergeManyIterator<
inner_merge_it_t, inner_merge_it_t,
diskTreeComponent::iterator> merge_it_t; diskTreeComponent::iterator> merge_it_t;
@ -445,8 +445,8 @@ public:
void validate() { void validate() {
typename memTreeComponent<TUPLE>::batchedRevalidatingIterator * c0_it; typename memTreeComponent::batchedRevalidatingIterator * c0_it;
typename memTreeComponent<TUPLE>::iterator *c0_mergeable_it[1]; typename memTreeComponent::iterator *c0_mergeable_it[1];
diskTreeComponent::iterator * disk_it[4]; diskTreeComponent::iterator * disk_it[4];
epoch = ltable->get_epoch(); epoch = ltable->get_epoch();
@ -459,8 +459,8 @@ public:
t = NULL; t = NULL;
} }
c0_it = new typename memTreeComponent<TUPLE>::batchedRevalidatingIterator(ltable->get_tree_c0(), 100, &ltable->rb_mut, t); c0_it = new typename memTreeComponent::batchedRevalidatingIterator(ltable->get_tree_c0(), 100, &ltable->rb_mut, t);
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), t); c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), t);
if(ltable->get_tree_c1_prime()) { if(ltable->get_tree_c1_prime()) {
disk_it[0] = ltable->get_tree_c1_prime()->open_iterator(t); disk_it[0] = ltable->get_tree_c1_prime()->open_iterator(t);
} else { } else {

View file

@ -1,16 +1,15 @@
#include "memTreeComponent.h" #include "memTreeComponent.h"
#include "datatuple.h" #include "datatuple.h"
template<class TUPLE> void memTreeComponent::tearDownTree(rbtree_ptr_t tree) {
void memTreeComponent<TUPLE>::tearDownTree(rbtree_ptr_t tree) { datatuple * t = 0;
TUPLE * t = 0;
typename rbtree_t::iterator old; typename rbtree_t::iterator old;
for(typename rbtree_t::iterator delitr = tree->begin(); for(typename rbtree_t::iterator delitr = tree->begin();
delitr != tree->end(); delitr != tree->end();
delitr++) { delitr++) {
if(t) { if(t) {
tree->erase(old); tree->erase(old);
TUPLE::freetuple(t); datatuple::freetuple(t);
t = 0; t = 0;
} }
t = *delitr; t = *delitr;
@ -18,8 +17,7 @@ void memTreeComponent<TUPLE>::tearDownTree(rbtree_ptr_t tree) {
} }
if(t) { if(t) {
tree->erase(old); tree->erase(old);
TUPLE::freetuple(t); datatuple::freetuple(t);
} }
delete tree; delete tree;
} }
template class memTreeComponent<datatuple>;

View file

@ -2,14 +2,13 @@
#define _MEMTREECOMPONENT_H_ #define _MEMTREECOMPONENT_H_
#include <set> #include <set>
#include <assert.h> #include <assert.h>
#include <mergeManager.h> // XXX for double_to_ts.
#include <mergeStats.h> #include <mergeStats.h>
#include <stasis/util/stlslab.h> #include <stasis/util/stlslab.h>
template<class TUPLE>
class memTreeComponent { class memTreeComponent {
public: public:
// typedef std::set<TUPLE*, TUPLE, stlslab<TUPLE*> > rbtree_t; // typedef std::set<datatuple*, datatuple, stlslab<datatuple*> > rbtree_t;
typedef std::set<TUPLE*, TUPLE > rbtree_t; typedef std::set<datatuple*, datatuple> rbtree_t;
typedef rbtree_t* rbtree_ptr_t; typedef rbtree_t* rbtree_ptr_t;
static void tearDownTree(rbtree_ptr_t t); static void tearDownTree(rbtree_ptr_t t);
@ -30,7 +29,7 @@ public:
init_iterators(s, NULL, NULL); init_iterators(s, NULL, NULL);
} }
iterator( rbtree_t *s, TUPLE *&key ) iterator( rbtree_t *s, datatuple *&key )
: first_(true), done_(s == NULL) { : first_(true), done_(s == NULL) {
init_iterators(s, key, NULL); init_iterators(s, key, NULL);
} }
@ -40,7 +39,7 @@ public:
delete itend_; delete itend_;
} }
TUPLE* next_callerFrees() { datatuple* next_callerFrees() {
if(done_) { return NULL; } if(done_) { return NULL; }
if(first_) { first_ = 0;} else { (*it_)++; } if(first_) { first_ = 0;} else { (*it_)++; }
if(*it_==*itend_) { done_= true; return NULL; } if(*it_==*itend_) { done_= true; return NULL; }
@ -50,7 +49,7 @@ public:
private: private:
void init_iterators(rbtree_t * s, TUPLE * key1, TUPLE * key2) { void init_iterators(rbtree_t * s, datatuple * key1, datatuple * key2) {
if(s) { if(s) {
it_ = key1 ? new MTITER(s->lower_bound(key1)) : new MTITER(s->begin()); it_ = key1 ? new MTITER(s->lower_bound(key1)) : new MTITER(s->begin());
itend_ = key2 ? new MTITER(s->upper_bound(key2)) : new MTITER(s->end()); itend_ = key2 ? new MTITER(s->upper_bound(key2)) : new MTITER(s->end());
@ -97,7 +96,7 @@ public:
} }
if(mut_) pthread_mutex_unlock(mut_); if(mut_) pthread_mutex_unlock(mut_);
} }
revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mut_(rb_mut) { revalidatingIterator( rbtree_t *s, pthread_mutex_t * rb_mut, datatuple *&key ) : s_(s), mut_(rb_mut) {
if(mut_) pthread_mutex_lock(mut_); if(mut_) pthread_mutex_lock(mut_);
if(key) { if(key) {
if(s_->find(key) != s_->end()) { if(s_->find(key) != s_->end()) {
@ -119,12 +118,12 @@ public:
} }
~revalidatingIterator() { ~revalidatingIterator() {
if(next_ret_) TUPLE::freetuple(next_ret_); if(next_ret_) datatuple::freetuple(next_ret_);
} }
TUPLE* next_callerFrees() { datatuple* next_callerFrees() {
if(mut_) pthread_mutex_lock(mut_); if(mut_) pthread_mutex_lock(mut_);
TUPLE * ret = next_ret_; datatuple * ret = next_ret_;
if(next_ret_) { if(next_ret_) {
if(s_->upper_bound(next_ret_) == s_->end()) { if(s_->upper_bound(next_ret_) == s_->end()) {
next_ret_ = 0; next_ret_ = 0;
@ -142,7 +141,7 @@ public:
int operator-(revalidatingIterator & t) { abort(); } int operator-(revalidatingIterator & t) { abort(); }
rbtree_t *s_; rbtree_t *s_;
TUPLE * next_ret_; datatuple * next_ret_;
pthread_mutex_t * mut_; pthread_mutex_t * mut_;
}; };
@ -157,7 +156,7 @@ public:
typedef typename rbtree_t::const_iterator MTITER; typedef typename rbtree_t::const_iterator MTITER;
void populate_next_ret_impl(std::_Rb_tree_const_iterator<TUPLE*>/*MTITER*/ it) { void populate_next_ret_impl(std::_Rb_tree_const_iterator<datatuple*>/*MTITER*/ it) {
num_batched_ = 0; num_batched_ = 0;
cur_off_ = 0; cur_off_ = 0;
while(it != s_->end() && num_batched_ < batch_size_) { while(it != s_->end() && num_batched_ < batch_size_) {
@ -166,7 +165,7 @@ public:
it++; it++;
} }
} }
void populate_next_ret(TUPLE *key=NULL, bool include_key=false) { void populate_next_ret(datatuple *key=NULL, bool include_key=false) {
if(cur_off_ == num_batched_) { if(cur_off_ == num_batched_) {
if(mut_) pthread_mutex_lock(mut_); if(mut_) pthread_mutex_lock(mut_);
if(mgr_) { if(mgr_) {
@ -189,24 +188,24 @@ public:
public: public:
batchedRevalidatingIterator( rbtree_t *s, mergeManager * mgr, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), mgr_(mgr), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { batchedRevalidatingIterator( rbtree_t *s, mergeManager * mgr, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), mgr_(mgr), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_); next_ret_ = (datatuple**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret(); populate_next_ret();
} }
batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mgr_(NULL), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, datatuple *&key ) : s_(s), mgr_(NULL), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) {
next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_); next_ret_ = (datatuple**)malloc(sizeof(next_ret_[0]) * batch_size_);
populate_next_ret(key, true); populate_next_ret(key, true);
} }
~batchedRevalidatingIterator() { ~batchedRevalidatingIterator() {
for(int i = cur_off_; i < num_batched_; i++) { for(int i = cur_off_; i < num_batched_; i++) {
TUPLE::freetuple(next_ret_[i]); datatuple::freetuple(next_ret_[i]);
} }
free(next_ret_); free(next_ret_);
} }
TUPLE* next_callerFrees() { datatuple* next_callerFrees() {
if(cur_off_ == num_batched_) { return NULL; } // the last thing we did is call populate_next_ret_(), which only leaves us in this state at the end of the iterator. if(cur_off_ == num_batched_) { return NULL; } // the last thing we did is call populate_next_ret_(), which only leaves us in this state at the end of the iterator.
TUPLE * ret = next_ret_[cur_off_]; datatuple * ret = next_ret_[cur_off_];
cur_off_++; cur_off_++;
populate_next_ret(ret); populate_next_ret(ret);
return ret; return ret;
@ -218,7 +217,7 @@ public:
int operator-(batchedRevalidatingIterator & t) { abort(); } int operator-(batchedRevalidatingIterator & t) { abort(); }
rbtree_t *s_; rbtree_t *s_;
TUPLE ** next_ret_; datatuple ** next_ret_;
mergeManager * mgr_; mergeManager * mgr_;
int64_t target_size_; // the low-water size for the tree. If cur_size_ is not null, and *cur_size_ < C * target_size_, we sleep. int64_t target_size_; // the low-water size for the tree. If cur_size_ is not null, and *cur_size_ < C * target_size_, we sleep.
bool* flushing_; // never block if *flushing is true. bool* flushing_; // never block if *flushing is true.

View file

@ -115,8 +115,8 @@ void * merge_scheduler::memMergeThread() {
rwlc_unlock(ltable_->header_mut); rwlc_unlock(ltable_->header_mut);
// needs to be past the rwlc_unlock... // needs to be past the rwlc_unlock...
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB = memTreeComponent::batchedRevalidatingIterator *itrB =
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, &ltable_->c0_flushing, 100, &ltable_->rb_mut); new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, &ltable_->c0_flushing, 100, &ltable_->rb_mut);
//: do the merge //: do the merge
DEBUG("mmt:\tMerging:\n"); DEBUG("mmt:\tMerging:\n");
@ -345,7 +345,7 @@ static int garbage_collect(logtable<datatuple> * ltable_, datatuple ** garbage,
for(int i = 0; i < next_garbage; i++) { for(int i = 0; i < next_garbage; i++) {
datatuple * t2tmp = NULL; datatuple * t2tmp = NULL;
{ {
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = ltable_->get_tree_c0()->find(garbage[i]); memTreeComponent::rbtree_t::iterator rbitr = ltable_->get_tree_c0()->find(garbage[i]);
if(rbitr != ltable_->get_tree_c0()->end()) { if(rbitr != ltable_->get_tree_c0()->end()) {
t2tmp = *rbitr; t2tmp = *rbitr;
if((t2tmp->datalen() == garbage[i]->datalen()) && if((t2tmp->datalen() == garbage[i]->datalen()) &&

View file

@ -40,7 +40,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
if(data_arr.size() > NUM_ENTRIES) if(data_arr.size() > NUM_ENTRIES)
data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end()); data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end());
memTreeComponent<datatuple>::rbtree_t rbtree; memTreeComponent::rbtree_t rbtree;
int64_t datasize = 0; int64_t datasize = 0;
std::vector<pageid_t> dsp; std::vector<pageid_t> dsp;
@ -71,7 +71,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
//step 1: look in tree_c0 //step 1: look in tree_c0
memTreeComponent<datatuple>::rbtree_t::iterator rbitr = rbtree.find(search_tuple); memTreeComponent::rbtree_t::iterator rbitr = rbtree.find(search_tuple);
if(rbitr != rbtree.end()) if(rbitr != rbtree.end())
{ {
datatuple *tuple = *rbitr; datatuple *tuple = *rbitr;