misc log cleanups; add command line parameter to newserver to control log mode

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2435 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2011-04-20 21:51:04 +00:00
parent 0508f6cb40
commit 8135bbcc2e
14 changed files with 66 additions and 38 deletions

View file

@ -297,12 +297,12 @@ bool DataPage<TUPLE>::append(TUPLE const * dat)
} else {
if(tup_len > initial_page_count_ * PAGE_SIZE) {
// this is a "big tuple"
len_t reject_padding = PAGE_SIZE - (write_offset_ & PAGE_SIZE-1);
len_t accept_padding = PAGE_SIZE - ((write_offset_ + tup_len) & PAGE_SIZE-1);
len_t reject_padding = PAGE_SIZE - (write_offset_ & (PAGE_SIZE-1));
len_t accept_padding = PAGE_SIZE - ((write_offset_ + tup_len) & (PAGE_SIZE-1));
accept_tuple = accept_padding < reject_padding;
} else {
// this is a "small tuple"; only exceed budget if doing so leads to < 33% overhead for this data.
len_t accept_padding = PAGE_SIZE - (write_offset_ & PAGE_SIZE-1);
len_t accept_padding = PAGE_SIZE - (write_offset_ & (PAGE_SIZE-1));
accept_tuple = (3*accept_padding) < tup_len;
}
}

View file

@ -6,7 +6,7 @@
#include <stasis/bufferManager/bufferHash.h>
#include <stasis/logger/logger2.h>
#include <stasis/logger/logHandle.h>
#include <stasis/logger/safeWrites.h>
#include <stasis/logger/filePool.h>
#include "mergeStats.h"
#undef try
@ -23,7 +23,7 @@ static inline double tv_to_double(struct timeval tv)
/////////////////////////////////////////////////////////////////
template<class TUPLE>
logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
logtable<TUPLE>::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
{
recovering = true;
this->max_c0_size = max_c0_size;
@ -59,16 +59,15 @@ logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p
this->datapage_region_size = datapage_region_size;
this->datapage_size = datapage_size;
const char * lsm_log_file_name = "lsm.logfile";
log_file = stasis_log_safe_writes_open(lsm_log_file_name,
stasis_log_file_mode,
stasis_log_file_permissions,
stasis_log_softcommit);
log_file->group_force =
stasis_log_group_force_init(log_file, 10 * 1000 * 1000); // timeout in nsec; want 10msec.
printf("Warning enable group force in logstore.cpp\n");
this->log_mode = log_mode;
this->batch_size++;
if(log_mode > 0) {
log_file = stasis_log_file_pool_open("lsm_log",
stasis_log_file_mode,
stasis_log_file_permissions);
} else {
log_file = NULL;
}
}
template<class TUPLE>
@ -155,6 +154,7 @@ void logtable<TUPLE>::logUpdate(datatuple * tup) {
template<class TUPLE>
void logtable<TUPLE>::replayLog() {
if(!log_file) { assert(!log_mode); recovering = false; return; }
lsn_t start = tbl_header.log_trunc;
LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file);
const LogEntry * e;
@ -162,8 +162,7 @@ void logtable<TUPLE>::replayLog() {
switch(e->type) {
case UPDATELOG: {
datatuple * tup = datatuple::from_bytes((byte*)stasis_log_entry_update_args_cptr(e));
// assert(e->update.funcID == 0/*LSMINSERT*/);
insertTuple(tup, false);
insertTuple(tup);
datatuple::freetuple(tup);
} break;
case INTERNALLOG: { } break;
@ -177,7 +176,7 @@ void logtable<TUPLE>::replayLog() {
template<class TUPLE>
lsn_t logtable<TUPLE>::get_log_offset() {
if(recovering) { return INVALID_LSN; }
if(recovering || !log_mode) { return INVALID_LSN; }
return log_file->next_available_lsn(log_file);
}
template<class TUPLE>
@ -570,10 +569,16 @@ void logtable<TUPLE>::insertManyTuples(datatuple ** tuples, int tuple_count) {
for(int i = 0; i < tuple_count; i++) {
merge_mgr->read_tuple_from_small_component(0, tuples[i]);
}
for(int i = 0; i < tuple_count; i++) {
logUpdate(tuples[i]);
if(log_file && !recovering) {
for(int i = 0; i < tuple_count; i++) {
logUpdate(tuples[i]);
}
batch_size ++;
if(batch_size >= log_mode) {
log_file->force_tail(log_file, LOG_FORCE_COMMIT);
batch_size = 0;
}
}
log_file->force_tail(log_file, LOG_FORCE_COMMIT);
pthread_mutex_lock(&rb_mut);
int num_old_tups = 0;
@ -591,11 +596,15 @@ void logtable<TUPLE>::insertManyTuples(datatuple ** tuples, int tuple_count) {
}
template<class TUPLE>
void logtable<TUPLE>::insertTuple(datatuple *tuple, bool should_log)
void logtable<TUPLE>::insertTuple(datatuple *tuple)
{
if(should_log) {
if(log_file && !recovering) {
logUpdate(tuple);
log_file->force_tail(log_file, LOG_FORCE_COMMIT);
batch_size++;
if(batch_size >= log_mode) {
log_file->force_tail(log_file, LOG_FORCE_COMMIT);
batch_size = 0;
}
}
//lock the red-black tree
merge_mgr->read_tuple_from_small_component(0, tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut.

View file

@ -34,7 +34,7 @@ public:
// 6GB ~= 100B * 500 GB / (datapage_size * 4KB)
// (100B * 500GB) / (6GB * 4KB) = 2.035
// RCS: Set this to 1 so that we do (on average) one seek per b-tree read.
logtable(pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1);
logtable(int log_mode = 0, pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1);
~logtable();
@ -49,7 +49,7 @@ private:
datatuple * insertTupleHelper(datatuple *tuple);
public:
void insertManyTuples(struct datatuple **tuples, int tuple_count);
void insertTuple(struct datatuple *tuple, bool should_log = true);
void insertTuple(struct datatuple *tuple);
/** This test and set has strange semantics on two fronts:
*
* 1) It is not atomic with respect to non-testAndSet operations (which is fine in theory, since they have no barrier semantics, and we don't have a use case to support the extra overhead)
@ -125,6 +125,8 @@ public:
mergeManager * merge_mgr;
stasis_log_t * log_file;
int log_mode;
int batch_size;
bool recovering;
bool accepting_new_requests;

View file

@ -14,6 +14,8 @@
#undef try
#undef end
#define LEGACY_BACKPRESSURE
mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
if (mergeLevel == 0) {
return c0;

View file

@ -22,18 +22,25 @@ int main(int argc, char *argv[])
{
signal(SIGPIPE, SIG_IGN);
int64_t c0_size = 1024 * 1024 * 512 * 1;
int log_mode = 0; // do not log by default.
stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total
if(argc == 2 && !strcmp(argv[1], "--test")) {
stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 228MB total
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
}
if(argc == 2 && !strcmp(argv[1], "--benchmark")) {
stasis_buffer_manager_size = (1024L * 1024L * 1024L * 2L) / PAGE_SIZE; // 4GB total
c0_size = 1024L * 1024L * 1024L * 2L;
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
for(int i = 1; i < argc; i++) {
if(!strcmp(argv[i], "--test")) {
stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 228MB total
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
} else if(!strcmp(argv[i], "--benchmark")) {
stasis_buffer_manager_size = (1024L * 1024L * 1024L * 2L) / PAGE_SIZE; // 4GB total
c0_size = 1024L * 1024L * 1024L * 2L;
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
} else if(!strcmp(argv[i], "--log-mode")) {
i++;
log_mode = atoi(argv[i]);
} else {
fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode <int>]", argv[0]);
abort();
}
}
logtable<datatuple>::init_stasis();
@ -44,7 +51,7 @@ int main(int argc, char *argv[])
recordid table_root = ROOT_RECORD;
logtable<datatuple> ltable(c0_size);
logtable<datatuple> ltable(log_mode, c0_size);
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
printf("Creating empty logstore\n");

View file

@ -27,6 +27,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) {
srand(1001);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
sync();

View file

@ -11,6 +11,7 @@ int main(int argc, char **argv)
{
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
sync();

View file

@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
srand(1000);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
sync();

View file

@ -33,7 +33,7 @@ void insertProbeIter_str(int NUM_ENTRIES)
srand(1000);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
sync();
logtable<datatuple>::init_stasis();

View file

@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
srand(1000);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
logtable<datatuple>::init_stasis();

View file

@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
srand(1000);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
logtable<datatuple>::init_stasis();

View file

@ -24,6 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
srand(1000);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
sync();

View file

@ -22,6 +22,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
{
unlink("logfile.txt");
unlink("storefile.txt");
system("rm -rf stasis_log/");
//data generation
std::vector<std::string> data_arr;
std::vector<std::string> key_arr;

View file

@ -57,6 +57,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
srand(1000);
unlink("storefile.txt");
unlink("logfile.txt");
system("rm -rf stasis_log/");
logtable<datatuple>::init_stasis();
int xid = Tbegin();