Tuple insertion now works (modulo storage leakage, hard-coded tree sizes (not even ratios), and many other problems...)

This commit is contained in:
Sears Russell 2007-11-02 21:20:30 +00:00
parent f101919244
commit 75cbb20e6d
4 changed files with 192 additions and 89 deletions

View file

@ -22,7 +22,7 @@ namespace rose {
unlink("logfile.txt");
sync();
stasis_page_impl_register(Multicolumn<typename PAGELAYOUT::FMT::TUP >::impl());
bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
Tinit();
@ -33,9 +33,24 @@ namespace rose {
Tcommit(xid);
TlsmTableStart<PAGELAYOUT>(lsmTable);
lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable);
TlsmTableStop<PAGELAYOUT>(lsmTable);
typename PAGELAYOUT::FMT::TUP t;
static const long INSERTS = 1000000;
long int count = INSERTS / 20;
for(long int i = 0; i < INSERTS; i++) {
t.set0(&i);
TlsmTableInsert(h,t);
count --;
if(!count) {
count = INSERTS / 20;
printf("%d pct complete\n", (i * 100) / INSERTS);
}
}
TlsmTableStop<PAGELAYOUT>(h);
Tdeinit();
@ -48,6 +63,7 @@ int main(int argc, char **argv) {
// XXX multicolumn is deprecated; want static dispatch!
return rose::main
<rose::SingleColumnTypePageLayout
<rose::Multicolumn<tup>,rose::Rle<val_t> > >
<rose::Multicolumn<tup>,rose::For<val_t> > >
(argc,argv);
return 0;
}

View file

@ -46,14 +46,8 @@ inline const byte * toByteArray(stlSetIterator<STLITER,ROW> * const t);
*/
template <class ROW, class PAGELAYOUT>
class treeIterator {
public:
explicit treeIterator(recordid tree, ROW &scratch, int keylen) :
tree_(tree),
scratch_(scratch),
keylen_(keylen),
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
private:
inline void init_helper() {
if(!lsmTreeIterator_next(-1, lsmIterator_)) {
currentPage_ = 0;
pageid_ = -1;
@ -66,12 +60,26 @@ class treeIterator {
currentPage_ = (PAGELAYOUT*)p_->impl;
}
}
public:
/**
 * Construct an iterator over the LSM tree rooted at `tree`, using a
 * caller-supplied scratch tuple and key length.
 *
 * @param tree    recordid of the tree to iterate over.
 * @param scratch tuple stored in scratch_.
 * @param keylen  key length stored in keylen_.
 */
explicit treeIterator(recordid tree, ROW &scratch, int keylen) :
tree_(tree),
scratch_(scratch),
keylen_(keylen),
// NOTE(review): the -1 first argument is presumably a "no transaction"
// xid placeholder -- confirm against lsmTreeIterator_open().
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
// Advances lsmIterator_ once and sets up the current-page state.
init_helper();
}
typedef recordid handle;
explicit treeIterator(recordid tree) :
tree_(tree),
scratch_(),
keylen_(ROW::sizeofBytes())
{ }
keylen_(ROW::sizeofBytes()),
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
init_helper();
}
explicit treeIterator(treeIterator& t) :
tree_(t.tree_),
scratch_(t.scratch_),
@ -391,9 +399,9 @@ class versioningIterator {
private:
typedef typename SET::iterator STLITER;
public:
typedef SET handle;
typedef SET * handle;
stlSetIterator( SET& s ) : it_(s.begin()), itend_(s.end()) {}
stlSetIterator( SET * s ) : it_(s->begin()), itend_(s->end()) {}
stlSetIterator( STLITER& it, STLITER& itend ) : it_(it), itend_(itend) {}
explicit stlSetIterator(stlSetIterator &i) : it_(i.it_), itend_(i.itend_){}

View file

@ -30,12 +30,13 @@ namespace rose {
pthread_cond_t * in_block_ready_cond;
pthread_cond_t * out_block_ready_cond;
bool * still_open;
typename ITERA::handle ** out_tree;
typename ITERA::handle in_process_tree;
typename ITERB::handle ** in_tree;
typename ITERA::handle ** out_tree;
};
template <class PAGELAYOUT, class ITER>
pageid_t compressData(ITER * begin, ITER * end, recordid tree,
pageid_t compressData(int xid, ITER * begin, ITER * end, recordid tree,
pageid_t (*pageAlloc)(int,void*),
void *pageAllocState, uint64_t *inserted) {
*inserted = 0;
@ -43,12 +44,12 @@ namespace rose {
if(*begin == *end) {
return 0;
}
pageid_t next_page = pageAlloc(-1,pageAllocState);
Page *p = loadPage(-1, next_page);
pageid_t next_page = pageAlloc(xid,pageAllocState);
Page *p = loadPage(xid, next_page);
pageid_t pageCount = 0;
if(*begin != *end) {
TlsmAppendPage(-1,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id);
TlsmAppendPage(xid,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id);
}
pageCount++;
@ -57,7 +58,7 @@ namespace rose {
int lastEmpty = 0;
for(ITER i(*begin); i != *end; ++i) {
rose::slot_index_t ret = mc->append(-1, *i);
rose::slot_index_t ret = mc->append(xid, *i);
(*inserted)++;
@ -68,12 +69,12 @@ namespace rose {
--(*end);
if(i != *end) {
next_page = pageAlloc(-1,pageAllocState);
p = loadPage(-1, next_page);
next_page = pageAlloc(xid,pageAllocState);
p = loadPage(xid, next_page);
mc = PAGELAYOUT::initPage(p, &*i);
TlsmAppendPage(-1,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id);
TlsmAppendPage(xid,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id);
pageCount++;
lastEmpty = 0;
@ -109,49 +110,58 @@ namespace rose {
// Initialize tree with an empty tree.
// XXX hardcodes ITERA's type:
recordid oldtree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
typename ITERA::handle oldtree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
Tcommit(xid);
// loop around here to produce multiple batches for merge.
while(1) {
gettimeofday(&start_tv,0);
pthread_mutex_lock(a->block_ready_mut);
if(!*(a->still_open)) {
int done = 0;
while(!*(a->in_tree)) {
pthread_cond_signal(a->in_block_needed_cond);
if(!*(a->still_open)) {
done = 1;
break;
}
pthread_cond_wait(a->in_block_ready_cond,a->block_ready_mut);
}
if(done) {
a->in_process_tree = oldtree;
pthread_cond_signal(a->out_block_ready_cond);
pthread_mutex_unlock(a->block_ready_mut);
break;
}
while(!*(a->in_tree)) {
pthread_cond_signal(a->in_block_needed_cond);
pthread_cond_wait(a->in_block_ready_cond,a->block_ready_mut);
}
gettimeofday(&wait_tv,0);
// XXX keep in_tree handle around so that it can be freed below.
typename ITERB::handle old_in_tree = **a->in_tree;
ITERA taBegin(oldtree);
ITERB tbBegin(**a->in_tree);
ITERA *taEnd = taBegin.end();
ITERB *tbEnd = tbBegin.end();
free(*a->in_tree); // free's copy of handle; not tree
*a->in_tree = 0; // free slot for producer
pthread_cond_signal(a->in_block_needed_cond);
pthread_mutex_unlock(a->block_ready_mut);
xid = Tbegin();
recordid tree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
ITERA taBegin(oldtree);
ITERB tbBegin(**a->in_tree);
// XXX keep in_tree handle around so that it can be freed below.
free(*a->in_tree); // free's copy of handle; not tree
*a->in_tree = 0; // free slot for producer
pthread_cond_signal(a->in_block_needed_cond);
pthread_mutex_unlock(a->block_ready_mut);
ITERA *taEnd = taBegin.end();
ITERB *tbEnd = tbBegin.end();
mergeIterator<ITERA, ITERB, typename PAGELAYOUT::FMT::TUP>
mBegin(taBegin, tbBegin, *taEnd, *tbEnd);
@ -161,8 +171,9 @@ namespace rose {
mEnd.seekEnd();
uint64_t insertedTuples;
pageid_t mergedPages = compressData<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
(&mBegin, &mEnd,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
pageid_t mergedPages = compressData
<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
(xid, &mBegin, &mEnd,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
delete taEnd;
delete tbEnd;
@ -195,6 +206,11 @@ namespace rose {
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
}
// XXX TlsmFree(xid,oldtree);
oldtree = tree;
pthread_mutex_unlock(a->block_ready_mut);
merge_count++;
@ -208,8 +224,8 @@ namespace rose {
/ (1024.0 * 1024.0 * total_elapsed);
printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", merge_count, ratio,
wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput);
"worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages\n", merge_count, ratio,
wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages);
Tcommit(xid);
@ -250,13 +266,33 @@ namespace rose {
return ret;
}
/// XXX start should return a struct that contains these!
pthread_t merge1_thread;
pthread_t merge2_thread;
bool * still_open;
/**
 * Handle returned by TlsmTableStart() and consumed by TlsmTableInsert()
 * and TlsmTableStop().  It bundles the state shared between the inserting
 * thread and the two merge threads.
 */
template <class PAGELAYOUT>
struct lsmTableHandle {
// Merge threads; created with pthread_create() in TlsmTableStart() and
// joined in TlsmTableStop().
pthread_t merge1_thread;
pthread_t merge2_thread;
// Heap-allocated flag shared with both merge_args structs; set to 1 by
// TlsmTableStart() and cleared by TlsmTableStop().
bool * still_open;
// Hand-off slot: TlsmTableInsert() stores a pointer to a full tuple set
// here once the slot is empty (*input_handle == 0).
typename stlSetIterator
<typename std::set
<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP>::handle ** input_handle;
// In-memory set that TlsmTableInsert() is currently filling.
typename std::set
<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp> * scratch_handle;
// Mutex guarding the hand-off slot (assigned block_ready_mut in
// TlsmTableStart()).
pthread_mutex_t * mut;
// Signalled by TlsmTableInsert() after it fills *input_handle.
pthread_cond_t * input_ready_cond;
// Waited on by TlsmTableInsert() while *input_handle is still occupied.
pthread_cond_t * input_needed_cond;
// Heap-allocated argument block passed to merge1_thread.
merge_args<PAGELAYOUT, treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT>, treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT> > * args1;
// Heap-allocated argument block passed to merge2_thread.
merge_args<PAGELAYOUT, treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT>, stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> > * args2;
};
template<class PAGELAYOUT>
void TlsmTableStart(recordid tree) {
lsmTableHandle <PAGELAYOUT> * TlsmTableStart(recordid& tree) {
/// XXX xid for daemon processes?
lsmTableHeader_t h;
Tread(-1, tree, &h);
@ -268,6 +304,9 @@ namespace rose {
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> RB_ITER;
typedef typename LSM_ITER::handle LSM_HANDLE;
typedef typename RB_ITER::handle RB_HANDLE;
pthread_mutex_t * block_ready_mut =
(pthread_mutex_t*)malloc(sizeof(pthread_mutex_t));
pthread_cond_t * block0_needed_cond =
@ -291,17 +330,34 @@ namespace rose {
pthread_cond_init(block1_ready_cond,0);
pthread_cond_init(block2_ready_cond,0);
typename LSM_ITER::handle * block1_scratch =
(typename LSM_ITER::handle*) malloc(sizeof(typename LSM_ITER::handle));
still_open = (bool*)malloc(sizeof(bool));
*still_open = 1;
LSM_HANDLE ** block1_scratch = (LSM_HANDLE**) malloc(sizeof(LSM_HANDLE*));
*block1_scratch = 0;
RB_HANDLE ** block0_scratch = (RB_HANDLE**) malloc(sizeof(RB_HANDLE*));
*block0_scratch = 0;
lsmTableHandle<PAGELAYOUT> * ret = (lsmTableHandle<PAGELAYOUT>*)
malloc(sizeof(lsmTableHandle<PAGELAYOUT>));
// merge1_thread initialized during pthread_create, below.
// merge2_thread initialized during pthread_create, below.
ret->still_open = (bool*)malloc(sizeof(bool));
*ret->still_open = 1;
ret->input_handle = block0_scratch;
ret->scratch_handle = new typeof(*ret->scratch_handle);
ret->mut = block_ready_mut;
ret->input_ready_cond = block0_ready_cond;
ret->input_needed_cond = block0_needed_cond;
recordid * ridp = (recordid*)malloc(sizeof(recordid));
*ridp = h.bigTreeAllocState;
recordid ** block1_scratch_p = (recordid**)malloc(sizeof(block1_scratch));
*block1_scratch_p = block1_scratch;
merge_args<PAGELAYOUT, LSM_ITER, LSM_ITER> * args1 = (merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>));
ret->args1 = (merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>));
merge_args<PAGELAYOUT, LSM_ITER, LSM_ITER> tmpargs1 =
{
TlsmRegionAllocRid,
@ -311,18 +367,18 @@ namespace rose {
block2_needed_cond,
block1_ready_cond,
block2_ready_cond,
still_open,
0,
block1_scratch_p
ret->still_open,
NULLRID,
block1_scratch,
0
};
*args1 = tmpargs1;
void * (*merger1)(void*) = mergeThread
<PAGELAYOUT, LSM_ITER, LSM_ITER>;
*ret->args1 = tmpargs1;
void * (*merger1)(void*) = mergeThread<PAGELAYOUT, LSM_ITER, LSM_ITER>;
ridp = (recordid*)malloc(sizeof(recordid));
*ridp = h.mediumTreeAllocState;
merge_args<PAGELAYOUT, LSM_ITER, RB_ITER> * args2 = (merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>));
ret->args2 = (merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>));
merge_args<PAGELAYOUT, LSM_ITER, RB_ITER> tmpargs2 =
{
TlsmRegionAllocRid,
@ -332,24 +388,48 @@ namespace rose {
block1_needed_cond,
block0_ready_cond,
block1_ready_cond,
still_open,
block1_scratch_p,
0 // XXX how does this thing get fed new trees of tuples?
ret->still_open,
NULLRID,
block0_scratch,
block1_scratch
};
*args2 = tmpargs2;
void * (*merger2)(void*) = mergeThread
<PAGELAYOUT, LSM_ITER, RB_ITER>;
*ret->args2 = tmpargs2;
void * (*merger2)(void*) = mergeThread<PAGELAYOUT, LSM_ITER, RB_ITER>;
pthread_create(&ret->merge1_thread, 0, merger1, ret->args1);
pthread_create(&ret->merge2_thread, 0, merger2, ret->args2);
pthread_create(&merge1_thread, 0, merger1, args1);
pthread_create(&merge2_thread, 0, merger2, args2);
return ret;
}
template<class PAGELAYOUT>
void TlsmTableStop(recordid tree) {
*still_open = 0;
pthread_join(merge1_thread,0);
pthread_join(merge2_thread,0);
void TlsmTableStop( lsmTableHandle<PAGELAYOUT> * h) {
  // Shut the table down: tell the merge threads to exit, then wait for
  // both of them to finish.
  //
  // @param h  handle returned by TlsmTableStart().
  //
  // The flag is cleared while holding h->mut, because the merge threads
  // read *still_open under that mutex.  input_ready_cond is signalled so
  // that a merge thread blocked waiting for input wakes up and re-checks
  // the flag; without the signal the joins below could block forever.
  pthread_mutex_lock(h->mut);
  *(h->still_open) = 0;
  pthread_cond_signal(h->input_ready_cond);
  pthread_mutex_unlock(h->mut);

  pthread_join(h->merge1_thread,0);
  pthread_join(h->merge2_thread,0);
}
/**
 * Insert one tuple into the table.
 *
 * The tuple is first added to the handle's in-memory set.  Once that set
 * holds more than 100000 entries it is published through the hand-off
 * slot (*h->input_handle) and replaced with a fresh, empty set.
 *
 * @param h  handle returned by TlsmTableStart().
 * @param t  tuple to insert; it is copied into the in-memory set.
 */
template<class PAGELAYOUT>
void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h,
                      typename PAGELAYOUT::FMT::TUP &t) {
  h->scratch_handle->insert(t);

  // XXX set threshold sanely!!!
  if(h->scratch_handle->size() <= 100000) {
    return;  // set is not full enough to hand off yet
  }

  pthread_mutex_lock(h->mut);

  // Block until the previously published set has been taken.
  while(*h->input_handle != 0) {
    pthread_cond_wait(h->input_needed_cond, h->mut);
  }

  // Box the set pointer in a malloc()ed cell and publish it via the slot.
  typeof(h->scratch_handle)* boxed_set
    = (typeof(h->scratch_handle)*) malloc(sizeof(void*));
  *boxed_set = h->scratch_handle;
  *(h->input_handle) = boxed_set;
  pthread_cond_signal(h->input_ready_cond);

  // Start collecting into a brand new set.
  h->scratch_handle = new typeof(*h->scratch_handle);

  pthread_mutex_unlock(h->mut);
}
}

View file

@ -17,7 +17,7 @@ namespace rose {
s.flag_ = NORMAL;
initializePointers();
}
explicit inline StaticTuple(StaticTuple& t) {
explicit inline StaticTuple(const StaticTuple& t) {
s.flag_ = t.s.flag_;
s.epoch_ = t.s.epoch_;
if(0 < N) s.cols0_ = t.s.cols0_;
@ -151,9 +151,9 @@ namespace rose {
struct stl_cmp
{
bool operator()(const StaticTuple* s1, const StaticTuple* s2) const
bool operator()(const StaticTuple& s1, const StaticTuple& s2) const
{
return *s1 < *s2;
return s1 < s2;
}
};
@ -178,6 +178,7 @@ namespace rose {
if(9 < N) scratch_.set9((TYPE9*)dat_[9][off_]);
return scratch_;
}
inline bool operator==(const iterator &a) const {
return (off_==a.off_);
}
@ -204,8 +205,6 @@ namespace rose {
};
private:
explicit StaticTuple(const StaticTuple& t) { abort(); }
void * cols_[N];
size_t size_[N];
typedef char flag_t;