more refactoring; all iterators now live in their respective tree components, fixed some hardcoded parameters; encapsulate allocators inside of diskTreeComponent

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@689 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-03-13 00:05:06 +00:00
parent dc8185a236
commit c8c48a188d
19 changed files with 537 additions and 513 deletions

View file

@ -76,7 +76,7 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
IF ( HAVE_STASIS )
ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp logiterators.cpp datapage.cpp merger.cpp tuplemerger.cpp)
ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp)
CREATE_EXECUTABLE(server)
ENDIF ( HAVE_STASIS )
ADD_LIBRARY(logstore_client tcpclient.cpp)

View file

@ -67,14 +67,19 @@ public:
DataPage( int xid, pageid_t page_count, RegionAllocator* alloc);
~DataPage() {
if(write_offset_ != -1) {
len_t dat_len = 0; // write terminating zero.
assert(write_offset_ == -1);
}
write_data((const byte*)&dat_len, sizeof(dat_len), false);
void writes_done() {
if(write_offset_ != -1) {
len_t dat_len = 0; // write terminating zero.
// if writing the zero fails, later reads will fail as well, and assume EOF.
write_data((const byte*)&dat_len, sizeof(dat_len), false);
}
// if writing the zero fails, later reads will fail as well, and assume EOF.
write_offset_ = -1;
}
}

View file

@ -46,6 +46,79 @@ void diskTreeComponent::internalNodes::init_stasis() {
}
/**
 * Finish the datapage that is currently open for appends, if any: notify it
 * that no more writes are coming, free it, and clear the member so that the
 * next insertTuple() call opens a fresh datapage.
 */
void diskTreeComponent::writes_done() {
  if(!dp) { return; }  // no datapage is currently open

  dp->writes_done();
  delete dp;
  dp = 0;
}
/**
 * Append a tuple to this tree component.
 *
 * The tuple goes into the currently open datapage when it fits.  When no
 * datapage is open, or the open one rejects the tuple, a new datapage is
 * created through insertDataPage() and becomes the open datapage.
 *
 * @param xid   transaction id forwarded to insertDataPage()
 * @param t     tuple to store
 * @param stats merge statistics; num_datapages_out is bumped for every new
 *              datapage, bytes_out is bumped when a rejecting datapage is closed
 * @return always 0 (no error)
 */
int diskTreeComponent::insertTuple(int xid, datatuple *t, merge_stats_t *stats)
{
  bool open_new_page = (dp == 0);

  if(!open_new_page && !dp->append(t)) {
    // The open datapage rejected the tuple: account for it and close it out.
    stats->bytes_out += (PAGE_SIZE * dp->get_page_count());
    dp->writes_done();
    delete dp;
    open_new_page = true;
  }

  if(open_new_page) {
    dp = insertDataPage(xid, t);
    stats->num_datapages_out++;
  }

  return 0; // no error.
}
/**
 * Create a new datapage whose first record is `tuple`, and register the page
 * in the internal-node tree under the tuple's key.
 *
 * Called when either the last region is full, or the last datapage did not
 * want the tuple (or both).
 *
 * @param xid   transaction id used for the allocation and the index append
 * @param tuple first tuple to store in the new datapage
 * @return the newly allocated datapage (created with new)
 */
DataPage<datatuple>* diskTreeComponent::insertDataPage(int xid, datatuple *tuple) {
  DataPage<datatuple> * page = 0;

  for(int failed_attempts = 0; page == 0; ) {
    page = new DataPage<datatuple>(xid, datapage_size, ltree->get_datapage_alloc());

    // Try to place the record in the brand-new datapage.
    if(page->append(tuple)) { break; }

    // The last datapage must have not wanted the tuple, and then this
    // datapage figured out the region is full.  Discard it and try again.
    page->writes_done();
    delete page;
    page = 0;
    assert(failed_attempts == 0); // only retry once.
    failed_attempts++;
  }

  // Make the datapage reachable from the index.
  ltree->appendPage(xid, tuple->key(), tuple->keylen(), page->get_start_pid());

  return page;
}
/**
 * Look up a key in this tree component.
 *
 * The internal-node tree is asked for the datapage that could hold the key;
 * if it names one, that datapage is opened and searched.
 *
 * @param xid     transaction id
 * @param key     key bytes to search for
 * @param keySize length of the key
 * @return the tuple produced by DataPage::recordRead, or 0 if findPage()
 *         reported no candidate page (-1) or recordRead left the result unset
 */
datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t keySize)
{
  pageid_t pid = ltree->findPage(xid, (byte*)key, keySize);
  if(pid == -1) { return 0; }  // no datapage can contain this key

  datatuple * found = 0;
  DataPage<datatuple> * page = new DataPage<datatuple>(xid, pid);
  page->recordRead(key, keySize, &found);
  delete page;

  return found;
}
/** Counterpart of init_stasis(): forwards to Tdeinit(). */
void diskTreeComponent::internalNodes::deinit_stasis()
{
  Tdeinit();
}
recordid diskTreeComponent::internalNodes::create(int xid) {
@ -763,3 +836,123 @@ void diskTreeComponent::internalNodes::iterator::close() {
}
if(t) free(t);
}
/////////////////////////////////////////////////////////////////////
// tree iterator implementation
/////////////////////////////////////////////////////////////////////
/**
 * Create the iterator over the internal-node tree.
 *
 * @param key1 optional start key; when non-null the internal-node iterator is
 *             constructed with this key, otherwise without one
 * @param key2 must be null (upper bounds are unimplemented)
 *
 * Leaves lsmIterator_ null when tree_ has size INVALID_SIZE.
 */
void diskTreeComponent::diskTreeIterator::init_iterators(datatuple * key1, datatuple * key2) {
  assert(!key2); // unimplemented

  if(tree_.size == INVALID_SIZE) {
    lsmIterator_ = NULL;
    return;
  }

  lsmIterator_ = key1
      ? new diskTreeComponent::internalNodes::iterator(-1, tree_, key1->key(), key1->keylen())
      : new diskTreeComponent::internalNodes::iterator(-1, tree_);
}
/**
 * Iterate over the given internal-node tree without a start key.
 * A null `tree` yields an iterator rooted at NULLRID.
 */
diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::internalNodes *tree)
  : tree_(!tree ? NULLRID : tree->get_root_rec())
{
  init_iterators(NULL, NULL);
  init_helper(NULL);
}

/**
 * Iterate over the given internal-node tree, passing `key` as the start key
 * to both the internal-node iterator and the first datapage iterator.
 * A null `tree` yields an iterator rooted at NULLRID.
 */
diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::internalNodes *tree, datatuple* key)
  : tree_(!tree ? NULLRID : tree->get_root_rec())
{
  init_iterators(key, NULL);
  init_helper(key);
}
/**
 * Release everything the iterator owns: the internal-node iterator, the
 * datapage iterator, and the datapage that is currently open.
 *
 * dp_itr is allocated with new in init_helper() / next_callerFrees() and is
 * only freed there once a datapage is exhausted, so an iterator destroyed in
 * the middle of a datapage must free it here or it leaks.
 */
diskTreeComponent::diskTreeIterator::~diskTreeIterator()
{
  if(lsmIterator_) {
    lsmIterator_->close();
    delete lsmIterator_;
    lsmIterator_ = 0;
  }

  // dp_itr is built from curr_page, so drop it before the page itself
  // (same order as next_callerFrees()).  Deleting a null pointer is a no-op.
  delete dp_itr;
  dp_itr = 0;

  delete curr_page;
  curr_page = 0;
}
/**
 * Position the iterator on its first datapage.
 *
 * When there is no internal-node iterator, or it has no first entry,
 * curr_page and dp_itr are both set to 0.  Otherwise the first entry's value
 * is read as a page id, that datapage is opened, and a datapage iterator is
 * created from it and `key1`.
 *
 * @param key1 optional start key handed to the datapage iterator (may be null)
 */
void diskTreeComponent::diskTreeIterator::init_helper(datatuple* key1)
{
  if(!lsmIterator_) {
    DEBUG("treeIterator:\t__error__ init_helper():\tnull lsmIterator_");
    curr_page = 0;
    dp_itr = 0;
    return;
  }

  if(lsmIterator_->next() == 0) {
    DEBUG("diskTreeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." );
    curr_page = 0;
    dp_itr = 0;
    return;
  }

  // value() hands back a pointer to the stored page id through its byte** argument.
  pageid_t * pid_ptr;
  lsmIterator_->value((byte**)&pid_ptr);
  curr_pageid = *pid_ptr;

  curr_page = new DataPage<datatuple>(-1, curr_pageid);
  DEBUG("opening datapage iterator %lld at key %s\n.", curr_pageid, key1 ? (char*)key1->key() : "NULL");
  dp_itr = new DPITR_T(curr_page, key1);
}
/**
 * Return the next tuple, moving on to the following datapage when the current
 * one runs out.
 *
 * @return the next tuple, or null when there is no internal-node iterator, no
 *         open datapage iterator, or no further datapage.  As the name says,
 *         the caller is expected to free the returned tuple.
 */
datatuple * diskTreeComponent::diskTreeIterator::next_callerFrees()
{
  if(!this->lsmIterator_) { return NULL; }
  if(dp_itr == 0) { return 0; }

  datatuple* tup = dp_itr->getnext();
  if(tup) { return tup; }

  // The current datapage is exhausted: close it (iterator first, then page).
  delete dp_itr;
  dp_itr = 0;
  delete curr_page;
  curr_page = 0;

  // No further datapage: tup is null. We're done.
  if(!lsmIterator_->next()) { return tup; }

  // Open the next datapage named by the internal-node iterator.
  pageid_t *pid_ptr;
  size_t ret = lsmIterator_->value((byte**)&pid_ptr);
  assert(ret == sizeof(pageid_t));
  curr_pageid = *pid_ptr;

  curr_page = new DataPage<datatuple>(-1, curr_pageid);
  DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid);
  dp_itr = new DPITR_T(curr_page->begin());

  tup = dp_itr->getnext();
  assert(tup);
  return tup;
}

View file

@ -17,40 +17,111 @@
#include <stasis/page.h>
#include <stasis/truncation.h>
#include "merger.h"
#include "regionAllocator.h"
#include "datapage.h"
#include "tuplemerger.h"
#include "datatuple.h"
class diskTreeComponent {
#include "logiterators.h"
public:
class diskTreeComponent {
public:
class internalNodes;
class diskTreeIterator;
// Create a brand-new tree component: builds a fresh internalNodes tree with the
// given region sizes and remembers datapage_size for later insertDataPage() calls.
diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) :
ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)),
dp(0),
datapage_size(datapage_size) {}
// Re-open an existing tree component from its root record and the two
// allocator state records.
// NOTE(review): datapage_size is not initialized by this constructor, yet
// insertDataPage() reads it -- confirm that re-opened components are never
// written to, or pass/restore the datapage size here.
diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state) :
ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)),
dp(0) {}
// Frees the currently open datapage (if any) and the internal-node tree.
~diskTreeComponent() {
delete dp;
delete ltree;
}
// Record ids describing this component: the tree root and the header records
// of the datapage / internal-node region allocators.
recordid get_root_rid() { return ltree->get_root_rec(); }
recordid get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); }
recordid get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); }
// Direct access to the underlying internal-node tree (still owned by this object).
internalNodes * get_internal_nodes() { return ltree; }
// Point lookup; returns the matching tuple or 0.
datatuple* findTuple(int xid, datatuple::key_t key, size_t keySize);
// Append a tuple, opening a new datapage when needed; updates *stats.
int insertTuple(int xid, /*DataPage<datatuple> *dp,*/ datatuple *t, merge_stats_t *stats);
// Close and free the datapage that is currently open for appends, if any.
void writes_done();
// Iterator factories: each call returns a new heap-allocated diskTreeIterator
// over this component's tree, optionally starting at `key`.
diskTreeIterator * iterator() {
return new diskTreeIterator(ltree);
}
diskTreeIterator * iterator(datatuple * key) {
return new diskTreeIterator(ltree, key);
}
// Forward force_regions() to both region allocators (datapages first).
void force(int xid) {
ltree->get_datapage_alloc()->force_regions(xid);
ltree->get_internal_node_alloc()->force_regions(xid);
}
// Forward dealloc_regions() to both region allocators (datapages first).
void dealloc(int xid) {
ltree->get_datapage_alloc()->dealloc_regions(xid);
ltree->get_internal_node_alloc()->dealloc_regions(xid);
}
// Report the regions used by this component.  For each allocator the region
// length and count are written through the pointer arguments and the array
// returned by that allocator's list_regions() is stored in the pageid_t** out
// parameter.
// NOTE(review): callers appear to release the returned arrays with free() --
// confirm against RegionAllocator::list_regions.
void list_regions(int xid, pageid_t *internal_node_region_length, pageid_t *internal_node_region_count, pageid_t **internal_node_regions,
pageid_t *datapage_region_length, pageid_t *datapage_region_count, pageid_t **datapage_regions) {
*internal_node_regions = ltree->get_internal_node_alloc()->list_regions(xid, internal_node_region_length, internal_node_region_count);
*datapage_regions = ltree->get_datapage_alloc() ->list_regions(xid, datapage_region_length, datapage_region_count);
}
// Debugging aid: forwards to internalNodes::print_tree().
void print_tree(int xid) {
ltree->print_tree(xid);
}
private:
// Allocate a new datapage starting with `tuple` and index it in ltree.
DataPage<datatuple>* insertDataPage(int xid, datatuple *tuple);
internalNodes * ltree;           // internal-node tree; deleted in the destructor
DataPage<datatuple>* dp;         // datapage currently open for appends, or 0
pageid_t datapage_size;          // page count passed to each new DataPage
public:
class internalNodes{
public:
struct indexnode_rec {
pageid_t ptr;
};
// XXX move these to another module.
static void init_stasis();
static void deinit_stasis();
internalNodes(int xid)
internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
: lastLeaf(-1),
internal_node_alloc(new RegionAllocator(xid, 1000)),
datapage_alloc(new RegionAllocator(xid, 10000))
{ create(xid); } // XXX shouldn't hardcode region size.
internal_node_alloc(new RegionAllocator(xid, internal_region_size)),
datapage_alloc(new RegionAllocator(xid, datapage_region_size))
{ create(xid); }
internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state)
: lastLeaf(-1),
root_rec(root),
internal_node_alloc(new RegionAllocator(xid, internal_node_state)),
datapage_alloc(new RegionAllocator(xid, datapage_state))
{ }
private:
recordid create(int xid);
public:
{ }
void print_tree(int xid);
static void init_stasis();
static void deinit_stasis();
//returns the id of the data page that could contain the given key
pageid_t findPage(int xid, const byte *key, size_t keySize);
//appends a leaf page, val_page is the id of the leaf page
recordid appendPage(int xid, const byte *key,size_t keySize, pageid_t val_page);
inline RegionAllocator* get_datapage_alloc() { return datapage_alloc; }
inline RegionAllocator* get_internal_node_alloc() { return internal_node_alloc; }
const recordid &get_root_rec(){return root_rec;}
private:
recordid create(int xid);
void writeNodeRecord(int xid, Page *p, recordid &rid,
const byte *key, size_t keylen, pageid_t ptr);
@ -71,31 +142,15 @@ public:
*/
inline static void initializeNodePage(int xid, Page *p);
//return the left-most leaf, these are not data pages, although referred to as leaf
static pageid_t findFirstLeaf(int xid, Page *root, int64_t depth);
//return the right-most leaf
static pageid_t findLastLeaf(int xid, Page *root, int64_t depth) ;
//returns a record that stores the pageid where the given key should be in, i.e. if it exists
static recordid lookup(int xid, Page *node, int64_t depth, const byte *key,
size_t keySize);
public:
//returns the id of the data page that could contain the given key
pageid_t findPage(int xid, const byte *key, size_t keySize);
//appends a leaf page, val_page is the id of the leaf page
recordid appendPage(int xid, const byte *key,size_t keySize, pageid_t val_page);
inline RegionAllocator* get_datapage_alloc() { return datapage_alloc; }
inline RegionAllocator* get_internal_node_alloc() { return internal_node_alloc; }
const recordid &get_root_rec(){return root_rec;}
private:
const static int64_t DEPTH;
const static int64_t COMPARATOR;
const static int64_t FIRST_SLOT;
@ -110,6 +165,10 @@ public:
RegionAllocator* internal_node_alloc;
RegionAllocator* datapage_alloc;
struct indexnode_rec {
pageid_t ptr;
};
public:
class iterator {
public:
@ -132,6 +191,7 @@ public:
inline void releaseLock() { }
private:
Page * p;
int xid_;
bool done;
@ -141,5 +201,35 @@ public:
};
};
// Forward iterator over the tuples of a disk tree component.  It walks the
// internal-node tree to find datapages, and iterates each datapage in turn.
class diskTreeIterator
{
public:
// Iterate over `tree`; a null tree gives an iterator rooted at NULLRID.
explicit diskTreeIterator(diskTreeComponent::internalNodes *tree);
// As above, but `key` is passed as the start key.
explicit diskTreeIterator(diskTreeComponent::internalNodes *tree,datatuple *key);
~diskTreeIterator();
// Returns the next tuple, or null when there are no more.  Per the name, the
// caller is expected to free the returned tuple.
datatuple * next_callerFrees();
private:
// Creates lsmIterator_; key2 must be null (unimplemented).
void init_iterators(datatuple * key1, datatuple * key2);
// Opens the first datapage and its iterator, or zeroes curr_page / dp_itr.
inline void init_helper(datatuple * key1);
// Default construction, assignment and subtraction are not supported: they
// are private and abort() if ever reached.
explicit diskTreeIterator() { abort(); }
void operator=(diskTreeIterator & t) { abort(); }
int operator-(diskTreeIterator & t) { abort(); }
private:
recordid tree_; //root of the tree
// Iterator over the internal nodes; null when tree_ has size INVALID_SIZE.
diskTreeComponent::internalNodes::iterator* lsmIterator_;
pageid_t curr_pageid; //current page id
DataPage<datatuple> *curr_page; //current page
typedef DataPage<datatuple>::iterator DPITR_T;
// Iterator over curr_page; 0 when no datapage is open.
DPITR_T *dp_itr;
};
};
#endif /* DISKTREECOMPONENT_H_ */

View file

@ -1,145 +1,3 @@
#include "logstore.h"
#include "logiterators.h"
#include "memTreeComponent.h"
/////////////////////////////////////////////////////////////////////
// tree iterator implementation
/////////////////////////////////////////////////////////////////////
/**
 * Create the iterator over the internal-node tree.
 *
 * @param key1 optional start key; when non-null the internal-node iterator is
 *             constructed with this key, otherwise without one
 * @param key2 must be null (upper bounds are unimplemented)
 *
 * Leaves lsmIterator_ null when tree_ has size INVALID_SIZE.
 */
template <class TUPLE>
void diskTreeIterator<TUPLE>::init_iterators(TUPLE * key1, TUPLE * key2) {
  assert(!key2); // unimplemented

  if(tree_.size == INVALID_SIZE) {
    lsmIterator_ = NULL;
    return;
  }

  lsmIterator_ = key1
      ? new diskTreeComponent::internalNodes::iterator(-1, tree_, key1->key(), key1->keylen())
      : new diskTreeComponent::internalNodes::iterator(-1, tree_);
}
/** Iterate over the tree rooted at record `tree`, without a start key. */
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(recordid tree)
  : tree_(tree)
{
  init_iterators(NULL, NULL);
  init_helper(NULL);
}

/** Iterate over the tree rooted at record `tree`, using `key` as the start key. */
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(recordid tree, TUPLE& key)
  : tree_(tree)
{
  init_iterators(&key, NULL);
  init_helper(&key);
}

/**
 * Iterate over an internalNodes tree without a start key.
 * A null `tree` yields an iterator rooted at NULLRID.
 */
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(diskTreeComponent::internalNodes *tree)
  : tree_(!tree ? NULLRID : tree->get_root_rec())
{
  init_iterators(NULL, NULL);
  init_helper(NULL);
}

/**
 * Iterate over an internalNodes tree, using `key` as the start key.
 * A null `tree` yields an iterator rooted at NULLRID.
 */
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(diskTreeComponent::internalNodes *tree, TUPLE& key)
  : tree_(!tree ? NULLRID : tree->get_root_rec())
{
  init_iterators(&key, NULL);
  init_helper(&key);
}
/**
 * Release everything the iterator owns: the internal-node iterator, the
 * datapage iterator, and the datapage that is currently open.
 *
 * dp_itr is allocated with new in init_helper() / next_callerFrees() and is
 * only freed there once a datapage is exhausted, so an iterator destroyed in
 * the middle of a datapage must free it here or it leaks.
 */
template <class TUPLE>
diskTreeIterator<TUPLE>::~diskTreeIterator()
{
  if(lsmIterator_) {
    lsmIterator_->close();
    delete lsmIterator_;
    lsmIterator_ = 0;
  }

  // dp_itr is built from curr_page, so drop it before the page itself
  // (same order as next_callerFrees()).  Deleting a null pointer is a no-op.
  delete dp_itr;
  dp_itr = 0;

  delete curr_page;
  curr_page = 0;
}
/**
 * Position the iterator on its first datapage.
 *
 * When there is no internal-node iterator, or it has no first entry,
 * curr_page and dp_itr are both set to 0.  Otherwise the first entry's value
 * is read as a page id, that datapage is opened, and a datapage iterator is
 * created from it and `key1`.
 *
 * @param key1 optional start key handed to the datapage iterator (may be null)
 */
template <class TUPLE>
void diskTreeIterator<TUPLE>::init_helper(TUPLE* key1)
{
  if(!lsmIterator_) {
    DEBUG("treeIterator:\t__error__ init_helper():\tnull lsmIterator_");
    curr_page = 0;
    dp_itr = 0;
    return;
  }

  if(lsmIterator_->next() == 0) {
    DEBUG("diskTreeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." );
    curr_page = 0;
    dp_itr = 0;
    return;
  }

  // value() hands back a pointer to the stored page id through its byte** argument.
  pageid_t * pid_ptr;
  lsmIterator_->value((byte**)&pid_ptr);
  curr_pageid = *pid_ptr;

  curr_page = new DataPage<TUPLE>(-1, curr_pageid);
  DEBUG("opening datapage iterator %lld at key %s\n.", curr_pageid, key1 ? (char*)key1->key() : "NULL");
  dp_itr = new DPITR_T(curr_page, key1);
}
/**
 * Return the next tuple, moving on to the following datapage when the current
 * one runs out.
 *
 * @return the next tuple, or null when there is no internal-node iterator, no
 *         open datapage iterator, or no further datapage.  As the name says,
 *         the caller is expected to free the returned tuple.
 */
template <class TUPLE>
TUPLE * diskTreeIterator<TUPLE>::next_callerFrees()
{
  if(!this->lsmIterator_) { return NULL; }
  if(dp_itr == 0) { return 0; }

  TUPLE* tup = dp_itr->getnext();
  if(tup) { return tup; }

  // The current datapage is exhausted: close it (iterator first, then page).
  delete dp_itr;
  dp_itr = 0;
  delete curr_page;
  curr_page = 0;

  // No further datapage: tup is null. We're done.
  if(!lsmIterator_->next()) { return tup; }

  // Open the next datapage named by the internal-node iterator.
  pageid_t *pid_ptr;
  size_t ret = lsmIterator_->value((byte**)&pid_ptr);
  assert(ret == sizeof(pageid_t));
  curr_pageid = *pid_ptr;

  curr_page = new DataPage<TUPLE>(-1, curr_pageid);
  DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid);
  dp_itr = new DPITR_T(curr_page->begin());

  tup = dp_itr->getnext();
  assert(tup);
  return tup;
}
template class diskTreeIterator<datatuple>;

View file

@ -1,45 +1,4 @@
#ifndef _LOG_ITERATORS_H_
#define _LOG_ITERATORS_H_
template <class TUPLE>
class DataPage;
/////////////////////////////////////////////////////////////////
// Forward iterator over the tuples of an on-disk tree.  It walks the
// internal-node tree to find datapages, and iterates each datapage in turn.
template <class TUPLE>
class diskTreeIterator
{
public:
// Iterate over the tree rooted at record `tree`, optionally starting at `key`.
explicit diskTreeIterator(recordid tree);
explicit diskTreeIterator(recordid tree,TUPLE &key);
// Same, taking the tree object; a null tree gives an iterator rooted at NULLRID.
explicit diskTreeIterator(diskTreeComponent::internalNodes *tree);
explicit diskTreeIterator(diskTreeComponent::internalNodes *tree,TUPLE &key);
~diskTreeIterator();
// Returns the next tuple, or null when there are no more.  Per the name, the
// caller is expected to free the returned tuple.
TUPLE * next_callerFrees();
private:
// Creates lsmIterator_; key2 must be null (unimplemented).
void init_iterators(TUPLE * key1, TUPLE * key2);
// Opens the first datapage and its iterator, or zeroes curr_page / dp_itr.
inline void init_helper(TUPLE* key1);
// Default construction, assignment and subtraction are not supported: they
// are private and abort() if ever reached.
explicit diskTreeIterator() { abort(); }
void operator=(diskTreeIterator & t) { abort(); }
int operator-(diskTreeIterator & t) { abort(); }
private:
recordid tree_; //root of the tree
// Iterator over the internal nodes; null when tree_ has size INVALID_SIZE.
diskTreeComponent::internalNodes::iterator* lsmIterator_;
pageid_t curr_pageid; //current page id
DataPage<TUPLE> *curr_page; //current page
typedef typename DataPage<TUPLE>::iterator DPITR_T;
// Iterator over curr_page; 0 when no datapage is open.
DPITR_T *dp_itr;
};
#endif

View file

@ -513,7 +513,7 @@ int op_stat_space_usage(pthread_data* data) {
readlock(data->ltable->header_lock, 0);
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count;
@ -534,23 +534,46 @@ int op_stat_space_usage(pthread_data* data) {
}
pageid_t * tree_c2_regions = data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count);
*/
pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
data->ltable->get_tree_c1()->list_regions(xid,
&internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
&datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
if(data->ltable->get_tree_c1_mergeable()) {
data->ltable->get_tree_c1_mergeable()->list_regions(xid,
&internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
}
data->ltable->get_tree_c2()->list_regions(xid,
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
free(datapage_c1_regions);
free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(tree_c1_regions);
free(tree_c1_mergeable_regions);
free(tree_c2_regions);
free(internal_c1_regions);
free(internal_c1_mergeable_regions);
free(internal_c2_regions);
uint64_t treesize = PAGE_SIZE *
( ( datapage_c1_region_count * datapage_c1_region_length )
+ ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length )
+ ( datapage_c2_region_count * datapage_c2_region_length)
+ ( tree_c1_region_count * tree_c1_region_length )
+ ( tree_c1_mergeable_region_count * tree_c1_mergeable_region_length )
+ ( tree_c2_region_count * tree_c2_region_length) );
+ ( internal_c1_region_count * internal_c1_region_length )
+ ( internal_c1_mergeable_region_count * internal_c1_mergeable_region_length )
+ ( internal_c2_region_count * internal_c2_region_length) );
boundary_tag tag;
pageid_t pid = ROOT_RECORD.page;
@ -592,7 +615,7 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
}
int xid = Tbegin();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rec());
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rid());
size_t count = 0;
int err = 0;
@ -618,7 +641,7 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
size_t cur_stride = 0;
size_t i = 0;
it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rec());
it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private?
while(it->next()) {
i++;
if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
@ -647,25 +670,45 @@ int op_dbg_blockmap(pthread_data* data) {
readlock(data->ltable->header_lock, 0);
// produce a list of regions used by current tree components
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
pageid_t * datapage_c1_mergeable_regions = NULL;
if(data->ltable->get_tree_c1_mergeable()) {
datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
}
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); */
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count;
pageid_t * tree_c1_regions = data->ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count);
/* pageid_t * tree_c1_regions = data->ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count);
pageid_t * tree_c1_mergeable_regions = NULL;
if(data->ltable->get_tree_c1_mergeable()) {
tree_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
}
pageid_t * tree_c2_regions = data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count);
pageid_t * tree_c2_regions = data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count); */
pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
data->ltable->get_tree_c1()->list_regions(xid,
&internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
&datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
if(data->ltable->get_tree_c1_mergeable()) {
data->ltable->get_tree_c1_mergeable()->list_regions(xid,
&internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
}
data->ltable->get_tree_c2()->list_regions(xid,
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
unlock(data->ltable->header_lock);
Tcommit(xid);
@ -675,9 +718,9 @@ int op_dbg_blockmap(pthread_data* data) {
printf("%lld ", datapage_c1_regions[i]);
}
printf("\nC1 Internal Node Regions (each is %lld pages long):\n", tree_c1_region_length);
for(pageid_t i = 0; i < tree_c1_region_count; i++) {
printf("%lld ", tree_c1_regions[i]);
printf("\nC1 Internal Node Regions (each is %lld pages long):\n", internal_c1_region_length);
for(pageid_t i = 0; i < internal_c1_region_count; i++) {
printf("%lld ", internal_c1_regions[i]);
}
printf("\nC2 Datapage Regions (each is %lld pages long):\n", datapage_c2_region_length);
@ -685,9 +728,9 @@ int op_dbg_blockmap(pthread_data* data) {
printf("%lld ", datapage_c2_regions[i]);
}
printf("\nC2 Internal Node Regions (each is %lld pages long):\n", tree_c2_region_length);
for(pageid_t i = 0; i < tree_c2_region_count; i++) {
printf("%lld ", tree_c2_regions[i]);
printf("\nC2 Internal Node Regions (each is %lld pages long):\n", internal_c2_region_length);
for(pageid_t i = 0; i < internal_c2_region_count; i++) {
printf("%lld ", internal_c2_regions[i]);
}
printf("\nStasis Region Map\n");
@ -706,9 +749,9 @@ int op_dbg_blockmap(pthread_data* data) {
printf("\n");
printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n",
PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count
+ tree_c1_mergeable_region_length * tree_c1_mergeable_region_count
+ tree_c2_region_length * tree_c2_region_count
PAGE_SIZE * (internal_c1_region_length * internal_c1_region_count
+ internal_c1_mergeable_region_length * internal_c1_mergeable_region_count
+ internal_c2_region_length * internal_c2_region_count
+ datapage_c1_region_length * datapage_c1_region_count
+ datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count
+ datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024),
@ -717,9 +760,9 @@ int op_dbg_blockmap(pthread_data* data) {
free(datapage_c1_regions);
if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(tree_c1_regions);
if(tree_c1_mergeable_regions) free(tree_c1_mergeable_regions);
free(tree_c2_regions);
free(internal_c1_regions);
if(internal_c1_mergeable_regions) free(internal_c1_mergeable_regions);
free(internal_c2_regions);
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}

View file

@ -26,7 +26,7 @@ static inline double tv_to_double(struct timeval tv)
template class DataPage<datatuple>;
logtable::logtable()
logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
{
tree_c0 = NULL;
@ -36,7 +36,6 @@ logtable::logtable()
tree_c2 = NULL;
this->still_running_ = true;
this->mergedata = 0;
fixed_page_count = -1;
//tmerger = new tuplemerger(&append_merger);
tmerger = new tuplemerger(&replace_merger);
@ -46,7 +45,10 @@ logtable::logtable()
tree_bytes = 0;
epoch = 0;
this->internal_region_size = internal_region_size;
this->datapage_region_size = datapage_region_size;
this->datapage_size = datapage_size;
}
logtable::~logtable()
@ -71,29 +73,29 @@ recordid logtable::allocTable(int xid)
table_rec = Talloc(xid, sizeof(tbl_header));
//create the big tree
tree_c2 = new diskTreeComponent::internalNodes(xid);
tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size);
//create the small tree
tree_c1 = new diskTreeComponent::internalNodes(xid);
tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size);
update_persistent_header(xid);
return table_rec;
}
void logtable::openTable(int xid, recordid rid) {
table_rec = rid;
Tread(xid, table_rec, &tbl_header);
tree_c2 = new diskTreeComponent::internalNodes(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state);
tree_c1 = new diskTreeComponent::internalNodes(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state);
table_rec = rid;
Tread(xid, table_rec, &tbl_header);
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state);
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state);
}
void logtable::update_persistent_header(int xid) {
tbl_header.c2_root = tree_c2->get_root_rec();
tbl_header.c2_dp_state = tree_c2->get_datapage_alloc()->header_rid();
tbl_header.c2_state = tree_c2->get_internal_node_alloc()->header_rid();
tbl_header.c1_root = tree_c1->get_root_rec();
tbl_header.c1_dp_state = tree_c1->get_datapage_alloc()->header_rid();
tbl_header.c1_state = tree_c1->get_internal_node_alloc()->header_rid();
tbl_header.c2_root = tree_c2->get_root_rid();
tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid();
tbl_header.c2_state = tree_c2->get_internal_node_allocator_rid();
tbl_header.c1_root = tree_c1->get_root_rid();
tbl_header.c1_dp_state = tree_c1->get_datapage_allocator_rid();
tbl_header.c1_state = tree_c1->get_internal_node_allocator_rid();
Tset(xid, table_rec, &tbl_header);
}
@ -224,7 +226,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
//step 3: check c1
if(!done)
{
datatuple *tuple_c1 = findTuple(xid, key, keySize, get_tree_c1());
datatuple *tuple_c1 = get_tree_c1()->findTuple(xid, key, keySize);
if(tuple_c1 != NULL)
{
bool use_copy = false;
@ -253,7 +255,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
if(!done && get_tree_c1_mergeable() != 0)
{
DEBUG("old c1 tree not null\n");
datatuple *tuple_oc1 = findTuple(xid, key, keySize, get_tree_c1_mergeable());
datatuple *tuple_oc1 = get_tree_c1_mergeable()->findTuple(xid, key, keySize);
if(tuple_oc1 != NULL)
{
@ -283,7 +285,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
if(!done)
{
DEBUG("Not in old first disk tree\n");
datatuple *tuple_c2 = findTuple(xid, key, keySize, get_tree_c2());
datatuple *tuple_c2 = get_tree_c2()->findTuple(xid, key, keySize);
if(tuple_c2 != NULL)
{
@ -356,7 +358,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
DEBUG("Not in old mem tree\n");
//step 3: check c1
ret_tuple = findTuple(xid, key, keySize, get_tree_c1());
ret_tuple = get_tree_c1()->findTuple(xid, key, keySize);
}
if(ret_tuple == 0)
@ -366,8 +368,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
//step 4: check old c1 if exists
if( get_tree_c1_mergeable() != 0)
{
DEBUG("old c1 tree not null\n");
ret_tuple = findTuple(xid, key, keySize, get_tree_c1_mergeable());
DEBUG("old c1 tree not null\n");
ret_tuple = get_tree_c1_mergeable()->findTuple(xid, key, keySize);
}
}
@ -377,13 +379,10 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
DEBUG("Not in old first disk tree\n");
//step 5: check c2
ret_tuple = findTuple(xid, key, keySize, tree_c2);
}
ret_tuple = get_tree_c2()->findTuple(xid, key, keySize);
}
}
pthread_mutex_unlock(mergedata->rbtree_mut);
datatuple::freetuple(search_tuple);
@ -444,56 +443,6 @@ void logtable::insertTuple(datatuple *tuple)
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
}
/**
 * Create a new datapage in `ltree`'s datapage allocator whose first record is
 * `tuple`, and register the page in `ltree` under the tuple's key.
 *
 * Called when either the last region is full, or the last datapage did not
 * want the tuple (or both).
 *
 * @return the newly allocated datapage (created with new)
 */
DataPage<datatuple>* logtable::insertTuple(int xid, datatuple *tuple, diskTreeComponent::internalNodes *ltree)
{
  DataPage<datatuple> * page = 0;

  for(int failed_attempts = 0; page == 0; ) {
    page = new DataPage<datatuple>(xid, fixed_page_count, ltree->get_datapage_alloc());

    // Try to place the record in the brand-new datapage.
    if(page->append(tuple)) { break; }

    // The last datapage must have not wanted the tuple, and then this
    // datapage figured out the region is full.  Discard it and try again.
    delete page;
    page = 0;
    assert(failed_attempts == 0); // only retry once.
    failed_attempts++;
  }

  // Make the datapage reachable from the index.
  ltree->appendPage(xid, tuple->key(), tuple->keylen(), page->get_start_pid());

  return page;
}
/**
 * Look up a key in the given on-disk tree.
 *
 * `ltree` is asked for the datapage that could hold the key; if it names one,
 * that datapage is opened and searched.
 *
 * @return the tuple produced by DataPage::recordRead, or 0 if findPage()
 *         reported no candidate page (-1) or recordRead left the result unset
 */
datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize, diskTreeComponent::internalNodes *ltree)
{
  pageid_t pid = ltree->findPage(xid, (byte*)key, keySize);
  if(pid == -1) { return 0; }  // no datapage can contain this key

  datatuple * found = 0;
  DataPage<datatuple> * page = new DataPage<datatuple>(xid, pid);
  page->recordRead(key, keySize, &found);
  delete page;

  return found;
}
// Records the iterator `it` by appending it to the `its` container.
// NOTE(review): presumably `its` tracks the live iterators that
// forgetIterator() later removes -- confirm against the class definition.
void logtable::registerIterator(logtableIterator<datatuple> * it) {
its.push_back(it);
}

View file

@ -46,7 +46,7 @@ class logtableIterator ;
class logtable
{
public:
logtable();
logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 40); // scans 160KB / 2 per lookup on average. at 100MB/s, this is 0.7 ms. XXX pick datapage_size in principled way.
~logtable();
//user access functions
@ -61,9 +61,8 @@ public:
void openTable(int xid, recordid rid);
void flushTable();
DataPage<datatuple>* insertTuple(int xid, datatuple *tuple,diskTreeComponent::internalNodes *ltree);
datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, diskTreeComponent::internalNodes *ltree);
// DataPage<datatuple>* insertTuple(int xid, datatuple *tuple,diskTreeComponent::internalNodes *ltree);
// datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, diskTreeComponent::internalNodes *ltree);
inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls)
@ -73,13 +72,13 @@ public:
void forgetIterator(logtableIterator<datatuple> * it);
void bump_epoch() ;
inline diskTreeComponent::internalNodes * get_tree_c2(){return tree_c2;}
inline diskTreeComponent::internalNodes * get_tree_c1(){return tree_c1;}
inline diskTreeComponent::internalNodes * get_tree_c1_mergeable(){return tree_c1_mergeable;}
inline diskTreeComponent * get_tree_c2(){return tree_c2;}
inline diskTreeComponent * get_tree_c1(){return tree_c1;}
inline diskTreeComponent * get_tree_c1_mergeable(){return tree_c1_mergeable;}
inline void set_tree_c1(diskTreeComponent::internalNodes *t){tree_c1=t; bump_epoch(); }
inline void set_tree_c1_mergeable(diskTreeComponent::internalNodes *t){tree_c1_mergeable=t; bump_epoch(); }
inline void set_tree_c2(diskTreeComponent::internalNodes *t){tree_c2=t; bump_epoch(); }
inline void set_tree_c1(diskTreeComponent *t){tree_c1=t; bump_epoch(); }
inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t; bump_epoch(); }
inline void set_tree_c2(diskTreeComponent *t){tree_c2=t; bump_epoch(); }
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0(){return tree_c0;}
inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
@ -88,10 +87,14 @@ public:
void update_persistent_header(int xid);
int get_fixed_page_count(){return fixed_page_count;}
void set_fixed_page_count(int count){fixed_page_count = count;}
void setMergeData(logtable_mergedata * mdata) {
this->mergedata = mdata;
void setMergeData(logtable_mergedata * mdata) { this->mergedata = mdata; bump_epoch(); }
mdata->internal_region_size = internal_region_size;
mdata->datapage_region_size = datapage_region_size;
mdata->datapage_size = datapage_size;
bump_epoch(); }
logtable_mergedata* getMergeData(){return mergedata;}
inline tuplemerger * gettuplemerger(){return tmerger;}
@ -122,18 +125,19 @@ private:
recordid table_rec;
struct table_header tbl_header;
uint64_t epoch;
diskTreeComponent::internalNodes *tree_c2; //big tree
diskTreeComponent::internalNodes *tree_c1; //small tree
diskTreeComponent::internalNodes *tree_c1_mergeable; //small tree: ready to be merged with c2
diskTreeComponent *tree_c2; //big tree
diskTreeComponent *tree_c1; //small tree
diskTreeComponent *tree_c1_mergeable; //small tree: ready to be merged with c2
memTreeComponent<datatuple>::rbtree_ptr_t tree_c0; // in-mem red black tree
memTreeComponent<datatuple>::rbtree_ptr_t tree_c0_mergeable; // in-mem red black tree: ready to be merged with c1.
int tsize; //number of tuples
int64_t tree_bytes; //number of bytes
//DATA PAGE SETTINGS
int fixed_page_count;//number of pages in a datapage
pageid_t internal_region_size; // in number of pages
pageid_t datapage_region_size; // "
pageid_t datapage_size; // "
tuplemerger *tmerger;
@ -333,7 +337,7 @@ private:
TUPLE> inner_merge_it_t;
typedef mergeManyIterator<
inner_merge_it_t,
diskTreeIterator<TUPLE>,
diskTreeComponent::diskTreeIterator,
TUPLE> merge_it_t;
merge_it_t* merge_it_;
@ -353,26 +357,26 @@ private:
void validate() {
typename memTreeComponent<TUPLE>::revalidatingIterator * c0_it;
typename memTreeComponent<TUPLE>::iterator *c0_mergeable_it[1];
diskTreeIterator<TUPLE> * disk_it[3];
diskTreeComponent::diskTreeIterator * disk_it[3];
epoch = ltable->get_epoch();
if(last_returned) {
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, last_returned);
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), last_returned);
disk_it[0] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1(), *last_returned);
disk_it[1] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1_mergeable(), *last_returned);
disk_it[2] = new diskTreeIterator<TUPLE> (ltable->get_tree_c2(), *last_returned);
disk_it[0] = ltable->get_tree_c1()->iterator(last_returned);
disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(last_returned);
disk_it[2] = ltable->get_tree_c2()->iterator(last_returned);
} else if(key) {
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, key);
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), key);
disk_it[0] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1(), *key);
disk_it[1] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1_mergeable(), *key);
disk_it[2] = new diskTreeIterator<TUPLE> (ltable->get_tree_c2(), *key);
disk_it[0] = ltable->get_tree_c1()->iterator(key);
disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(key);
disk_it[2] = ltable->get_tree_c2()->iterator(key);
} else {
c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut );
c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable() );
disk_it[0] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1() );
disk_it[1] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1_mergeable() );
disk_it[2] = new diskTreeIterator<TUPLE> (ltable->get_tree_c2() );
disk_it[0] = ltable->get_tree_c1()->iterator();
disk_it[1] = ltable->get_tree_c1_mergeable()->iterator();
disk_it[2] = ltable->get_tree_c2()->iterator();
}
inner_merge_it_t * inner_merge_it =

View file

@ -2,22 +2,6 @@
#include <math.h>
#include "merger.h"
#include "logiterators.cpp"
#include "datapage.h"
// Statistics describing a single merge: which level it ran at, when it
// slept / started / finished, and how much data it read and wrote.
// Note that the byte and tuple counters are declared as pageid_t even though
// they do not hold page ids.
typedef struct merge_stats_t {
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging?
pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)?
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input?
} merge_stats_t;
void merge_stats_pp(FILE* fd, merge_stats_t &stats) {
long long sleep_time = stats.start.tv_sec - stats.sleep.tv_sec;
@ -61,11 +45,6 @@ void merge_stats_pp(FILE* fd, merge_stats_t &stats) {
double merge_stats_nsec_to_merge_in_bytes(merge_stats_t); // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers)
inline DataPage<datatuple>*
insertTuple(int xid, DataPage<datatuple> *dp, datatuple *t,
logtable *ltable,
diskTreeComponent::internalNodes * ltree, merge_stats_t*);
int merge_scheduler::addlogtable(logtable *ltable)
{
@ -209,6 +188,9 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
block2_needed, //out_block_needed
block1_ready_cond, //in_block_ready_cond
block2_ready_cond, //out_block_ready_cond
mdata->internal_region_size,
mdata->datapage_region_size,
mdata->datapage_size,
0, //max_tree_size No max size for biggest component
&R, //r_i
};
@ -226,6 +208,9 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
block1_needed,
mdata->input_ready_cond,
block1_ready_cond,
mdata->internal_region_size, // TODO different region / datapage sizes for C1?
mdata->datapage_region_size,
mdata->datapage_size,
(int64_t)(R * R * MAX_C0_SIZE),
&R,
};
@ -245,7 +230,7 @@ void merge_iterators(int xid,
ITA *itrA,
ITB *itrB,
logtable *ltable,
diskTreeComponent::internalNodes *scratch_tree,
diskTreeComponent *scratch_tree,
merge_stats_t *stats,
bool dropDeletes);
@ -334,13 +319,13 @@ void* memMergeThread(void*arg)
// 4: Merge
//create the iterators
diskTreeIterator<datatuple> *itrA = new diskTreeIterator<datatuple>(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here.
diskTreeComponent::diskTreeIterator *itrA = ltable->get_tree_c1()->iterator();
memTreeComponent<datatuple>::iterator *itrB =
new memTreeComponent<datatuple>::iterator(ltable->get_tree_c0_mergeable());
//create a new tree
diskTreeComponent::internalNodes * c1_prime = new diskTreeComponent::internalNodes(xid); // XXX should not hardcode region size)
diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size);
//pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->header_lock);
@ -355,10 +340,8 @@ void* memMergeThread(void*arg)
// 5: force c1'
//force write the new region to disk
c1_prime->get_internal_node_alloc()->force_regions(xid);
//force write the new datapages
c1_prime->get_datapage_alloc()->force_regions(xid);
//force write the new tree to disk
c1_prime->force(xid);
merge_count++;
DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.bytes_out);
@ -391,8 +374,7 @@ void* memMergeThread(void*arg)
}
// 12: delete old c1
ltable->get_tree_c1()->get_internal_node_alloc()->dealloc_regions(xid);
ltable->get_tree_c1()->get_datapage_alloc()->dealloc_regions(xid);
ltable->get_tree_c1()->dealloc(xid);
delete ltable->get_tree_c1();
// 11.5: delete old c0_mergeable
@ -406,7 +388,7 @@ void* memMergeThread(void*arg)
ltable->set_tree_c1_mergeable(c1_prime);
// 8: c1 = new empty.
ltable->set_tree_c1(new diskTreeComponent::internalNodes(xid));
ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size));
pthread_cond_signal(a->out_block_ready_cond);
@ -491,13 +473,11 @@ void *diskMergeThread(void*arg)
// 4: do the merge.
//create the iterators
diskTreeIterator<datatuple> *itrA = new diskTreeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
diskTreeIterator<datatuple> *itrB =
new diskTreeIterator<datatuple>(ltable->get_tree_c1_mergeable()->get_root_rec());
diskTreeComponent::diskTreeIterator *itrA = ltable->get_tree_c2()->iterator(); //new diskTreeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
diskTreeComponent::diskTreeIterator *itrB = ltable->get_tree_c1_mergeable()->iterator();
//create a new tree
//TODO: maybe you want larger regions for the second tree?
diskTreeComponent::internalNodes * c2_prime = new diskTreeComponent::internalNodes(xid);
diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size);
unlock(ltable->header_lock);
@ -510,19 +490,16 @@ void *diskMergeThread(void*arg)
delete itrB;
//5: force write the new region to disk
c2_prime->get_internal_node_alloc()->force_regions(xid);
c2_prime->get_datapage_alloc()->force_regions(xid);
c2_prime->force(xid);
// (skip 6, 7, 8, 8.5, 9))
writelock(ltable->header_lock,0);
//12
ltable->get_tree_c2()->get_internal_node_alloc()->dealloc_regions(xid);
ltable->get_tree_c2()->get_datapage_alloc()->dealloc_regions(xid);
ltable->get_tree_c2()->dealloc(xid);
delete ltable->get_tree_c2();
//11.5
ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->dealloc_regions(xid);
ltable->get_tree_c1_mergeable()->get_datapage_alloc()->dealloc_regions(xid);
ltable->get_tree_c1_mergeable()->dealloc(xid);
//11
delete ltable->get_tree_c1_mergeable();
ltable->set_tree_c1_mergeable(0);
@ -560,7 +537,7 @@ void merge_iterators(int xid,
ITA *itrA, //iterator on c1 or c2
ITB *itrB, //iterator on c0 or c1, respectively
logtable *ltable,
diskTreeComponent::internalNodes *scratch_tree, merge_stats_t *stats,
diskTreeComponent *scratch_tree, merge_stats_t *stats,
bool dropDeletes // should be true iff this is biggest component
)
{
@ -571,8 +548,6 @@ void merge_iterators(int xid,
stats->bytes_in_large = 0;
stats->num_tuples_in_large = 0;
DataPage<datatuple> *dp = 0;
datatuple *t1 = itrA->next_callerFrees();
if(t1) {
stats->num_tuples_in_large++;
@ -591,8 +566,7 @@ void merge_iterators(int xid,
while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2
{
//insert t1
dp = insertTuple(xid, dp, t1, ltable, scratch_tree, stats);
scratch_tree->insertTuple(xid, t1, stats);
datatuple::freetuple(t1);
stats->num_tuples_out++;
//advance itrA
@ -608,9 +582,9 @@ void merge_iterators(int xid,
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
//insert merged tuple, drop deletes
if(dropDeletes && !mtuple->isDelete())
dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, stats);
// Both inputs contain this key, so only the merged tuple is written out.
// It must be kept in every case except one: when deletes are being dropped
// (dropDeletes is true only for the biggest component) and the merged result
// is itself a delete.  Requiring dropDeletes to be true here would silently
// discard every merged tuple during merges into a non-biggest component.
if(!dropDeletes || !mtuple->isDelete()) {
scratch_tree->insertTuple(xid, mtuple, stats);
}
datatuple::freetuple(t1);
t1 = itrA->next_callerFrees(); //advance itrA
if(t1) {
@ -622,7 +596,7 @@ void merge_iterators(int xid,
else
{
//insert t2
dp = insertTuple(xid, dp, t2, ltable, scratch_tree, stats);
scratch_tree->insertTuple(xid, t2, stats);
// cannot free any tuples here; they may still be read through a lookup
}
@ -630,9 +604,8 @@ void merge_iterators(int xid,
stats->num_tuples_out++;
}
while(t1 != 0) // t1 is less than t2
{
dp = insertTuple(xid, dp, t1, ltable, scratch_tree, stats);
while(t1 != 0) {// t1 is less than t2
scratch_tree->insertTuple(xid, t1, stats);
datatuple::freetuple(t1);
stats->num_tuples_out++;
@ -642,33 +615,9 @@ void merge_iterators(int xid,
stats->num_tuples_in_large++;
stats->bytes_in_large += t1->byte_length();
}
}
if(dp!=NULL)
delete dp;
}
DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);
scratch_tree->writes_done();
}
/**
 * Append `t` to the current output datapage, opening a new one when needed.
 *
 * If `dp` is null, or `dp` refuses the tuple, a new datapage is obtained from
 * ltable->insertTuple() and stats->num_datapages_out is incremented.  A
 * datapage that refused the tuple is deleted first, after its size
 * (PAGE_SIZE * page count) has been added to stats->bytes_out.
 *
 * @param xid     transaction id forwarded to ltable->insertTuple()
 * @param dp      datapage currently being filled, or 0 if there is none yet
 * @param t       tuple to write
 * @param ltable  table used to create replacement datapages
 * @param ltree   tree that new datapages are registered with
 * @param stats   merge statistics updated as described above
 * @return the datapage that now holds `t` (either `dp` or a new page)
 */
inline DataPage<datatuple>*
insertTuple(int xid, DataPage<datatuple> *dp, datatuple *t,
            logtable *ltable,
            diskTreeComponent::internalNodes * ltree, merge_stats_t * stats)
{
    // Common case: there is a current page and it accepts the tuple.
    if(dp != 0 && dp->append(t)) {
        return dp;
    }

    if(dp != 0) {
        // The current page is full: account for it and retire it.
        stats->bytes_out += (PAGE_SIZE * dp->get_page_count());
        delete dp;
    }

    DataPage<datatuple> *fresh = ltable->insertTuple(xid, t, ltree);
    stats->num_datapages_out++;
    return fresh;
}

View file

@ -4,11 +4,30 @@
#include <vector>
#include <utility>
#include <stasis/common.h>
#undef try
#undef end
//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true...
static const int RB_TREE_OVERHEAD = 400;
static const double MIN_R = 3.0;
class logtable;
// Statistics describing a single merge: which level it ran at, when it
// slept / started / finished, and how much data it read and wrote.
// Note that the byte and tuple counters are declared as pageid_t even though
// they do not hold page ids.
typedef struct merge_stats_t {
int merge_level; // 1 => C0->C1, 2 => C1->C2
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging?
pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)?
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input?
} merge_stats_t;
struct merger_args
{
logtable * ltable;
@ -25,6 +44,10 @@ struct merger_args
pthread_cond_t * in_block_ready_cond;
pthread_cond_t * out_block_ready_cond;
pageid_t internal_region_size;
pageid_t datapage_region_size;
pageid_t datapage_size;
int64_t max_size;
double * r_i;
};
@ -44,6 +67,10 @@ struct logtable_mergedata
pthread_cond_t * input_needed_cond;
int64_t * input_size;
pageid_t internal_region_size;
pageid_t datapage_region_size;
pageid_t datapage_size;
//merge args 1
struct merger_args *diskmerge_args;
//merge args 2
@ -52,8 +79,6 @@ struct logtable_mergedata
};
#include "logstore.h" // XXX hacky include workaround.
#include "logiterators.h"
class merge_scheduler
{

View file

@ -55,10 +55,9 @@ void initialize_server()
int xid = Tbegin();
mscheduler = new merge_scheduler;
logtable ltable;
int pcount = 40;
ltable.set_fixed_page_count(pcount);
recordid table_root = ROOT_RECORD;
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
printf("Creating empty logstore\n");

View file

@ -67,6 +67,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
{
dpages++;
if(dp)
dp->writes_done();
delete dp;
dp = new DataPage<datatuple>(xid, pcount, alloc);
@ -78,6 +79,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
}
}
if(dp) {
dp->writes_done();
delete dp;
}

View file

@ -13,18 +13,14 @@ int main(int argc, char **argv)
int xid = Tbegin();
logtable ltable;
logtable ltable(1000, 10000, 5);
recordid table_root = ltable.allocTable(xid);
Tcommit(xid);
//ltable.startTable();
// lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable, INVALID_COL);
xid = Tbegin();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ltable.get_tree_c2()->get_root_rec() );
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ltable.get_tree_c2()->get_root_rid() );
it->close();
delete it;
Tcommit(xid);

View file

@ -18,8 +18,6 @@
#include "check_util.h"
template class diskTreeIterator<datatuple>;
void insertProbeIter(size_t NUM_ENTRIES)
{
srand(1000);
@ -32,19 +30,13 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
logtable ltable;
int pcount = 5;
ltable.set_fixed_page_count(pcount);
logtable ltable(1000, 10000, 5);
recordid table_root = ltable.allocTable(xid);
Tcommit(xid);
xid = Tbegin();
diskTreeComponent::internalNodes *ltable_c1 = ltable.get_tree_c1();
recordid tree_root = ltable_c1->get_root_rec();
diskTreeComponent *ltable_c1 = ltable.get_tree_c1();
std::vector<std::string> data_arr;
std::vector<std::string> key_arr;
@ -62,54 +54,30 @@ void insertProbeIter(size_t NUM_ENTRIES)
if(data_arr.size() > NUM_ENTRIES)
data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end());
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);
int dpages = 0;
int npages = 0;
DataPage<datatuple> *dp=0;
int64_t datasize = 0;
std::vector<pageid_t> dsp;
// Zero-initialised statistics block that the insert loop below fills in.
// The allocation has to be sized by the struct itself: sizeof(stats) is only
// the size of a pointer, so sizing by it would make the later field writes
// run past the end of the buffer.
// NOTE(review): no matching free(stats) is visible in this hunk -- confirm
// that it is released once the statistics have been printed.
merge_stats_t *stats = (merge_stats_t*)calloc(1, sizeof(merge_stats_t));
for(size_t i = 0; i < NUM_ENTRIES; i++)
{
//prepare the tuple
datatuple* newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1, data_arr[i].c_str(), data_arr[i].length()+1);
datasize += newtuple->byte_length();
stats->bytes_in_small += newtuple->byte_length();
if(dp == NULL)
{
dp = ltable.insertTuple(xid, newtuple, ltable_c1);
dpages++;
dsp.push_back(dp->get_start_pid());
}
else
{
if(!dp->append(newtuple))
{
npages += dp->get_page_count();
delete dp;
dp = ltable.insertTuple(xid, newtuple, ltable_c1);
dpages++;
dsp.push_back(dp->get_start_pid());
}
}
ltable_c1->insertTuple(xid, newtuple, stats);
datatuple::freetuple(newtuple);
}
printf("\nTREE STRUCTURE\n");
ltable_c1->print_tree(xid);
printf("Total data set length: %lld\n", datasize);
printf("Storage utilization: %.2f\n", (datasize+.0) / (PAGE_SIZE * npages));
printf("Number of datapages: %d\n", dpages);
printf("Total data set length: %lld\n", stats->bytes_in_small);
printf("Storage utilization: %.2f\n", (stats->bytes_in_small+.0) / (1.0* stats->bytes_out));
printf("Number of datapages: %lld\n", (long long)stats->num_datapages_out);
printf("Writes complete.\n");
if(dp)
delete dp;
ltable_c1->writes_done();
Tcommit(xid);
xid = Tbegin();
@ -117,11 +85,11 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES);
size_t tuplenum = 0;
diskTreeIterator<datatuple> tree_itr(tree_root);
diskTreeComponent::diskTreeIterator * tree_itr = ltable_c1->iterator();
datatuple *dt=0;
while( (dt=tree_itr.next_callerFrees()) != NULL)
while( (dt=tree_itr->next_callerFrees()) != NULL)
{
assert(dt->keylen() == key_arr[tuplenum].length()+1);
assert(dt->datalen() == data_arr[tuplenum].length()+1);
@ -129,7 +97,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
datatuple::freetuple(dt);
dt = 0;
}
delete(tree_itr);
assert(tuplenum == key_arr.size());
printf("Sequential Reads completed.\n");
@ -142,7 +110,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
//randomly pick a key
int ri = rand()%key_arr.size();
datatuple *dt = ltable.findTuple(xid, (const datatuple::key_t) key_arr[ri].c_str(), (size_t)key_arr[ri].length()+1, ltable_c1);
datatuple *dt = ltable_c1->findTuple(xid, (const datatuple::key_t) key_arr[ri].c_str(), (size_t)key_arr[ri].length()+1);
assert(dt!=0);
assert(dt->keylen() == key_arr[ri].length()+1);

View file

@ -4,6 +4,7 @@
#include <iostream>
#include <sstream>
#include "logstore.h"
#include "diskTreeComponent.h"
#include <assert.h>
#include <limits.h>
@ -36,14 +37,10 @@ void insertProbeIter_str(int NUM_ENTRIES)
int xid = Tbegin();
logtable ltable;
recordid table_root = ltable.allocTable(xid);
Tcommit(xid);
xid = Tbegin();
diskTreeComponent::internalNodes *lt = ltable.get_tree_c1();
diskTreeComponent::internalNodes *lt = new diskTreeComponent::internalNodes(xid, 1000, 10000, 40);
long oldpagenum = -1;
@ -157,14 +154,10 @@ void insertProbeIter_int(int NUM_ENTRIES)
int xid = Tbegin();
logtable ltable;
recordid table_root = ltable.allocTable(xid);
Tcommit(xid);
xid = Tbegin();
diskTreeComponent::internalNodes *lt = ltable.get_tree_c1();
diskTreeComponent::internalNodes *lt = new diskTreeComponent::internalNodes(xid, 1000, 10000, 40);
long oldpagenum = -1;

View file

@ -47,10 +47,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
merge_scheduler mscheduler;
logtable ltable;
int pcount = 5;
ltable.set_fixed_page_count(pcount);
logtable ltable(1000, 10000, 5);
recordid table_root = ltable.allocTable(xid);

View file

@ -46,10 +46,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
merge_scheduler mscheduler;
logtable ltable;
int pcount = 100;
ltable.set_fixed_page_count(pcount);
logtable ltable(1000, 10000, 100);
recordid table_root = ltable.allocTable(xid);

View file

@ -102,10 +102,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int xid = Tbegin();
merge_scheduler mscheduler;
logtable ltable;
int pcount = 40;
ltable.set_fixed_page_count(pcount);
logtable ltable(1000, 1000, 40);
recordid table_root = ltable.allocTable(xid);