From 8135bbcc2e6e8d3102407a92b13caee5b89c9901 Mon Sep 17 00:00:00 2001
From: sears
Date: Wed, 20 Apr 2011 21:51:04 +0000
Subject: [PATCH] misc log cleanups; add command line parameter to newserver to control log mode

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2435 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
---
 datapage.cpp              |  6 ++---
 logstore.cpp              | 51 +++++++++++++++++++++++----------------
 logstore.h                |  6 +++--
 mergeManager.cpp          |  2 ++
 newserver.cpp             | 29 +++++++++++++---------
 test/check_datapage.cpp   |  1 +
 test/check_gen.cpp        |  1 +
 test/check_logtable.cpp   |  1 +
 test/check_logtree.cpp    |  2 +-
 test/check_merge.cpp      |  1 +
 test/check_mergelarge.cpp |  1 +
 test/check_mergetuple.cpp |  1 +
 test/check_rbtree.cpp     |  1 +
 test/check_testAndSet.cpp |  1 +
 14 files changed, 66 insertions(+), 38 deletions(-)

diff --git a/datapage.cpp b/datapage.cpp
index 9a38d5c..07a0bd5 100644
--- a/datapage.cpp
+++ b/datapage.cpp
@@ -297,12 +297,12 @@ bool DataPage::append(TUPLE const * dat)
 	} else {
 		if(tup_len > initial_page_count_ * PAGE_SIZE) {
 			// this is a "big tuple"
-			len_t reject_padding = PAGE_SIZE - (write_offset_ & PAGE_SIZE-1);
-			len_t accept_padding = PAGE_SIZE - ((write_offset_ + tup_len) & PAGE_SIZE-1);
+			len_t reject_padding = PAGE_SIZE - (write_offset_ & (PAGE_SIZE-1));
+			len_t accept_padding = PAGE_SIZE - ((write_offset_ + tup_len) & (PAGE_SIZE-1));
 			accept_tuple = accept_padding < reject_padding;
 		} else {
 			// this is a "small tuple"; only exceed budget if doing so leads to < 33% overhead for this data.
-			len_t accept_padding = PAGE_SIZE - (write_offset_ & PAGE_SIZE-1);
+			len_t accept_padding = PAGE_SIZE - (write_offset_ & (PAGE_SIZE-1));
 			accept_tuple = (3*accept_padding) < tup_len;
 		}
 	}
diff --git a/logstore.cpp b/logstore.cpp
index 93eb901..250c062 100644
--- a/logstore.cpp
+++ b/logstore.cpp
@@ -6,7 +6,7 @@
 #include
 #include
 #include
-#include
+#include
 #include "mergeStats.h"
 
 #undef try
@@ -23,7 +23,7 @@ static inline double tv_to_double(struct timeval tv)
 /////////////////////////////////////////////////////////////////
 
 template
-logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
+logtable::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
 {
     recovering = true;
     this->max_c0_size = max_c0_size;
@@ -59,16 +59,15 @@ logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p
     this->datapage_region_size = datapage_region_size;
     this->datapage_size = datapage_size;
-
-    const char * lsm_log_file_name = "lsm.logfile";
-
-    log_file = stasis_log_safe_writes_open(lsm_log_file_name,
-                                           stasis_log_file_mode,
-                                           stasis_log_file_permissions,
-                                           stasis_log_softcommit);
-    log_file->group_force =
-        stasis_log_group_force_init(log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec.
-    printf("Warning enable group force in logstore.cpp\n");
+    this->log_mode = log_mode;
+    this->batch_size++;
+    if(log_mode > 0) {
+      log_file = stasis_log_file_pool_open("lsm_log",
+                                           stasis_log_file_mode,
+                                           stasis_log_file_permissions);
+    } else {
+      log_file = NULL;
+    }
 }
 
 template
@@ -155,6 +154,7 @@ void logtable::logUpdate(datatuple * tup) {
 
 template
 void logtable::replayLog() {
+  if(!log_file) { assert(!log_mode); recovering = false; return; }
   lsn_t start = tbl_header.log_trunc;
   LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file);
   const LogEntry * e;
@@ -162,8 +162,7 @@ void logtable::replayLog() {
     switch(e->type) {
     case UPDATELOG: {
       datatuple * tup = datatuple::from_bytes((byte*)stasis_log_entry_update_args_cptr(e));
-      // assert(e->update.funcID == 0/*LSMINSERT*/);
-      insertTuple(tup, false);
+      insertTuple(tup);
       datatuple::freetuple(tup);
     } break;
     case INTERNALLOG: { } break;
@@ -177,7 +176,7 @@ void logtable::replayLog() {
 
 template
 lsn_t logtable::get_log_offset() {
-  if(recovering) { return INVALID_LSN; }
+  if(recovering || !log_mode) { return INVALID_LSN; }
   return log_file->next_available_lsn(log_file);
 }
 template
@@ -570,10 +569,16 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) {
   for(int i = 0; i < tuple_count; i++) {
     merge_mgr->read_tuple_from_small_component(0, tuples[i]);
   }
-  for(int i = 0; i < tuple_count; i++) {
-    logUpdate(tuples[i]);
+  if(log_file && !recovering) {
+    for(int i = 0; i < tuple_count; i++) {
+      logUpdate(tuples[i]);
+    }
+    batch_size ++;
+    if(batch_size >= log_mode) {
+      log_file->force_tail(log_file, LOG_FORCE_COMMIT);
+      batch_size = 0;
+    }
   }
-  log_file->force_tail(log_file, LOG_FORCE_COMMIT);
 
   pthread_mutex_lock(&rb_mut);
   int num_old_tups = 0;
@@ -591,11 +596,15 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) {
 }
 
 template
-void logtable::insertTuple(datatuple *tuple, bool should_log)
+void logtable::insertTuple(datatuple *tuple)
 {
-  if(should_log) {
+  if(log_file && !recovering) {
     logUpdate(tuple);
-    log_file->force_tail(log_file, LOG_FORCE_COMMIT);
+    batch_size++;
+    if(batch_size >= log_mode) {
+      log_file->force_tail(log_file, LOG_FORCE_COMMIT);
+      batch_size = 0;
+    }
   }
   //lock the red-black tree
   merge_mgr->read_tuple_from_small_component(0, tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut.
diff --git a/logstore.h b/logstore.h
index c47cded..9378152 100644
--- a/logstore.h
+++ b/logstore.h
@@ -34,7 +34,7 @@ public:
   // 6GB ~= 100B * 500 GB / (datapage_size * 4KB)
   // (100B * 500GB) / (6GB * 4KB) = 2.035
   // RCS: Set this to 1 so that we do (on average) one seek per b-tree read.
-  logtable(pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1);
+  logtable(int log_mode = 0, pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1);
 
   ~logtable();
 
@@ -49,7 +49,7 @@ private:
   datatuple * insertTupleHelper(datatuple *tuple);
 public:
   void insertManyTuples(struct datatuple **tuples, int tuple_count);
-  void insertTuple(struct datatuple *tuple, bool should_log = true);
+  void insertTuple(struct datatuple *tuple);
   /** This test and set has strange semantics on two fronts:
    *
    *  1) It is not atomic with respect to non-testAndSet operations (which is fine in theory, since they have no barrier semantics, and we don't have a use case to support the extra overhead)
@@ -125,6 +125,8 @@ public:
   mergeManager * merge_mgr;
 
   stasis_log_t * log_file;
+  int log_mode;
+  int batch_size;
 
   bool recovering;
   bool accepting_new_requests;
diff --git a/mergeManager.cpp b/mergeManager.cpp
index 7ca8e8c..ae2e24f 100644
--- a/mergeManager.cpp
+++ b/mergeManager.cpp
@@ -14,6 +14,8 @@
 #undef try
 #undef end
 
+#define LEGACY_BACKPRESSURE
+
 mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
   if (mergeLevel == 0) {
     return c0;
diff --git a/newserver.cpp b/newserver.cpp
index 11f9614..778d9c6 100644
--- a/newserver.cpp
+++ b/newserver.cpp
@@ -22,18 +22,25 @@ int main(int argc, char *argv[])
   signal(SIGPIPE, SIG_IGN);
 
   int64_t c0_size = 1024 * 1024 * 512 * 1;
+  int log_mode = 0; // do not log by default.
 
   stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total
-  if(argc == 2 && !strcmp(argv[1], "--test")) {
-    stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 228MB total
-    c0_size = 1024 * 1024 * 100;
-    printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
-  }
-
-  if(argc == 2 && !strcmp(argv[1], "--benchmark")) {
-    stasis_buffer_manager_size = (1024L * 1024L * 1024L * 2L) / PAGE_SIZE; // 4GB total
-    c0_size = 1024L * 1024L * 1024L * 2L;
-    printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
+  for(int i = 1; i < argc; i++) {
+    if(!strcmp(argv[i], "--test")) {
+      stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 228MB total
+      c0_size = 1024 * 1024 * 100;
+      printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
+    } else if(!strcmp(argv[i], "--benchmark")) {
+      stasis_buffer_manager_size = (1024L * 1024L * 1024L * 2L) / PAGE_SIZE; // 4GB total
+      c0_size = 1024L * 1024L * 1024L * 2L;
+      printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
+    } else if(!strcmp(argv[i], "--log-mode")) {
+      i++;
+      log_mode = atoi(argv[i]);
+    } else {
+      fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode ]", argv[0]);
+      abort();
+    }
   }
 
   logtable::init_stasis();
@@ -44,7 +51,7 @@ int main(int argc, char *argv[])
 
   recordid table_root = ROOT_RECORD;
 
-  logtable ltable(c0_size);
+  logtable ltable(log_mode, c0_size);
 
   if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
     printf("Creating empty logstore\n");
diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp
index 23c9ff9..8ea1ad3 100644
--- a/test/check_datapage.cpp
+++ b/test/check_datapage.cpp
@@ -27,6 +27,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) {
   srand(1001);
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   sync();
 
diff --git a/test/check_gen.cpp b/test/check_gen.cpp
index 0ee8191..92a5172 100644
--- a/test/check_gen.cpp
+++ b/test/check_gen.cpp
@@ -11,6 +11,7 @@ int main(int argc, char **argv)
 {
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   sync();
 
diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp
index 018a030..dfe5c22 100644
--- a/test/check_logtable.cpp
+++ b/test/check_logtable.cpp
@@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
   srand(1000);
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   sync();
 
diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp
index e501524..b4b264a 100644
--- a/test/check_logtree.cpp
+++ b/test/check_logtree.cpp
@@ -33,7 +33,7 @@ void insertProbeIter_str(int NUM_ENTRIES)
   srand(1000);
   unlink("storefile.txt");
   unlink("logfile.txt");
-
+  system("rm -rf stasis_log/");
   sync();
 
   logtable::init_stasis();
diff --git a/test/check_merge.cpp b/test/check_merge.cpp
index 3f51962..3d312f2 100644
--- a/test/check_merge.cpp
+++ b/test/check_merge.cpp
@@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
   srand(1000);
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   logtable::init_stasis();
 
diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp
index 062c441..9c00b4e 100644
--- a/test/check_mergelarge.cpp
+++ b/test/check_mergelarge.cpp
@@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
   srand(1000);
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   logtable::init_stasis();
 
diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp
index 3475582..fd24e68 100644
--- a/test/check_mergetuple.cpp
+++ b/test/check_mergetuple.cpp
@@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
   srand(1000);
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   sync();
 
diff --git a/test/check_rbtree.cpp b/test/check_rbtree.cpp
index 90c09d1..55d63c8 100644
--- a/test/check_rbtree.cpp
+++ b/test/check_rbtree.cpp
@@ -22,6 +22,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
 {
   unlink("logfile.txt");
   unlink("storefile.txt");
+  system("rm -rf stasis_log/");
   //data generation
   std::vector data_arr;
   std::vector key_arr;
diff --git a/test/check_testAndSet.cpp b/test/check_testAndSet.cpp
index b2a8f59..cd1f01b 100644
--- a/test/check_testAndSet.cpp
+++ b/test/check_testAndSet.cpp
@@ -57,6 +57,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
   srand(1000);
   unlink("storefile.txt");
   unlink("logfile.txt");
+  system("rm -rf stasis_log/");
 
   logtable::init_stasis();
   int xid = Tbegin();
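Note on the new --log-mode knob (not part of the patch itself): log_mode == 0 leaves log_file NULL and disables logging entirely, while log_mode == N (N > 0) opens the Stasis file-pool log and forces its tail to disk only after every N logged inserts, amortizing the cost of group commit across a batch. The standalone C++ sketch below illustrates just that batching policy; force_log_tail() is a hypothetical stand-in for log_file->force_tail(log_file, LOG_FORCE_COMMIT) and the struct is an illustration, not code from logstore.

// Standalone sketch of the group-commit batching that insertTuple()/insertManyTuples() now do.
#include <cstdio>

struct log_batcher {
  int log_mode;    // 0 = logging disabled; N > 0 = force the log tail every N inserts
  int batch_size;  // inserts logged since the last force

  explicit log_batcher(int mode) : log_mode(mode), batch_size(0) {}

  // Called once per logged insert; returns true when the log tail was forced.
  bool logged_one_insert() {
    if(log_mode <= 0) { return false; }   // logging is off; nothing to force
    batch_size++;
    if(batch_size >= log_mode) {
      force_log_tail();                   // in logstore: log_file->force_tail(log_file, LOG_FORCE_COMMIT)
      batch_size = 0;
      return true;
    }
    return false;
  }

 private:
  void force_log_tail() { printf("force log tail to disk\n"); }
};

int main() {
  log_batcher b(3);  // roughly what running newserver with "--log-mode 3" selects
  for(int i = 1; i <= 7; i++) {
    if(b.logged_one_insert()) {
      printf("insert %d triggered a log force\n", i);  // fires after inserts 3 and 6
    }
  }
  return 0;
}

With a batch size of 3 the log is forced after the 3rd and 6th insert, which mirrors how a server started with --log-mode 3 trades a bounded window of unforced log records for fewer synchronous disk forces.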