Refactoring of logtable. Cleaned a bunch of includes, and logtable is now a template. This is in preparation for the new merge policy

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@698 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-03-17 21:51:26 +00:00
parent 6cd1ccb5ff
commit e1c937a602
21 changed files with 433 additions and 420 deletions

View file

@ -1,5 +1,7 @@
#include "logstore.h"
#include "datapage.h"
#include "regionAllocator.h"
#include <stasis/page.h>
static const int DATA_PAGE = USER_DEFINED_PAGE(1);

View file

@ -5,7 +5,8 @@
#include <stasis/page.h>
#include <stasis/constants.h>
#include "regionAllocator.h"
struct RegionAllocator;
//#define CHECK_FOR_SCRIBBLING

View file

@ -12,6 +12,7 @@
#include "merger.h"
#include "diskTreeComponent.h"
#include "regionAllocator.h"
#include <stasis/transactional.h>
#include <stasis/page.h>
@ -46,6 +47,27 @@ void diskTreeComponent::internalNodes::init_stasis() {
}
recordid diskTreeComponent::get_root_rid() { return ltree->get_root_rec(); }
recordid diskTreeComponent::get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); }
recordid diskTreeComponent::get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); }
void diskTreeComponent::force(int xid) {
ltree->get_datapage_alloc()->force_regions(xid);
ltree->get_internal_node_alloc()->force_regions(xid);
}
void diskTreeComponent::dealloc(int xid) {
ltree->get_datapage_alloc()->dealloc_regions(xid);
ltree->get_internal_node_alloc()->dealloc_regions(xid);
}
void diskTreeComponent::list_regions(int xid, pageid_t *internal_node_region_length, pageid_t *internal_node_region_count, pageid_t **internal_node_regions,
pageid_t *datapage_region_length, pageid_t *datapage_region_count, pageid_t **datapage_regions) {
*internal_node_regions = ltree->get_internal_node_alloc()->list_regions(xid, internal_node_region_length, internal_node_region_count);
*datapage_regions = ltree->get_datapage_alloc() ->list_regions(xid, datapage_region_length, datapage_region_count);
}
void diskTreeComponent::writes_done() {
if(dp) {
dp->writes_done();
@ -335,6 +357,20 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
return ret;
}
diskTreeComponent::internalNodes::internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
: lastLeaf(-1),
internal_node_alloc(new RegionAllocator(xid, internal_region_size)),
datapage_alloc(new RegionAllocator(xid, datapage_region_size))
{ create(xid); }
diskTreeComponent::internalNodes::internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state)
: lastLeaf(-1),
root_rec(root),
internal_node_alloc(new RegionAllocator(xid, internal_node_state)),
datapage_alloc(new RegionAllocator(xid, datapage_state))
{ }
/* adding pages:
1) Try to append value to lsmTreeState->lastLeaf
@ -842,7 +878,7 @@ void diskTreeComponent::internalNodes::iterator::close() {
// tree iterator implementation
/////////////////////////////////////////////////////////////////////
void diskTreeComponent::diskTreeIterator::init_iterators(datatuple * key1, datatuple * key2) {
void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * key2) {
assert(!key2); // unimplemented
if(tree_.size == INVALID_SIZE) {
lsmIterator_ = NULL;
@ -855,14 +891,14 @@ void diskTreeComponent::diskTreeIterator::init_iterators(datatuple * key1, datat
}
}
diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::internalNodes *tree) :
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) :
tree_(tree ? tree->get_root_rec() : NULLRID)
{
init_iterators(NULL, NULL);
init_helper(NULL);
}
diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::internalNodes *tree, datatuple* key) :
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) :
tree_(tree ? tree->get_root_rec() : NULLRID)
{
init_iterators(key,NULL);
@ -870,7 +906,7 @@ diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::interna
}
diskTreeComponent::diskTreeIterator::~diskTreeIterator()
diskTreeComponent::iterator::~iterator()
{
if(lsmIterator_) {
lsmIterator_->close();
@ -886,7 +922,7 @@ diskTreeComponent::diskTreeIterator::~diskTreeIterator()
}
void diskTreeComponent::diskTreeIterator::init_helper(datatuple* key1)
void diskTreeComponent::iterator::init_helper(datatuple* key1)
{
if(!lsmIterator_)
{
@ -918,7 +954,7 @@ void diskTreeComponent::diskTreeIterator::init_helper(datatuple* key1)
}
}
datatuple * diskTreeComponent::diskTreeIterator::next_callerFrees()
datatuple * diskTreeComponent::iterator::next_callerFrees()
{
if(!this->lsmIterator_) { return NULL; }

View file

@ -8,25 +8,13 @@
#ifndef DISKTREECOMPONENT_H_
#define DISKTREECOMPONENT_H_
#include <stasis/transactional.h>
#include <stasis/operations.h>
#include <stasis/bufferManager.h>
#include <stasis/allocationPolicy.h>
#include <stasis/blobManager.h>
#include <stasis/page.h>
#include <stasis/truncation.h>
#include "merger.h"
#include "regionAllocator.h"
#include "datapage.h"
#include "tuplemerger.h"
#include "datatuple.h"
class diskTreeComponent {
public:
class internalNodes;
class diskTreeIterator;
class iterator;
diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) :
ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)),
@ -43,35 +31,26 @@ class diskTreeComponent {
delete ltree;
}
recordid get_root_rid() { return ltree->get_root_rec(); }
recordid get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); }
recordid get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); }
recordid get_root_rid();
recordid get_datapage_allocator_rid();
recordid get_internal_node_allocator_rid();
internalNodes * get_internal_nodes() { return ltree; }
datatuple* findTuple(int xid, datatuple::key_t key, size_t keySize);
int insertTuple(int xid, /*DataPage<datatuple> *dp,*/ datatuple *t, merge_stats_t *stats);
void writes_done();
diskTreeIterator * iterator() {
return new diskTreeIterator(ltree);
iterator * open_iterator() {
return new iterator(ltree);
}
diskTreeIterator * iterator(datatuple * key) {
return new diskTreeIterator(ltree, key);
iterator * open_iterator(datatuple * key) {
return new iterator(ltree, key);
}
void force(int xid) {
ltree->get_datapage_alloc()->force_regions(xid);
ltree->get_internal_node_alloc()->force_regions(xid);
}
void dealloc(int xid) {
ltree->get_datapage_alloc()->dealloc_regions(xid);
ltree->get_internal_node_alloc()->dealloc_regions(xid);
}
void force(int xid);
void dealloc(int xid);
void list_regions(int xid, pageid_t *internal_node_region_length, pageid_t *internal_node_region_count, pageid_t **internal_node_regions,
pageid_t *datapage_region_length, pageid_t *datapage_region_count, pageid_t **datapage_regions) {
*internal_node_regions = ltree->get_internal_node_alloc()->list_regions(xid, internal_node_region_length, internal_node_region_count);
*datapage_regions = ltree->get_datapage_alloc() ->list_regions(xid, datapage_region_length, datapage_region_count);
}
pageid_t *datapage_region_length, pageid_t *datapage_region_count, pageid_t **datapage_regions);
void print_tree(int xid) {
ltree->print_tree(xid);
@ -93,18 +72,8 @@ class diskTreeComponent {
static void init_stasis();
static void deinit_stasis();
internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
: lastLeaf(-1),
internal_node_alloc(new RegionAllocator(xid, internal_region_size)),
datapage_alloc(new RegionAllocator(xid, datapage_region_size))
{ create(xid); }
internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state)
: lastLeaf(-1),
root_rec(root),
internal_node_alloc(new RegionAllocator(xid, internal_node_state)),
datapage_alloc(new RegionAllocator(xid, datapage_state))
{ }
internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size);
internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state);
void print_tree(int xid);
@ -199,15 +168,15 @@ class diskTreeComponent {
};
};
class diskTreeIterator
class iterator
{
public:
explicit diskTreeIterator(diskTreeComponent::internalNodes *tree);
explicit iterator(diskTreeComponent::internalNodes *tree);
explicit diskTreeIterator(diskTreeComponent::internalNodes *tree,datatuple *key);
explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key);
~diskTreeIterator();
~iterator();
datatuple * next_callerFrees();
@ -215,9 +184,9 @@ class diskTreeComponent {
void init_iterators(datatuple * key1, datatuple * key2);
inline void init_helper(datatuple * key1);
explicit diskTreeIterator() { abort(); }
void operator=(diskTreeIterator & t) { abort(); }
int operator-(diskTreeIterator & t) { abort(); }
explicit iterator() { abort(); }
void operator=(iterator & t) { abort(); }
int operator-(iterator & t) { abort(); }
private:
recordid tree_; //root of the tree

View file

@ -1,3 +1,5 @@
#include <sstream>
#include "logserver.h"
#include "datatuple.h"
#include "merger.h"
@ -27,7 +29,7 @@
void *serverLoop(void *args);
void logserver::startserver(logtable *ltable)
void logserver::startserver(logtable<datatuple> *ltable)
{
sys_alive = true;
this->ltable = ltable;
@ -479,7 +481,7 @@ int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t li
int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);
if(!err) {
logtableIterator<datatuple> * itr = new logtableIterator<datatuple>(data->ltable, tuple);
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(data->ltable, tuple);
datatuple * t;
while(!err && (t = itr->getnext())) {
if(tuple2) { // are we at the end of range?
@ -767,7 +769,7 @@ int op_dbg_blockmap(pthread_data* data) {
}
int op_dbg_drop_database(pthread_data * data) {
logtableIterator<datatuple> * itr = new logtableIterator<datatuple>(data->ltable);
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(data->ltable);
datatuple * del;
fprintf(stderr, "DROPPING DATABASE...\n");
long long n = 0;

View file

@ -9,6 +9,8 @@
#include <stasis/transactional.h>
#include <pthread.h>
#include "logstore.h"
#undef begin
#undef try
#undef end
@ -21,8 +23,6 @@
#include <map>
#endif
class logtable;
struct pthread_item;
struct pthread_data {
@ -39,7 +39,7 @@ struct pthread_data {
int *workitem; //id of the socket to work
logtable *ltable;
logtable<datatuple> *ltable;
bool *sys_alive;
#ifdef STATS_ENABLED
@ -97,7 +97,7 @@ public:
delete qlock;
}
void startserver(logtable *ltable);
void startserver(logtable<datatuple> *ltable);
void stopserver();
@ -126,7 +126,7 @@ private:
int * self_pipe; // write a byte to self_pipe[1] to wake up select().
std::vector<pthread_item *> th_list; // list of threads
logtable *ltable;
logtable<datatuple> *ltable;
#ifdef STATS_ENABLED
int num_reqs;

View file

@ -1,17 +1,9 @@
#include <string.h>
#include <assert.h>
#include <math.h>
#include <ctype.h>
#include "merger.h"
#include "logstore.h"
#include "datapage.h"
#include "merger.h"
#include <stasis/transactional.h>
#include <stasis/page.h>
#include <stasis/page/slotted.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferManager/bufferHash.h>
#undef try
#undef end
static inline double tv_to_double(struct timeval tv)
{
@ -23,7 +15,8 @@ static inline double tv_to_double(struct timeval tv)
// LOG TABLE IMPLEMENTATION
/////////////////////////////////////////////////////////////////
logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
template<class TUPLE>
logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
{
tree_c0 = NULL;
@ -48,7 +41,8 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size,
this->datapage_size = datapage_size;
}
logtable::~logtable()
template<class TUPLE>
logtable<TUPLE>::~logtable()
{
if(tree_c1 != NULL)
delete tree_c1;
@ -64,7 +58,8 @@ logtable::~logtable()
delete tmerger;
}
recordid logtable::allocTable(int xid)
template<class TUPLE>
recordid logtable<TUPLE>::allocTable(int xid)
{
table_rec = Talloc(xid, sizeof(tbl_header));
@ -79,13 +74,15 @@ recordid logtable::allocTable(int xid)
return table_rec;
}
void logtable::openTable(int xid, recordid rid) {
template<class TUPLE>
void logtable<TUPLE>::openTable(int xid, recordid rid) {
table_rec = rid;
Tread(xid, table_rec, &tbl_header);
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state);
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state);
}
void logtable::update_persistent_header(int xid) {
template<class TUPLE>
void logtable<TUPLE>::update_persistent_header(int xid) {
tbl_header.c2_root = tree_c2->get_root_rid();
tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid();
@ -97,7 +94,18 @@ void logtable::update_persistent_header(int xid) {
Tset(xid, table_rec, &tbl_header);
}
void logtable::flushTable()
template<class TUPLE>
void logtable<TUPLE>::setMergeData(logtable_mergedata * mdata){
this->mergedata = mdata;
mdata->internal_region_size = internal_region_size;
mdata->datapage_region_size = datapage_region_size;
mdata->datapage_size = datapage_size;
bump_epoch();
}
template<class TUPLE>
void logtable<TUPLE>::flushTable()
{
struct timeval start_tv, stop_tv;
double start, stop;
@ -173,7 +181,8 @@ void logtable::flushTable()
}
datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize)
template<class TUPLE>
datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size_t keySize)
{
//prepare a search tuple
datatuple *search_tuple = datatuple::create(key, keySize);
@ -319,7 +328,8 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
* returns the first record found with the matching key
* (not to be used together with diffs)
**/
datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize)
template<class TUPLE>
datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size_t keySize)
{
//prepare a search tuple
datatuple * search_tuple = datatuple::create(key, keySize);
@ -387,7 +397,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
}
void logtable::insertTuple(datatuple *tuple)
template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple)
{
//lock the red-black tree
readlock(header_lock,0);
@ -440,10 +451,12 @@ void logtable::insertTuple(datatuple *tuple)
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
}
void logtable::registerIterator(logtableIterator<datatuple> * it) {
template<class TUPLE>
void logtable<TUPLE>::registerIterator(iterator * it) {
its.push_back(it);
}
void logtable::forgetIterator(logtableIterator<datatuple> * it) {
template<class TUPLE>
void logtable<TUPLE>::forgetIterator(iterator * it) {
for(unsigned int i = 0; i < its.size(); i++) {
if(its[i] == it) {
its.erase(its.begin()+i);
@ -451,12 +464,12 @@ void logtable::forgetIterator(logtableIterator<datatuple> * it) {
}
}
}
void logtable::bump_epoch() {
template<class TUPLE>
void logtable<TUPLE>::bump_epoch() {
assert(!trywritelock(header_lock,0));
epoch++;
for(unsigned int i = 0; i < its.size(); i++) {
its[i]->invalidate();
}
}
template class logtableIterator<datatuple>;
template class logtable<datatuple>;

View file

@ -1,49 +1,41 @@
#ifndef _LOGSTORE_H_
#define _LOGSTORE_H_
#include <stasis/common.h>
#undef try
#undef end
#undef begin
#include <string>
//#include <set>
#include <sstream>
#include <iostream>
#include <queue>
#include <vector>
#include "logserver.h"
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <stasis/transactional.h>
#include <stasis/operations.h>
#include <stasis/bufferManager.h>
#include <stasis/allocationPolicy.h>
#include <stasis/blobManager.h>
#include <stasis/page.h>
#include <stasis/truncation.h>
typedef struct merge_stats_t {
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging?
pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)?
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input?
} merge_stats_t;
#include "diskTreeComponent.h"
#include "memTreeComponent.h"
#include "datapage.h"
#include "tuplemerger.h"
#include "datatuple.h"
#include "merger.h"
#include "tuplemerger.h"
struct logtable_mergedata;
template<class TUPLE>
class logtableIterator ;
class logtable
{
class logtable {
public:
class iterator;
logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 40); // scans 160KB / 2 per lookup on average. at 100MB/s, this is 0.7 ms. XXX pick datapage_size in principled way.
~logtable();
@ -59,15 +51,12 @@ public:
void openTable(int xid, recordid rid);
void flushTable();
// DataPage<datatuple>* insertTuple(int xid, datatuple *tuple,diskTreeComponent::internalNodes *ltree);
// datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, diskTreeComponent::internalNodes *ltree);
inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls)
inline uint64_t get_epoch() { return epoch; }
void registerIterator(logtableIterator<datatuple> * it);
void forgetIterator(logtableIterator<datatuple> * it);
void registerIterator(iterator * it);
void forgetIterator(iterator * it);
void bump_epoch() ;
inline diskTreeComponent * get_tree_c2(){return tree_c2;}
@ -85,14 +74,7 @@ public:
void update_persistent_header(int xid);
void setMergeData(logtable_mergedata * mdata) {
this->mergedata = mdata;
mdata->internal_region_size = internal_region_size;
mdata->datapage_region_size = datapage_region_size;
mdata->datapage_size = datapage_size;
bump_epoch(); }
void setMergeData(logtable_mergedata * mdata);
logtable_mergedata* getMergeData(){return mergedata;}
inline tuplemerger * gettuplemerger(){return tmerger;}
@ -139,12 +121,12 @@ private:
tuplemerger *tmerger;
std::vector<logtableIterator<datatuple> *> its;
std::vector<iterator *> its;
bool still_running_;
};
public:
template<class ITRA, class ITRN, class TUPLE>
template<class ITRA, class ITRN>
class mergeManyIterator {
public:
explicit mergeManyIterator(ITRA* a, ITRN** iters, int num_iters, TUPLE*(*merge)(const TUPLE*,const TUPLE*), int (*cmp)(const TUPLE*,const TUPLE*)) :
@ -243,12 +225,13 @@ private:
// temporary variables initiaized once for effiency
int * dups;
};
template<class TUPLE>
class logtableIterator {
class iterator {
public:
explicit logtableIterator(logtable* ltable)
explicit iterator(logtable* ltable)
: ltable(ltable),
epoch(ltable->get_epoch()),
merge_it_(NULL),
@ -261,7 +244,7 @@ public:
unlock(ltable->header_lock);
}
explicit logtableIterator(logtable* ltable,TUPLE *key)
explicit iterator(logtable* ltable,TUPLE *key)
: ltable(ltable),
epoch(ltable->get_epoch()),
merge_it_(NULL),
@ -275,7 +258,7 @@ public:
unlock(ltable->header_lock);
}
~logtableIterator() {
~iterator() {
ltable->forgetIterator(this);
invalidate();
if(last_returned) TUPLE::freetuple(last_returned);
@ -319,9 +302,9 @@ public:
private:
inline void init_helper();
explicit logtableIterator() { abort(); }
void operator=(logtableIterator<TUPLE> & t) { abort(); }
int operator-(logtableIterator<TUPLE> & t) { abort(); }
explicit iterator() { abort(); }
void operator=(iterator & t) { abort(); }
int operator-(iterator & t) { abort(); }
private:
static const int C1 = 0;
@ -331,12 +314,10 @@ private:
uint64_t epoch;
typedef mergeManyIterator<
typename memTreeComponent<TUPLE>::revalidatingIterator,
typename memTreeComponent<TUPLE>::iterator,
TUPLE> inner_merge_it_t;
typename memTreeComponent<TUPLE>::iterator> inner_merge_it_t;
typedef mergeManyIterator<
inner_merge_it_t,
diskTreeComponent::diskTreeIterator,
TUPLE> merge_it_t;
diskTreeComponent::iterator> merge_it_t;
merge_it_t* merge_it_;
@ -355,26 +336,26 @@ private:
void validate() {
typename memTreeComponent<TUPLE>::revalidatingIterator * c0_it;
typename memTreeComponent<TUPLE>::iterator *c0_mergeable_it[1];
diskTreeComponent::diskTreeIterator * disk_it[3];
diskTreeComponent::iterator * disk_it[3];
epoch = ltable->get_epoch();
if(last_returned) {
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, last_returned);
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), last_returned);
disk_it[0] = ltable->get_tree_c1()->iterator(last_returned);
disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(last_returned);
disk_it[2] = ltable->get_tree_c2()->iterator(last_returned);
disk_it[0] = ltable->get_tree_c1()->open_iterator(last_returned);
disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator(last_returned);
disk_it[2] = ltable->get_tree_c2()->open_iterator(last_returned);
} else if(key) {
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, key);
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), key);
disk_it[0] = ltable->get_tree_c1()->iterator(key);
disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(key);
disk_it[2] = ltable->get_tree_c2()->iterator(key);
disk_it[0] = ltable->get_tree_c1()->open_iterator(key);
disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator(key);
disk_it[2] = ltable->get_tree_c2()->open_iterator(key);
} else {
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut );
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable() );
disk_it[0] = ltable->get_tree_c1()->iterator();
disk_it[1] = ltable->get_tree_c1_mergeable()->iterator();
disk_it[2] = ltable->get_tree_c2()->iterator();
disk_it[0] = ltable->get_tree_c1()->open_iterator();
disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator();
disk_it[2] = ltable->get_tree_c2()->open_iterator();
}
inner_merge_it_t * inner_merge_it =
@ -389,6 +370,7 @@ private:
}
valid = true;
}
};
};

View file

@ -1,4 +1,6 @@
#include "memTreeComponent.h"
#include "datatuple.h"
template<class TUPLE>
void memTreeComponent<TUPLE>::tearDownTree(rbtree_ptr_t tree) {
TUPLE * t = 0;

View file

@ -1,7 +1,7 @@
#ifndef _MEMTREECOMPONENT_H_
#define _MEMTREECOMPONENT_H_
#include <set>
#include "datatuple.h"
template<class TUPLE>
class memTreeComponent {
public:
@ -115,7 +115,7 @@ public:
}
~revalidatingIterator() {
if(next_ret_) datatuple::freetuple(next_ret_);
if(next_ret_) TUPLE::freetuple(next_ret_);
}
TUPLE* getnext() {

View file

@ -2,6 +2,11 @@
#include <math.h>
#include "merger.h"
#include <stasis/transactional.h>
#undef try
#undef end
void merge_stats_pp(FILE* fd, merge_stats_t &stats) {
long long sleep_time = stats.start.tv_sec - stats.sleep.tv_sec;
long long work_time = stats.done.tv_sec - stats.start.tv_sec;
@ -44,7 +49,7 @@ void merge_stats_pp(FILE* fd, merge_stats_t &stats) {
double merge_stats_nsec_to_merge_in_bytes(merge_stats_t); // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers)
int merge_scheduler::addlogtable(logtable *ltable)
int merge_scheduler::addlogtable(logtable<datatuple> *ltable)
{
struct logtable_mergedata * mdata = new logtable_mergedata;
@ -76,7 +81,7 @@ merge_scheduler::~merge_scheduler()
{
for(size_t i=0; i<mergedata.size(); i++)
{
logtable *ltable = mergedata[i].first;
logtable<datatuple> *ltable = mergedata[i].first;
logtable_mergedata *mdata = mergedata[i].second;
//delete the mergedata fields
@ -112,7 +117,7 @@ void merge_scheduler::shutdown()
//signal shutdown
for(size_t i=0; i<mergedata.size(); i++)
{
logtable *ltable = mergedata[i].first;
logtable<datatuple> *ltable = mergedata[i].first;
logtable_mergedata *mdata = mergedata[i].second;
//flush the in memory table to write any tuples still in memory
@ -142,7 +147,7 @@ void merge_scheduler::shutdown()
void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
{
logtable * ltable = mergedata[index].first;
logtable<datatuple> * ltable = mergedata[index].first;
struct logtable_mergedata *mdata = mergedata[index].second;
pthread_cond_t * block1_needed_cond = new pthread_cond_t;
@ -228,7 +233,7 @@ template <class ITA, class ITB>
void merge_iterators(int xid,
ITA *itrA,
ITB *itrB,
logtable *ltable,
logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree,
merge_stats_t *stats,
bool dropDeletes);
@ -262,7 +267,7 @@ void* memMergeThread(void*arg)
merger_args * a = (merger_args*)(arg);
logtable * ltable = a->ltable;
logtable<datatuple> * ltable = a->ltable;
assert(ltable->get_tree_c1());
int merge_count =0;
@ -318,7 +323,7 @@ void* memMergeThread(void*arg)
// 4: Merge
//create the iterators
diskTreeComponent::diskTreeIterator *itrA = ltable->get_tree_c1()->iterator();
diskTreeComponent::iterator *itrA = ltable->get_tree_c1()->open_iterator();
memTreeComponent<datatuple>::iterator *itrB =
new memTreeComponent<datatuple>::iterator(ltable->get_tree_c0_mergeable());
@ -420,7 +425,7 @@ void *diskMergeThread(void*arg)
merger_args * a = (merger_args*)(arg);
logtable * ltable = a->ltable;
logtable<datatuple> * ltable = a->ltable;
assert(ltable->get_tree_c2());
@ -472,8 +477,8 @@ void *diskMergeThread(void*arg)
// 4: do the merge.
//create the iterators
diskTreeComponent::diskTreeIterator *itrA = ltable->get_tree_c2()->iterator(); //new diskTreeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
diskTreeComponent::diskTreeIterator *itrB = ltable->get_tree_c1_mergeable()->iterator();
diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); //new iterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator();
//create a new tree
diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size);
@ -535,7 +540,7 @@ template <class ITA, class ITB>
void merge_iterators(int xid,
ITA *itrA, //iterator on c1 or c2
ITB *itrB, //iterator on c0 or c1, respectively
logtable *ltable,
logtable<datatuple> *ltable,
diskTreeComponent *scratch_tree, merge_stats_t *stats,
bool dropDeletes // should be true iff this is biggest component
)

View file

@ -1,8 +1,8 @@
#ifndef _MERGER_H_
#define _MERGER_H_
#include <vector>
#include <utility>
#include "logstore.h"
#include "datatuple.h"
#include <stasis/common.h>
#undef try
@ -11,26 +11,10 @@
//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true...
static const int RB_TREE_OVERHEAD = 400;
static const double MIN_R = 3.0;
class logtable;
typedef struct merge_stats_t {
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging?
pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)?
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input?
} merge_stats_t;
struct merger_args
{
logtable * ltable;
logtable<datatuple> * ltable;
int worker_id;
pthread_mutex_t * block_ready_mut;
@ -82,12 +66,12 @@ struct logtable_mergedata
class merge_scheduler
{
std::vector<std::pair<logtable *, logtable_mergedata*> > mergedata;
std::vector<std::pair<logtable<datatuple> *, logtable_mergedata*> > mergedata;
public:
~merge_scheduler();
int addlogtable(logtable * ltable);
int addlogtable(logtable<datatuple> * ltable);
void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024);
struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;}

View file

@ -8,6 +8,11 @@
#ifndef REGIONALLOCATOR_H_
#define REGIONALLOCATOR_H_
#include <stasis/transactional.h>
#undef try
#undef end
class RegionAllocator
{
public:

View file

@ -3,6 +3,7 @@
#include <iostream>
#include <sstream>
#include "logstore.h"
#include "logserver.h"
#include "datapage.h"
#include "merger.h"
#include <assert.h>
@ -55,7 +56,7 @@ void initialize_server()
mscheduler = new merge_scheduler;
logtable ltable;
logtable<datatuple> ltable;
recordid table_root = ROOT_RECORD;
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {

View file

@ -13,7 +13,9 @@
#include <time.h>
#include "check_util.h"
#include "regionAllocator.h"
#include <stasis/transactional.h>
#undef begin
#undef end

View file

@ -1,4 +1,8 @@
#include <stasis/transactional.h>
#undef begin
#undef end
#include "logstore.h"
@ -13,7 +17,7 @@ int main(int argc, char **argv)
int xid = Tbegin();
logtable ltable(1000, 10000, 5);
logtable<datatuple> ltable(1000, 10000, 5);
recordid table_root = ltable.allocTable(xid);

View file

@ -12,6 +12,7 @@
#include <sys/time.h>
#include <time.h>
#include <stasis/transactional.h>
#undef begin
#undef end
@ -29,7 +30,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
logtable ltable(1000, 10000, 5);
logtable<datatuple> ltable(1000, 10000, 5);
recordid table_root = ltable.allocTable(xid);
Tcommit(xid);
@ -84,7 +85,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES);
size_t tuplenum = 0;
diskTreeComponent::diskTreeIterator * tree_itr = ltable_c1->iterator();
diskTreeComponent::iterator * tree_itr = ltable_c1->open_iterator();
datatuple *dt=0;

View file

@ -20,6 +20,7 @@
#define OFFSET (NUM_ENTRIES * 10)
#include <stasis/transactional.h>
#undef begin
#undef end

View file

@ -12,6 +12,7 @@
#include <sys/time.h>
#include <time.h>
#include <stasis/transactional.h>
#undef begin
#undef end
@ -46,7 +47,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
merge_scheduler mscheduler;
logtable ltable(1000, 10000, 5);
logtable<datatuple> ltable(1000, 10000, 5);
recordid table_root = ltable.allocTable(xid);

View file

@ -12,6 +12,7 @@
#include <sys/time.h>
#include <time.h>
#include <stasis/transactional.h>
#undef begin
#undef end
@ -45,7 +46,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
merge_scheduler mscheduler;
logtable ltable(1000, 10000, 100);
logtable<datatuple> ltable(1000, 10000, 100);
recordid table_root = ltable.allocTable(xid);

View file

@ -12,6 +12,7 @@
#include <sys/time.h>
#include <time.h>
#include <stasis/transactional.h>
#undef begin
#undef end
@ -101,7 +102,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
merge_scheduler mscheduler;
logtable ltable(1000, 1000, 40);
logtable<datatuple> ltable(1000, 1000, 40);
recordid table_root = ltable.allocTable(xid);