From 2ea8b9ff4421f29284bbfb42d633426aa8c8a824 Mon Sep 17 00:00:00 2001
From: sears <sears@8dad8b1f-cf64-0410-95b6-bcf113ffbcfe>
Date: Wed, 24 Mar 2010 20:30:35 +0000
Subject: [PATCH] latching fixes, server no longer ignores --test, performance tuning

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@759 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
---
 datapage.cpp          |  9 +++++++--
 diskTreeComponent.cpp |  9 ++++++---
 logstore.h            | 23 +++++++++++++++++++----
 merger.cpp            | 22 ++++++++++++----------
 server.cpp            |  6 +++---
 5 files changed, 47 insertions(+), 22 deletions(-)

diff --git a/datapage.cpp b/datapage.cpp
index c8e6e2e..f57a85a 100644
--- a/datapage.cpp
+++ b/datapage.cpp
@@ -85,7 +85,7 @@ DataPage<TUPLE>::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc)
     first_page_(alloc_->alloc_extent(xid_, page_count_)),
     write_offset_(0)
 {
-  DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)page_count_, (long long int)first_page_);
+  printf("Datapage page count: %lld pid = %lld\n", (long long int)initial_page_count_, (long long int)first_page_);
   assert(page_count_ >= 1);
   initialize();
 }
@@ -241,7 +241,12 @@ template <class TUPLE>
 bool DataPage<TUPLE>::append(TUPLE const * dat)
 {
   // Don't append record to already-full datapage. The record could push us over the page limit, but that's OK.
-  if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) { return false; }
+  if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) {
+    DEBUG("offset %lld closing datapage\n", write_offset_);
+    return false;
+  }
+
+  DEBUG("offset %lld continuing datapage\n", write_offset_);
 
   byte * buf = dat->to_bytes(); // TODO could be more efficient; this does a malloc and memcpy. The alternative couples us more strongly to datapage, but simplifies datapage.
   len_t dat_len = dat->byte_length();
diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp
index f36b715..77e6036 100644
--- a/diskTreeComponent.cpp
+++ b/diskTreeComponent.cpp
@@ -805,12 +805,12 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root, con
     current.size = lsm_entry_rid.size;
 
     xid_ = xid;
-    t = 0; // must be zero so free() doesn't croak.
     justOnePage = (depth==0);
 
     DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key);
     DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
   }
+  t = 0; // must be zero so free() doesn't croak.
 }
 
 /**
@@ -859,7 +859,6 @@ int diskTreeComponent::internalNodes::iterator::next()
   } else {
     assert(!p);
     if(t != NULL) { free(t); t = NULL; }
-    t = 0;
     return 0;
   }
 }
@@ -869,8 +868,12 @@ void diskTreeComponent::internalNodes::iterator::close() {
   if(p) {
     unlock(p->rwlatch);
     releasePage(p);
+    p = NULL;
+  }
+  if(t) {
+    free(t);
+    t = NULL;
   }
-  if(t) free(t);
 }
 
 
diff --git a/logstore.h b/logstore.h
index b9fa07a..52ac373 100644
--- a/logstore.h
+++ b/logstore.h
@@ -36,7 +36,19 @@ class logtable {
 public:
   class iterator;
 
-  logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 40); // scans 160KB / 2 per lookup on average. at 100MB/s, this is 0.7 ms. XXX pick datapage_size in principled way.
+
+  // We want datapages to be as small as possible, assuming they don't force an extra seek to traverse the bottom level of internal nodes.
+  // Internal b-tree mem requirements:
+  //  - Assume keys are small (compared to stasis pages) so we can ignore all but the bottom level of the tree.
+  //
+  //    |internal nodes| ~= (|key| * |tree|) / (datapage_size * |stasis PAGE_SIZE|)
+  //
+  // Plugging in the numbers today:
+  //
+  //    6GB ~= 100B * 500 GB / (datapage_size * 4KB)
+  //    (100B * 500GB) / (6GB * 4KB) = 2.035
+  logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 2);
+
   ~logtable();
 
   //user access functions
@@ -259,9 +271,11 @@ public:
     }
 
     ~iterator() {
-      ltable->forgetIterator(this);
-      invalidate();
-      if(last_returned) TUPLE::freetuple(last_returned);
+      writelock(ltable->header_lock,0);
+      ltable->forgetIterator(this);
+      invalidate();
+      if(last_returned) TUPLE::freetuple(last_returned);
+      unlock(ltable->header_lock);
     }
 private:
     TUPLE * getnextHelper() {
@@ -292,6 +306,7 @@ public:
     }
 
     void invalidate() {
+      assert(!trywritelock(ltable->header_lock,0));
       if(valid) {
         delete merge_it_;
         merge_it_ = NULL;
diff --git a/merger.cpp b/merger.cpp
index fa0f770..738386e 100644
--- a/merger.cpp
+++ b/merger.cpp
@@ -274,10 +274,11 @@ void* memMergeThread(void*arg)
 
   while(true) // 1
   {
-    merge_stats_t stats;
-    stats.merge_level = 1;
-    stats.merge_count = merge_count;
-    gettimeofday(&stats.sleep,0);
+      merge_stats_t stats;
+      memset((void*)&stats, 0, sizeof(stats));
+      stats.merge_level = 1;
+      stats.merge_count = merge_count;
+      gettimeofday(&stats.sleep,0);
     writelock(ltable->header_lock,0);
     int done = 0;
     // 2: wait for c0_mergable
@@ -433,12 +434,13 @@ void *diskMergeThread(void*arg)
 
   while(true)
   {
-    merge_stats_t stats;
-    stats.merge_level = 2;
-    stats.merge_count = merge_count;
-    gettimeofday(&stats.sleep,0);
-    // 2: wait for input
-    writelock(ltable->header_lock,0);
+      merge_stats_t stats;
+      memset((void*)&stats, 0, sizeof(stats));
+      stats.merge_level = 2;
+      stats.merge_count = merge_count;
+      gettimeofday(&stats.sleep,0);
+      // 2: wait for input
+      writelock(ltable->header_lock,0);
     int done = 0;
     // get a new input for merge
     while(!ltable->get_tree_c1_mergeable())
diff --git a/server.cpp b/server.cpp
index b8e06d2..e0303a5 100644
--- a/server.cpp
+++ b/server.cpp
@@ -77,9 +77,9 @@ int main(int argc, char *argv[])
     int lindex = mscheduler->addlogtable(&ltable);
     ltable.setMergeData(mscheduler->getMergeData(lindex));
 
-    int64_t c0_size = 1024 * 1024 * 1024 * 1;
+    int64_t c0_size = 1024 * 1024 * 512 * 1;
 
-    if(argc == 2 && !strcmp(argv[0], "--test")) {
+    if(argc == 2 && !strcmp(argv[1], "--test")) {
       c0_size = 1024 * 1024 * 10;
       printf("warning: running w/ tiny c0 for testing");
       // XXX build a separate test server and deployment server?
@@ -89,7 +89,7 @@ int main(int argc, char *argv[])
 
     unlock(ltable.header_lock);
 
-    lserver = new logserver(10, 32432);
+    lserver = new logserver(100, 32432);
 
     lserver->startserver(&ltable);
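
A sanity check on the new datapage_size default in logstore.h: the value 2
falls out of the sizing formula in the comment block added above. The
following stand-alone sketch (not part of the logstore tree; the constants,
100-byte keys, a 500GB tree, a 6GB RAM budget for the bottom level of the
internal tree, and 4KB stasis pages, come straight from that comment)
recomputes it:

#include <cstdio>

int main() {
  const double key_bytes      = 100.0;                       // |key|
  const double tree_bytes     = 500.0 * 1024 * 1024 * 1024;  // |tree| = 500GB
  const double internal_bytes = 6.0 * 1024 * 1024 * 1024;    // RAM for internal nodes = 6GB
  const double page_bytes     = 4096.0;                      // stasis PAGE_SIZE = 4KB

  // |internal nodes| ~= (|key| * |tree|) / (datapage_size * PAGE_SIZE),
  // so solve for datapage_size at the 6GB budget:
  const double datapage_size =
      (key_bytes * tree_bytes) / (internal_bytes * page_bytes);
  printf("datapage_size ~= %.3f stasis pages\n", datapage_size);  // ~2.035
  return 0;
}

Rounding 2.035 down to 2 also cuts the average per-lookup datapage scan from
the ~80KB implied by the old default of 40 pages to ~4KB.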
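
On the latching fixes: iterator::~iterator() now takes ltable->header_lock
before tearing down, and invalidate() asserts that its caller holds that
latch via a try-lock that is expected to fail (judging from the assertion in
the diff, stasis' trywritelock() returns 0 when the latch is already
write-held). A minimal sketch of the same idiom using pthreads rwlocks
instead of stasis latches:

#include <cassert>
#include <pthread.h>

static pthread_rwlock_t header_lock = PTHREAD_RWLOCK_INITIALIZER;

static void invalidate() {
  // A try-lock on a latch the caller is required to hold must fail
  // (pthread_rwlock_trywrlock() returns EBUSY); success here would mean
  // the caller forgot to take header_lock.
  assert(pthread_rwlock_trywrlock(&header_lock) != 0);
  // ... tear down iterator state here ...
}

int main() {
  pthread_rwlock_wrlock(&header_lock);  // as ~iterator() now does
  invalidate();                         // latch is held, assertion passes
  pthread_rwlock_unlock(&header_lock);
  return 0;
}

Since assert() does not evaluate its argument when NDEBUG is defined, the
try-lock disappears from release builds, so the ownership check costs nothing
in production.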