diff --git a/mergeManager.cpp b/mergeManager.cpp index 056c69e..7ca8e8c 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -79,19 +79,229 @@ void mergeManager::update_progress(mergeStats * s, int delta) { } } else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...) if(s->active) { - s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_run_length); + s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(ltable->mean_c0_run_length,(double)s->bytes_in_small)); } else { s->in_progress = 0; } } - s->out_progress = ((double)s->get_current_size()) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size); - if(c2->active && c1->mergeable_size) { - c1_c2_delta = c1->out_progress - c2->in_progress; - } else { - c1_c2_delta = -0.02; // We try to keep this number between -0.05 and -0.01. - } + if(s->target_size) { + if(s->merge_level == 0) { + s->out_progress = ((double)s->get_current_size()) / (double)ltable->mean_c0_run_length; + } else { + // To see what's going on in the following code, consider a + // system with R = 3 and |C0| = 1. (R is the number of rounds that a + // C1-C2 merge takes during a bulk load, and also the ratios |C1|/|C0| + // and |C2|/|C1|). + // |Cn| is the size of a given tree component, normalized to the amount + // of data that can be inserted into C0 (because of red-black tree + // overheads, the effective capacity of C0 is a function of the average + // tuple size and the physical memory allotted to C0). 
+ + // Here is a trace of the costs of each round of merges during + // a bulk load (where no data is overwritten): + + // Round | App writes | I/O performed by C0-C1 | I/O performed by C1-C2 + // 0 | 1 | 1 | + // 1 | 1 | 3 = 2 * 1 + 1 | 0 (idle) + // 2 | 1 | 7 = 2 * 3 + 1 | + // 3 | 1 | 1 | + // 4 | 1 | 3 | R (just a copy) + // 5 | 1 | 7 | + // 6 | 1 | 1 | + // 7 | 1 | 3 | 3 * R = 2 * R + R + // 8 | 1 | 7 | + // 9 | 1 | 1 | + // 10 | 1 | 3 | 2 ( 3 * R ) + R + // 11 | 1 | 7 | + + // i | 1 | t(0) = 1 | u0 = u1 =...= uR = 0 + // | | i < R: t(i) = 1+2*t(i-1)| u(i) = 2 * u(i-R) + R + // | | i >=R: t(i) = t(i%R) | + + // Note that, at runtime, we can directly compute u(i) for the current + // C1-C2 merge: + + // u_j <= (1 + \alpha) * (|c2| + |c1_mergeable|) [eq 1] + + // Where \alpha is a constant between 0 and 1 that depends on the + // number of updates and deletes in c1_mergeable. For now, we assume it is 1. + + // Now, for each C1-C2 round, we want to split the total disk + // work evenly amongst the C0-C1 rounds. Thus, the amount + // consumed by C1-C2 + C0-C1 should be equal (if possible) for + // each pass over C0: + + // t(Rj) + u(Rj) = t(Rj+1) + u(Rj+1) = ... = t(Rj+R-1) + u(Rj+R-1) + + // Work for set of C0 passes during j'th C1-C2 merge: + // work(j) = \sum_{k=0..R-1}{t(Rj+k)}+u_j [eq 2] + + // Ideally, we would like the amount of work performed during each + // application visible round, i, to be the same throughout a given C1-C2 + // merger. Unfortunately, there's no way to ensure that this will be the + // case in general, as nothing prevents: + + // f(i) = (R * t[i] > u_j) + + // from being true. Therefore, we partition the i according to f(i). + // t(i) is monotonically increasing, so this creates two contiguous sets. + + // Let R' be the first i where f(i) is true, or R if no such i exists. 
+ // [eq 3] + + // Then: + + // work'(j) = work(j) - \sum{k=R'...R-1} t(Rj+k) [eq 4] + + // We now define u_j(i); the amount of progress we would like the C1-C2 + // merger to make in each application visible round. + + // The later rounds (where f(i) is true) already perform more work than + // we'd like, so we set: + + // u_j(i) = 0 if f(i) [eq 5a] + + // We evenly divide the remaining work: + + // t(i) + u_j(i) = work'(j) / R' if not f(i) + + // The t(i) are fixed, giving us R' equations in R' unknowns; solving + // for u_j(i): + + // u_j(i) = work'(j)/R' - t(i) if not f(i) [eq 5b] + + // Note 1: + + // If the tree is big enough, we compute R in a way that guarantees f(i) + // is false. We do not do this for small trees because it leads to R<3, + // which negatively impacts throughput. Therefore, we set R=3 and deal + // with periodic transient increases in application-visible throughput. + + // Note 2: + + // If the working set is small, then C1 will not get bigger from one + // merge to the next. To cope with this, we compute delta_c1_c2 by + // figuring out what the percent complete for c2 should be once C1 is + // full, assuming we're performing a bulk load. We set delta to the + // difference between the current progress and the desired progress. If + // delta is negative, then the C1-C2 merge will still be ahead of the + // C0-C1 merge at the end of this round, so we set delta to zero, which + // effectively puts the C2 merger to sleep. + + // eq 2: Compute t[i] (from table) and initial value of work(j) + int merge_count = (int)ceil(*ltable->R()-0.1); + // next, estimate merge_number (i) based on the size of c1. 
+ // ( i = R * j + merge_number) + int merge_number = (int)floor(((double)c1->base_size)/(double)ltable->mean_c0_run_length); + + s->out_progress = ((double)merge_number + s->in_progress) / (double) merge_count; + + // eq 1: Compute u_j + + if(c2->active && c1->mergeable_size) { +#ifdef LEGACY_BACKPRESSURE + c1_c2_delta = c1->out_progress - c2->in_progress; +#else + + pageid_t u__j = (pageid_t)(2.0 * (double)(c2->base_size + c1->mergeable_size)); + + + double* t = (double*)malloc(sizeof(double) * merge_count); + + t[0] = ltable->mean_c0_run_length; + double t__j = t[0]; + for(int i = 1; i < merge_count; i++) { + t[i] = t[i-1] * 2.0 + ltable->mean_c0_run_length; + t__j += t[i]; + } + + double work_j = t__j + u__j; + + // eq 3: Compute R' + int R_prime; + { + double frac_work = work_j / (double)merge_count; + for(R_prime = 0; R_prime < merge_count; R_prime++) { + if(t[R_prime] > frac_work) break; + } + } + // eq 4: Compute work' + double work_prime_j = work_j; + for(int i = R_prime; i < merge_count; i++) { + work_prime_j -= t[i]; // u_j[i] will be set to zero, so no need to subtract it off. + } + // eq 5a,b: Compute the u_j(i)'s for this C1-C2 round: + + double* u_j = (double*)malloc(sizeof(double) * merge_count); + + for(int i = 0; i < R_prime; i++) { + u_j[i] = work_prime_j / R_prime - t[i]; // [5b] + } + for(int i = R_prime; i < merge_count; i++) { + u_j[i] = 0; // [5a] + } + + // we now have everything we need to know how far along we should expect + // c1 and c2 to be at the beginning and end of this pass. 
+ double expected_c1_start_progress = ((double)merge_number) / (double)merge_count; + double expected_c2_start_progress = 0.0; + double expected_c1_end_progress = ((double)(merge_number+1)) / (double)merge_count; + double expected_c2_end_progress = 0.0; + for(int i = 0; i <= merge_number; i++) { + if(i < merge_number) { + expected_c2_start_progress += u_j[i]; + } + expected_c2_end_progress += u_j[i]; + } + + expected_c2_start_progress /= u__j; + expected_c2_end_progress /= u__j; + + assert(expected_c1_start_progress > -0.01 && expected_c1_start_progress < 1.01 && + expected_c2_start_progress > -0.01 && expected_c2_start_progress < 1.01 && + expected_c1_end_progress > -0.01 && expected_c1_end_progress < 1.01 && + expected_c2_end_progress > -0.01 && expected_c2_end_progress < 1.01 && + expected_c1_start_progress <= expected_c1_end_progress && + expected_c2_start_progress <= expected_c2_end_progress); + + double c1_scale_progress = (c1->out_progress - expected_c1_start_progress) / (expected_c1_end_progress - expected_c1_start_progress); + double c2_scale_progress = (c2->in_progress - expected_c2_start_progress) / (expected_c2_end_progress - expected_c2_start_progress); + c1_c2_delta = c1_scale_progress - c2_scale_progress; +#endif + } else { + c1_c2_delta = -0.02; // Elsewhere, we try to keep this number between -0.05 and -0.01. + } + // Appendix to analysis: Computation of t(i) and u(i) for bulk-loads + + // This is not used above (since both can be computed at runtime), but + // may be of use for future analysis. 
+ + // t(i) is easily computable, but u(i) is less straightforward: + // u(i) = 2 * u(i-R) + R + // u(i+R) = 2 * u(i) + R + // Subtracting: + + // u(i+R) - u(i) = 2u(i) - 2u(i-R) + // u(i+R) = 3u(i) - 2u(i-R) + // u(0) = 0; u(R) = R + // Characteristic polynomial: + // r^n = 3r^(n-1) - 2r^(n-2) + // divide by r^(n-2): + // r^2 = 3r - 2 ; r^2 - 3r + 2 = 0 + // characteristic roots: + // r = (3 +/- sqrt(9 - 8)) / 2 = 2 or 1 + // 2^n*C + D = a_n + + // 2^0 * C + D = 0; D = -C. + // 2 * C + D = R ; C = R, D = -R. + + // So, u(n) = 2^n*R - R + // or: + // sum{u(i..i+R-1)} = 2^floor(i/R)*R - R. + } + } #if EXTENDED_STATS struct timeval now; gettimeofday(&now, 0); @@ -195,8 +405,8 @@ void mergeManager::tick(mergeStats * s) { void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * tup) { if(tup) { mergeStats * s = get_merge_stats(merge_level); + (s->num_tuples_in_small)++; #if EXTENDED_STATS - (s->stats_num_tuples_in_small)++; (s->stats_bytes_in_small_delta) += tup->byte_length(); #endif (s->bytes_in_small) += tup->byte_length(); @@ -207,9 +417,7 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len) { if(tuple_count) { mergeStats * s = get_merge_stats(merge_level); -#if EXTENDED_STATS - s->stats_num_tuples_in_large += tuple_count; -#endif + s->num_tuples_in_large += tuple_count; s->bytes_in_large += byte_len; update_progress(s, byte_len); } @@ -217,9 +425,7 @@ void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_co void mergeManager::wrote_tuple(int merge_level, datatuple * tup) { mergeStats * s = get_merge_stats(merge_level); -#if EXTENDED_STATS - (s->stats_num_tuples_out)++; -#endif + (s->num_tuples_out)++; (s->bytes_out) += tup->byte_length(); } diff --git a/mergeStats.h b/mergeStats.h index 924a0d8..12e6629 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -47,6 +47,9 @@ class mergeStats { 
bytes_out(0), bytes_in_small(0), bytes_in_large(0), + num_tuples_out(0), + num_tuples_in_small(0), + num_tuples_in_large(0), just_handed_off(false), delta(0), need_tick(0), @@ -57,11 +60,8 @@ class mergeStats { , stats_merge_count(0), stats_bytes_out_with_overhead(0), - stats_num_tuples_out(0), stats_num_datapages_out(0), stats_bytes_in_small_delta(0), - stats_num_tuples_in_small(0), - stats_num_tuples_in_large(0), stats_lifetime_elapsed(0), stats_lifetime_active(0), stats_elapsed(0), @@ -82,6 +82,9 @@ class mergeStats { bytes_out = base_size; bytes_in_small = 0; bytes_in_large = 0; + num_tuples_out = 0; + num_tuples_in_small = 0; + num_tuples_in_large = 0; just_handed_off= false; delta = 0; need_tick = 0; @@ -91,11 +94,8 @@ class mergeStats { #if EXTENDED_STATS stats_merge_count = 0; stats_bytes_out_with_overhead = 0; - stats_num_tuples_out = 0; stats_num_datapages_out = 0; stats_bytes_in_small_delta = 0; - stats_num_tuples_in_small = 0; - stats_num_tuples_in_large = 0; stats_lifetime_elapsed = 0; stats_lifetime_active = 0; stats_elapsed = 0; @@ -127,15 +127,15 @@ class mergeStats { bytes_out = 0; bytes_in_small = 0; bytes_in_large = 0; + num_tuples_out = 0; + num_tuples_in_small = 0; + num_tuples_in_large = 0; in_progress = 0; #if EXTENDED_STATS stats_merge_count++; stats_bytes_out_with_overhead = 0; - stats_num_tuples_out = 0; stats_num_datapages_out = 0; stats_bytes_in_small_delta = 0; - stats_num_tuples_in_small = 0; - stats_num_tuples_in_large = 0; #endif } void starting_merge() { @@ -149,6 +149,7 @@ class mergeStats { } pageid_t get_current_size() { if(merge_level == 0) { + printf("base = %lld in_s = %lld in_l = %lld out = %lld\n", base_size, bytes_in_small, bytes_in_large, bytes_out); return base_size + bytes_in_small - bytes_in_large - bytes_out; } else { // s->bytes_out has strange semantics. It's how many bytes our input has written into this tree. @@ -201,6 +202,10 @@ class mergeStats { protected: pageid_t bytes_in_large; /// Bytes from the large input? 
(for C0, bytes deleted due to updates) + pageid_t num_tuples_out; /// How many tuples did we write? TODO Only used for C0, so not stored on disk. + pageid_t num_tuples_in_small; /// Tuples from the small input? TODO Only used for C0, so not stored on disk. + pageid_t num_tuples_in_large; /// Tuples from large input? TODO Only used for C0, so not stored on disk. + // todo: simplify confusing hand off logic, and remove this field? bool just_handed_off; @@ -220,11 +225,8 @@ class mergeStats { struct timeval stats_done; /// When did we finish merging? struct timespec stats_last_tick; pageid_t stats_bytes_out_with_overhead;/// How many bytes did we write (including internal tree nodes)? - pageid_t stats_num_tuples_out; /// How many tuples did we write? pageid_t stats_num_datapages_out; /// How many datapages? pageid_t stats_bytes_in_small_delta; /// How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)? - pageid_t stats_num_tuples_in_small; /// Tuples from the small input? - pageid_t stats_num_tuples_in_large; /// Tuples from large input? double stats_lifetime_elapsed; /// How long has this tree existed, in seconds? double stats_lifetime_active; /// How long has this tree been running (i.e.; active = true), in seconds? double stats_elapsed; /// How long did this merge take, including idle time (not valid until after merge is complete)? 
@@ -244,9 +246,9 @@ class mergeStats { double phys_mb_out = ((double)stats_bytes_out_with_overhead) / (1024.0 * 1024.0); double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0); double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0); - double kt_out = ((double)stats_num_tuples_out) /(1024.0); - double kt_ins= ((double)stats_num_tuples_in_small) /(1024.0); - double kt_inl = ((double)stats_num_tuples_in_large) /(1024.0); + double kt_out = ((double)num_tuples_out) /(1024.0); + double kt_ins= ((double)num_tuples_in_small) /(1024.0); + double kt_inl = ((double)num_tuples_in_large) /(1024.0); double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins); double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins); diff --git a/merger.cpp b/merger.cpp index 9cdc568..c39c56c 100644 --- a/merger.cpp +++ b/merger.cpp @@ -160,7 +160,8 @@ void * merge_scheduler::memMergeThread() { printf("\nMerge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size); assert(*ltable_->R() >= MIN_R); - bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R()); + // XXX don't hardcode 1.05, which will break for R > ~20. 
+ bool signal_c2 = (1.05 * new_c1_size / ltable_->mean_c0_run_length > *ltable_->R()); DEBUG("\nc1 size %f R %f\n", new_c1_size, *ltable_->R()); if( signal_c2 ) { diff --git a/newserver.cpp b/newserver.cpp index 31964ff..783c6d7 100644 --- a/newserver.cpp +++ b/newserver.cpp @@ -17,25 +17,29 @@ int main(int argc, char *argv[]) { - logtable::init_stasis(); - - int xid = Tbegin(); - - - recordid table_root = ROOT_RECORD; - int64_t c0_size = 1024 * 1024 * 512 * 1; + stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total if(argc == 2 && !strcmp(argv[1], "--test")) { + stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 228MB total c0_size = 1024 * 1024 * 100; printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server? } if(argc == 2 && !strcmp(argv[1], "--benchmark")) { - c0_size = 1024 * 1024 * 768 * 1; + stasis_buffer_manager_size = 2L * 1024L * 1024L * 1024L / PAGE_SIZE; // 4GB total + c0_size = 1024L * 1024L * 1024L * 2L; printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server? } + logtable::init_stasis(); + + int xid = Tbegin(); + + + recordid table_root = ROOT_RECORD; + + logtable ltable(c0_size); if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {