revert accidentally committed files

This commit is contained in:
Sears Russell 2008-11-24 23:43:26 +00:00
parent a321ba6e4e
commit 11082997d8
2 changed files with 66 additions and 51 deletions

View file

@ -636,35 +636,67 @@ void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
Tdealloc(xid, *(recordid*)allocator_state); Tdealloc(xid, *(recordid*)allocator_state);
} }
static const recordid lsmLookup(int xid, Page *node, int depth, const byte *key, static const recordid lsmLookup(int xid, Page *node, int depth,
size_t keySize, lsm_comparator_t cmp) { const byte *key, size_t keySize, lsm_comparator_t cmp) {
if(*recordcount_ptr(node) == FIRST_SLOT) { if(*recordcount_ptr(node) == FIRST_SLOT) {
return NULLRID; return NULLRID;
} }
assert(*recordcount_ptr(node) > FIRST_SLOT); assert(*recordcount_ptr(node) > FIRST_SLOT);
int match = FIRST_SLOT;
// don't need to compare w/ first item in tree. const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize);
const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,FIRST_SLOT,keySize); slotid_t prev_slot = FIRST_SLOT;
int prev_cmp_key = cmp(prev+1,key);
// @todo binary search within each page
for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) { for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) {
rec = readNodeRecord(xid,node,i,keySize); const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,i,keySize);
int cmpval = cmp(rec+1,key);
if(cmpval > 0) { int rec_cmp_key = cmp(rec+1,key);
break;
if(depth) {
if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
pageid_t child_id = prev->ptr;
Page *child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0);
recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
unlock(child_page->rwlatch);
releasePage(child_page);
return ret;
}
} else {
// XXX Doesn't handle runs of duplicates.
if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
recordid ret = {node->id, prev_slot, keySize};
return ret;
}
} }
match = i; prev = rec;
prev_slot = i;
prev_cmp_key = rec_cmp_key;
if(rec_cmp_key > 0) { break; }
} }
if(depth) { if(depth) {
pageid_t child_id = readNodeRecord(xid,node,match,keySize)->ptr; // this handles the rhs of the tree.
Page* child_page = loadPage(xid, child_id); if(prev_cmp_key <= 0) {
readlock(child_page->rwlatch,0); pageid_t child_id = prev->ptr;
recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); Page *child_page = loadPage(xid, child_id);
unlock(child_page->rwlatch); readlock(child_page->rwlatch,0);
releasePage(child_page); recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
return ret; unlock(child_page->rwlatch);
releasePage(child_page);
return ret;
}
} else { } else {
recordid ret = {node->id, match, keySize}; if(prev_cmp_key <= 0) {
return ret; recordid ret = {node->id, prev_slot, keySize};
return ret;
}
} }
return NULLRID;
} }
static pageid_t lsmLookupLeafPageFromRid(int xid, recordid rid, size_t keySize) { static pageid_t lsmLookupLeafPageFromRid(int xid, recordid rid, size_t keySize) {
@ -820,11 +852,6 @@ lladdIterator_t* lsmTreeIterator_openAt(int xid, recordid root, const byte* key)
recordid lsm_entry_rid = lsmLookup(xid,p,depth,key,getKeySize(xid,p),comparators[cmp_nr->ptr]); recordid lsm_entry_rid = lsmLookup(xid,p,depth,key,getKeySize(xid,p),comparators[cmp_nr->ptr]);
if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
return 0;
}
assert(lsm_entry_rid.size != INVALID_SLOT);
if(root.page != lsm_entry_rid.page) { if(root.page != lsm_entry_rid.page) {
unlock(p->rwlatch); unlock(p->rwlatch);
releasePage(p); releasePage(p);

View file

@ -107,15 +107,14 @@ namespace rose {
// this is just a guessed value... it seems about right based on // this is just a guessed value... it seems about right based on
// experiments, but 450 bytes overhead per tuple is insane! // experiments, but 450 bytes overhead per tuple is insane!
static const int RB_TREE_OVERHEAD = 400; // = 450; static const int RB_TREE_OVERHEAD = 400; // = 450;
static pageid_t C0_MEM_SIZE = 1000 * 1000 * 1000; static const pageid_t MEM_SIZE = 1000 * 1000 * 1000;
// static const pageid_t C0_MEM_SIZE = 100 * 1000; // static const pageid_t MEM_SIZE = 100 * 1000;
// How many pages should we try to fill with the first C1 merge? // How many pages should we try to fill with the first C1 merge?
static int R = 10; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size)) static int R = 10; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size))
#ifdef THROTTLED #ifdef THROTTLED
static const pageid_t START_SIZE = 100; //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. static const pageid_t START_SIZE = 100; //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead.
#else #else
Do not run this code static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead.
static const pageid_t START_SIZE = C0_MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead.
#endif #endif
// Lower total work by perfomrming one merge at higher level // Lower total work by perfomrming one merge at higher level
// for every FUDGE^2 merges at the immediately lower level. // for every FUDGE^2 merges at the immediately lower level.
@ -148,9 +147,8 @@ namespace rose {
// loop around here to produce multiple batches for merge. // loop around here to produce multiple batches for merge.
gettimeofday(&start_push_tv,0); gettimeofday(&start_push_tv,0);
gettimeofday(&start_tv,0); gettimeofday(&start_tv,0);
pthread_mutex_lock(a->block_ready_mut);
while(1) { while(1) {
pthread_mutex_lock(a->block_ready_mut);
int done = 0; int done = 0;
@ -166,6 +164,7 @@ namespace rose {
*a->in_block_needed = false; *a->in_block_needed = false;
if(done) { if(done) {
pthread_cond_signal(a->out_block_ready_cond); pthread_cond_signal(a->out_block_ready_cond);
pthread_mutex_unlock(a->block_ready_mut);
break; break;
} }
@ -182,7 +181,7 @@ namespace rose {
ITERB *tbEnd = tbBegin->end(); ITERB *tbEnd = tbBegin->end();
{ // this { protects us from recalcitrant iterators below (tree iterators hold stasis page latches...) { // this { protects us from recalcitrant iterators below (tree iterators hold stasis page latches...)
///XXX pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
Tcommit(xid); Tcommit(xid);
xid = Tbegin(); xid = Tbegin();
@ -264,7 +263,7 @@ namespace rose {
gettimeofday(&start_push_tv,0); gettimeofday(&start_push_tv,0);
//XXX pthread_mutex_lock(a->block_ready_mut); pthread_mutex_lock(a->block_ready_mut);
// keep actual handle around so that it can be freed below. // keep actual handle around so that it can be freed below.
typename ITERB::handle old_in_tree = **a->in_tree; typename ITERB::handle old_in_tree = **a->in_tree;
@ -299,10 +298,10 @@ namespace rose {
if(a->out_tree) { if(a->out_tree) {
double frac_wasted = ((double)RB_TREE_OVERHEAD)/(double)(RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); double frac_wasted = ((double)RB_TREE_OVERHEAD)/(double)(RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) / ((C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio))); target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) / ((MEM_SIZE*(1-frac_wasted))/(4096*ratio)));
printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n", printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n",
((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size), ((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size),
((double)*a->my_tree_size) / ((double)(C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)),target_R); ((double)*a->my_tree_size) / ((double)(MEM_SIZE*(1-frac_wasted))/(4096*ratio)),target_R);
} }
#else #else
if(a->out_tree_size) { if(a->out_tree_size) {
@ -370,11 +369,11 @@ namespace rose {
assert(a->my_tree->r_.page != tree->r_.page); assert(a->my_tree->r_.page != tree->r_.page);
*a->my_tree = *tree; *a->my_tree = *tree;
pthread_mutex_unlock(a->block_ready_mut);
gettimeofday(&start_tv,0); gettimeofday(&start_tv,0);
} }
pthread_mutex_unlock(a->block_ready_mut);
Tcommit(xid); Tcommit(xid);
return 0; return 0;
@ -582,7 +581,7 @@ namespace rose {
ret->still_open, ret->still_open,
block0_size, block0_size,
block1_size, block1_size,
(R * C0_MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio (R * MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio
R, R,
//new typename LSM_ITER::treeIteratorHandle(NULLRID), //new typename LSM_ITER::treeIteratorHandle(NULLRID),
block0_scratch, block0_scratch,
@ -666,30 +665,25 @@ namespace rose {
assert(*((char*)t.get(i)) || *((char*)t.get(i))+1); assert(*((char*)t.get(i)) || *((char*)t.get(i))+1);
} */ } */
pthread_mutex_lock(h->mut); //XXX
h->scratch_tree->insert(t); h->scratch_tree->insert(t);
uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
//XXX 4 = estimated compression ratio. //XXX 4 = estimated compression ratio.
uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes()); uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes());
uint64_t memSizeThresh = C0_MEM_SIZE; uint64_t memSizeThresh = MEM_SIZE;
#ifdef INFINITE_RESOURCES #ifdef INFINITE_RESOURCES
static const int LATCH_INTERVAL = 10000; static const int LATCH_INTERVAL = 10000;
static int count = LATCH_INTERVAL; /// XXX HACK static int count = LATCH_INTERVAL; /// XXX HACK
bool go = false; bool go = false;
if(!count) { if(!count) {
///XXX pthread_mutex_lock(h->mut); pthread_mutex_lock(h->mut);
go = *h->input_needed; go = *h->input_needed;
///XXX pthread_mutex_unlock(h->mut); pthread_mutex_unlock(h->mut);
count = LATCH_INTERVAL; count = LATCH_INTERVAL;
} }
count --; count --;
#endif #endif
pthread_mutex_unlock(h->mut);
if( (handleBytes > memSizeThresh / 2) && ( if( (handleBytes > memSizeThresh / 2) && (
#ifdef INFINITE_RESOURCES #ifdef INFINITE_RESOURCES
go || go ||
@ -847,7 +841,6 @@ namespace rose {
void** void**
TlsmTableFindGTE(int xid, lsmTableHandle<PAGELAYOUT> *h, TlsmTableFindGTE(int xid, lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &val) { typename PAGELAYOUT::FMT::TUP &val) {
pthread_mutex_lock(h->mut);
// typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP, // typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP, typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
@ -887,11 +880,6 @@ namespace rose {
return ret; return ret;
} }
template<class PAGELAYOUT>
void
TlsmTableFindGTEDone(lsmTableHandle<PAGELAYOUT> *h) {
pthread_mutex_unlock(h->mut);
}
template<class PAGELAYOUT> template<class PAGELAYOUT>
const typename PAGELAYOUT::FMT::TUP * const typename PAGELAYOUT::FMT::TUP *
TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h, TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h,