latching fixes, server no longer ignores --test, performance tuning
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@759 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
ead6207f86
commit
2ea8b9ff44
5 changed files with 47 additions and 22 deletions
|
@ -85,7 +85,7 @@ DataPage<TUPLE>::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc)
|
|||
first_page_(alloc_->alloc_extent(xid_, page_count_)),
|
||||
write_offset_(0)
|
||||
{
|
||||
DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)page_count_, (long long int)first_page_);
|
||||
printf("Datapage page count: %lld pid = %lld\n", (long long int)initial_page_count_, (long long int)first_page_);
|
||||
assert(page_count_ >= 1);
|
||||
initialize();
|
||||
}
|
||||
|
@ -241,7 +241,12 @@ template <class TUPLE>
|
|||
bool DataPage<TUPLE>::append(TUPLE const * dat)
|
||||
{
|
||||
// Don't append record to already-full datapage. The record could push us over the page limit, but that's OK.
|
||||
if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) { return false; }
|
||||
if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) {
|
||||
DEBUG("offset %lld closing datapage\n", write_offset_);
|
||||
return false;
|
||||
}
|
||||
|
||||
DEBUG("offset %lld continuing datapage\n", write_offset_);
|
||||
|
||||
byte * buf = dat->to_bytes(); // TODO could be more efficient; this does a malloc and memcpy. The alternative couples us more strongly to datapage, but simplifies datapage.
|
||||
len_t dat_len = dat->byte_length();
|
||||
|
|
|
@ -805,12 +805,12 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root, con
|
|||
current.size = lsm_entry_rid.size;
|
||||
|
||||
xid_ = xid;
|
||||
t = 0; // must be zero so free() doesn't croak.
|
||||
justOnePage = (depth==0);
|
||||
|
||||
DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key);
|
||||
DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
|
||||
}
|
||||
t = 0; // must be zero so free() doesn't croak.
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -859,7 +859,6 @@ int diskTreeComponent::internalNodes::iterator::next()
|
|||
} else {
|
||||
assert(!p);
|
||||
if(t != NULL) { free(t); t = NULL; }
|
||||
t = 0;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -869,8 +868,12 @@ void diskTreeComponent::internalNodes::iterator::close() {
|
|||
if(p) {
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
p = NULL;
|
||||
}
|
||||
if(t) {
|
||||
free(t);
|
||||
t = NULL;
|
||||
}
|
||||
if(t) free(t);
|
||||
}
|
||||
|
||||
|
||||
|
|
23
logstore.h
23
logstore.h
|
@ -36,7 +36,19 @@ class logtable {
|
|||
public:
|
||||
|
||||
class iterator;
|
||||
logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 40); // scans 160KB / 2 per lookup on average. at 100MB/s, this is 0.7 ms. XXX pick datapage_size in principled way.
|
||||
|
||||
// We want datapages to be as small as possible, assuming they don't force an extra seek to traverse the bottom level of internal nodes.
|
||||
// Internal b-tree mem requirements:
|
||||
// - Assume keys are small (compared to stasis pages) so we can ignore all but the bottom level of the tree.
|
||||
//
|
||||
// |internal nodes| ~= (|key| * |tree|) / (datapage_size * |stasis PAGE_SIZE|)
|
||||
//
|
||||
// Plugging in the numbers today:
|
||||
//
|
||||
// 6GB ~= 100B * 500 GB / (datapage_size * 4KB)
|
||||
// (100B * 500GB) / (6GB * 4KB) = 2.035
|
||||
logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 2);
|
||||
|
||||
~logtable();
|
||||
|
||||
//user access functions
|
||||
|
@ -259,9 +271,11 @@ public:
|
|||
}
|
||||
|
||||
~iterator() {
|
||||
ltable->forgetIterator(this);
|
||||
invalidate();
|
||||
if(last_returned) TUPLE::freetuple(last_returned);
|
||||
writelock(ltable->header_lock,0);
|
||||
ltable->forgetIterator(this);
|
||||
invalidate();
|
||||
if(last_returned) TUPLE::freetuple(last_returned);
|
||||
unlock(ltable->header_lock);
|
||||
}
|
||||
private:
|
||||
TUPLE * getnextHelper() {
|
||||
|
@ -292,6 +306,7 @@ public:
|
|||
}
|
||||
|
||||
void invalidate() {
|
||||
assert(!trywritelock(ltable->header_lock,0));
|
||||
if(valid) {
|
||||
delete merge_it_;
|
||||
merge_it_ = NULL;
|
||||
|
|
22
merger.cpp
22
merger.cpp
|
@ -274,10 +274,11 @@ void* memMergeThread(void*arg)
|
|||
|
||||
while(true) // 1
|
||||
{
|
||||
merge_stats_t stats;
|
||||
stats.merge_level = 1;
|
||||
stats.merge_count = merge_count;
|
||||
gettimeofday(&stats.sleep,0);
|
||||
merge_stats_t stats;
|
||||
memset((void*)&stats, 0, sizeof(stats));
|
||||
stats.merge_level = 1;
|
||||
stats.merge_count = merge_count;
|
||||
gettimeofday(&stats.sleep,0);
|
||||
writelock(ltable->header_lock,0);
|
||||
int done = 0;
|
||||
// 2: wait for c0_mergable
|
||||
|
@ -433,12 +434,13 @@ void *diskMergeThread(void*arg)
|
|||
|
||||
while(true)
|
||||
{
|
||||
merge_stats_t stats;
|
||||
stats.merge_level = 2;
|
||||
stats.merge_count = merge_count;
|
||||
gettimeofday(&stats.sleep,0);
|
||||
// 2: wait for input
|
||||
writelock(ltable->header_lock,0);
|
||||
merge_stats_t stats;
|
||||
memset((void*)&stats, 0, sizeof(stats));
|
||||
stats.merge_level = 2;
|
||||
stats.merge_count = merge_count;
|
||||
gettimeofday(&stats.sleep,0);
|
||||
// 2: wait for input
|
||||
writelock(ltable->header_lock,0);
|
||||
int done = 0;
|
||||
// get a new input for merge
|
||||
while(!ltable->get_tree_c1_mergeable())
|
||||
|
|
|
@ -77,9 +77,9 @@ int main(int argc, char *argv[])
|
|||
int lindex = mscheduler->addlogtable(<able);
|
||||
ltable.setMergeData(mscheduler->getMergeData(lindex));
|
||||
|
||||
int64_t c0_size = 1024 * 1024 * 1024 * 1;
|
||||
int64_t c0_size = 1024 * 1024 * 512 * 1;
|
||||
|
||||
if(argc == 2 && !strcmp(argv[0], "--test")) {
|
||||
if(argc == 2 && !strcmp(argv[1], "--test")) {
|
||||
|
||||
c0_size = 1024 * 1024 * 10;
|
||||
printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server?
|
||||
|
@ -89,7 +89,7 @@ int main(int argc, char *argv[])
|
|||
|
||||
unlock(ltable.header_lock);
|
||||
|
||||
lserver = new logserver(10, 32432);
|
||||
lserver = new logserver(100, 32432);
|
||||
|
||||
lserver->startserver(<able);
|
||||
|
||||
|
|
Loading…
Reference in a new issue