Misc statistics cleanup; improved merge cost estimations during backpressure.
Misc statistics cleanups include:
- fixes for a corner case where c0 is empty and the code would divide by zero
- always compute the number of tuples (in / out, but not base)
- rework the amount of memory allocated when --test and --newserver are passed in

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1582 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
parent 42b52a4d17, commit dad55b564c
4 changed files with 251 additions and 38 deletions
mergeManager.cpp (234 changed lines)
@@ -79,19 +79,229 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
}
} else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_run_length);
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+fmax(ltable->mean_c0_run_length,(double)s->bytes_in_small));
} else {
s->in_progress = 0;
}
}

s->out_progress = ((double)s->get_current_size()) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size);
if(c2->active && c1->mergeable_size) {
c1_c2_delta = c1->out_progress - c2->in_progress;
} else {
c1_c2_delta = -0.02; // We try to keep this number between -0.05 and -0.01.
}
if(s->target_size) {
if(s->merge_level == 0) {
s->out_progress = ((double)s->get_current_size()) / (double)ltable->mean_c0_run_length;
} else {
// To see what's going on in the following code, consider a
// system with R = 3 and |C0| = 1. (R is the number of rounds that a
// C1-C2 merge takes during a bulk load, and also the ratios |C1|/|C0|
// and |C2|/|C1|).

// |Cn| is the size of a given tree component, normalized to the amount
// of data that can be inserted into C0 (because of red-black tree
// overheads, the effective capacity of C0 is a function of the average
// tuple size and the physical memory allotted to C0).

// Here is a trace of the costs of each round of merges during
// a bulk load (where no data is overwritten):

// Round | App writes | I/O performed by C0-C1 | I/O performed by C1-C2
//   0   |     1      | 1                      |
//   1   |     1      | 3 = 2 * 1 + 1          | 0 (idle)
//   2   |     1      | 7 = 2 * 3 + 1          |
//   3   |     1      | 1                      |
//   4   |     1      | 3                      | R (just a copy)
//   5   |     1      | 7                      |
//   6   |     1      | 1                      |
//   7   |     1      | 3                      | 3 * R = 2 * R + R
//   8   |     1      | 7                      |
//   9   |     1      | 1                      |
//  10   |     1      | 3                      | 2 * (3 * R) + R
//  11   |     1      | 7                      |
//   i   |     1      | t(0) = 1                    | u(0) = u(1) = ... = u(R-1) = 0
//       |            | i <  R: t(i) = 1 + 2*t(i-1) | u(i) = 2 * u(i-R) + R
//       |            | i >= R: t(i) = t(i%R)       |

// Note that, at runtime, we can directly compute u(i) for the current
// C1-C2 merge:

// u_j <= (1 + \alpha) * (|c2| + |c1_mergeable|)    [eq 1]

// where \alpha is a constant between 0 and 1 that depends on the
// number of updates and deletes in c1_mergeable. For now, we assume it is 1.
// Now, for each C1-C2 round, we want to split the total disk
// work evenly amongst the C0-C1 rounds. Thus, the amount
// consumed by C1-C2 + C0-C1 should be equal (if possible) for
// each pass over C0:

// t(Rj) + u(Rj) = t(Rj+1) + u(Rj+1) = ... = t(Rj+R-1) + u(Rj+R-1)

// Work for set of C0 passes during j'th C1-C2 merge:
// work(j) = \sum_{k=0..R-1}{t(Rj+k)} + u_j    [eq 2]

// Ideally, we would like the amount of work performed during each
// application visible round, i, to be the same throughout a given C1-C2
// merger. Unfortunately, there's no way to ensure that this will be the
// case in general, as nothing prevents:

// f(i) = (R * t[i] > u_j)

// from being true. Therefore, we partition the i according to f(i).
// t(i) is monotonically increasing, so this creates two contiguous sets.

// Let R' be the first i where f(i) is true, or R if no such i exists.
// [eq 3]

// Then:

// work'(j) = work(j) - \sum{k=R'...R-1} t(Rj+k)    [eq 4]

// We now define u_j(i); the amount of progress we would like the C1-C2
// merger to make in each application visible round.

// The later rounds (where f(i) is true) already perform more work than
// we'd like, so we set:

// u_j(i) = 0 if f(i)    [eq 5a]

// We evenly divide the remaining work:

// t(i) + u_j(i) = work'(j) / R' if not f(i)

// The t(i) are fixed, giving us R' equations in R' unknowns; solving
// for u_j(i):

// u_j(i) = work'(j)/R' - t(i) if not f(i)    [eq 5b]

// Note 1:

// If the tree is big enough, we compute R in a way that guarantees f(i)
// is false. We do not do this for small trees because it leads to R<3,
// which negatively impacts throughput. Therefore, we set R=3 and deal
// with periodic transient increases in application-visible throughput.

// Note 2:

// If the working set is small, then C1 will not get bigger from one
// merge to the next. To cope with this, we compute delta_c1_c2 by
// figuring out what the percent complete for c2 should be once C1 is
// full, assuming we're performing a bulk load. We set delta to the
// difference between the current progress and the desired progress. If
// delta is negative, then the C1-C2 merge will still be ahead of the
// C0-C1 merge at the end of this round, so we set delta to zero, which
// effectively puts the C2 merger to sleep.

// eq 2: Compute t[i] (from table) and initial value of work(j)
int merge_count = (int)ceil(*ltable->R()-0.1);
// next, estimate merge_number (i) based on the size of c1.
// ( i = R * j + merge_number)
int merge_number = (int)floor(((double)c1->base_size)/(double)ltable->mean_c0_run_length);

s->out_progress = ((double)merge_number + s->in_progress) / (double) merge_count;

// eq 1: Compute u_j

if(c2->active && c1->mergeable_size) {
#ifdef LEGACY_BACKPRESSURE
c1_c2_delta = c1->out_progress - c2->in_progress;
#else

pageid_t u__j = (pageid_t)(2.0 * (double)(c2->base_size + c1->mergeable_size));

double* t = (double*)malloc(sizeof(double) * merge_count);

t[0] = ltable->mean_c0_run_length;
double t__j = t[0];
for(int i = 1; i < merge_count; i++) {
t[i] = t[i-1] * 2.0 + ltable->mean_c0_run_length;
t__j += t[i];
}

double work_j = t__j + u__j;

// eq 3: Compute R'
int R_prime;
{
double frac_work = work_j / (double)merge_count;
for(R_prime = 0; R_prime < merge_count; R_prime++) {
if(t[R_prime] > frac_work) break;
}
}
// eq 4: Compute work'
double work_prime_j = work_j;
for(int i = R_prime; i < merge_count; i++) {
work_prime_j -= t[i]; // u_j[i] will be set to zero, so no need to subtract it off.
}
// eq 5a,b: Compute the u_j(i)'s for this C1-C2 round:

double* u_j = (double*)malloc(sizeof(double) * merge_count);

for(int i = 0; i < R_prime; i++) {
u_j[i] = work_prime_j / R_prime - t[i]; // [5b]
}
for(int i = R_prime; i < merge_count; i++) {
u_j[i] = 0; // [5a]
}

// we now have everything we need to know how far along we should expect
// c1 and c2 to be at the beginning and end of this pass.
double expected_c1_start_progress = ((double)merge_number) / (double)merge_count;
double expected_c2_start_progress = 0.0;
double expected_c1_end_progress = ((double)(merge_number+1)) / (double)merge_count;
double expected_c2_end_progress = 0.0;
for(int i = 0; i <= merge_number; i++) {
if(i < merge_number) {
expected_c2_start_progress += u_j[i];
}
expected_c2_end_progress += u_j[i];
}

expected_c2_start_progress /= u__j;
expected_c2_end_progress /= u__j;

assert(expected_c1_start_progress > -0.01 && expected_c1_start_progress < 1.01 &&
expected_c2_start_progress > -0.01 && expected_c2_start_progress < 1.01 &&
expected_c1_end_progress > -0.01 && expected_c1_end_progress < 1.01 &&
expected_c2_end_progress > -0.01 && expected_c2_end_progress < 1.01 &&
expected_c1_start_progress <= expected_c1_end_progress &&
expected_c2_start_progress <= expected_c2_end_progress);

double c1_scale_progress = (c1->out_progress - expected_c1_start_progress) / (expected_c1_end_progress - expected_c1_start_progress);
double c2_scale_progress = (c2->in_progress - expected_c2_start_progress) / (expected_c2_end_progress - expected_c2_start_progress);
c1_c2_delta = c1_scale_progress - c2_scale_progress;
#endif
} else {
c1_c2_delta = -0.02; // Elsewhere, we try to keep this number between -0.05 and -0.01.
}
// Appendix to analysis: Computation of t(i) and u(i) for bulk-loads

// This is not used above (since both can be computed at runtime), but
// may be of use for future analysis.

// t(i) is easily computable, but u(i) is less straightforward:
// u(i) = 2 * u(i-R) + R
// u(i+R) = 2 * u(i) + R
// Subtracting:

// u(i+R) - u(i) = 2u(i) - 2u(i-R)
// u(i+R) = 3u(i) - 2u(i-R)
// u(0) = 0; u(R) = R
// Characteristic polynomial (stepping i in increments of R):
// r^n = 3r^(n-1) - 2r^(n-2)
// divide by r^(n-2):
// r^2 = 3r - 2 ; r^2 - 3r + 2 = 0
// characteristic roots:
// r = (3 +/- sqrt(9 - 8)) / 2 = 2 or 1
// a_n = C * 2^n + D, where n = i/R counts C1-C2 merges

// 2^0 * C + D = 0; D = -C.
// 2 * C + D = R ; C = R, D = -R.

// So, u(n) = 2^n*R - R
// or, in terms of the application-visible round i:
// u(i) = 2^floor(i/R) * R - R.
}
}
#if EXTENDED_STATS
struct timeval now;
gettimeofday(&now, 0);
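To make eq 2-5 above concrete, here is a minimal standalone sketch with made-up numbers (R = 3, |C0| = 1, |c2| + |c1_mergeable| = 4); it mirrors the merge_count/u__j/R_prime computation in the hunk but is illustration only, not part of the patch:

// backpressure_sketch.cpp -- standalone illustration of eq 2-5 (not part of the patch).
// Units: one C0 run's worth of data == 1.0.
#include <cstdio>

int main() {
  const int R = 3;                  // merge_count
  const double c0 = 1.0;            // mean_c0_run_length, normalized
  const double u_total = 2.0 * 4.0; // eq 1 with alpha = 1: |c2| + |c1_mergeable| = 4 (made up)

  // eq 2: t(i) from the trace table, and work(j)
  double t[R];
  t[0] = c0;
  double work = u_total + t[0];
  for(int i = 1; i < R; i++) { t[i] = 2.0 * t[i-1] + c0; work += t[i]; }

  // eq 3: R' = first round whose C0-C1 cost alone exceeds an even share of work(j)
  int R_prime = 0;
  while(R_prime < R && t[R_prime] <= work / R) { R_prime++; }

  // eq 4: work'(j) = work(j) minus the rounds that get no C1-C2 quota
  double work_prime = work;
  for(int i = R_prime; i < R; i++) { work_prime -= t[i]; }

  // eq 5a/5b: per-round C1-C2 quota u_j(i), and the resulting per-round totals
  for(int i = 0; i < R; i++) {
    double u_i = (i < R_prime) ? (work_prime / R_prime - t[i]) : 0.0;
    printf("round %d: t = %.0f, u_j(i) = %.0f, total = %.0f\n", i, t[i], u_i, t[i] + u_i);
  }
  // With these numbers: t = {1,3,7}, work = 19, R' = 2, work' = 12,
  // so the quotas are {5,3,0} and the per-round totals are {6,6,7}.
  return 0;
}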
@@ -195,8 +405,8 @@ void mergeManager::tick(mergeStats * s) {
void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * tup) {
if(tup) {
mergeStats * s = get_merge_stats(merge_level);
(s->num_tuples_in_small)++;
#if EXTENDED_STATS
(s->stats_num_tuples_in_small)++;
(s->stats_bytes_in_small_delta) += tup->byte_length();
#endif
(s->bytes_in_small) += tup->byte_length();

@@ -207,9 +417,7 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len) {
if(tuple_count) {
mergeStats * s = get_merge_stats(merge_level);
#if EXTENDED_STATS
s->stats_num_tuples_in_large += tuple_count;
#endif
s->num_tuples_in_large += tuple_count;
s->bytes_in_large += byte_len;
update_progress(s, byte_len);
}

@@ -217,9 +425,7 @@ void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_co
void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
mergeStats * s = get_merge_stats(merge_level);
#if EXTENDED_STATS
(s->stats_num_tuples_out)++;
#endif
(s->num_tuples_out)++;
(s->bytes_out) += tup->byte_length();
}
mergeStats.h (32 changed lines)
@@ -47,6 +47,9 @@ class mergeStats {
bytes_out(0),
bytes_in_small(0),
bytes_in_large(0),
num_tuples_out(0),
num_tuples_in_small(0),
num_tuples_in_large(0),
just_handed_off(false),
delta(0),
need_tick(0),

@@ -57,11 +60,8 @@ class mergeStats {
,
stats_merge_count(0),
stats_bytes_out_with_overhead(0),
stats_num_tuples_out(0),
stats_num_datapages_out(0),
stats_bytes_in_small_delta(0),
stats_num_tuples_in_small(0),
stats_num_tuples_in_large(0),
stats_lifetime_elapsed(0),
stats_lifetime_active(0),
stats_elapsed(0),

@@ -82,6 +82,9 @@ class mergeStats {
bytes_out = base_size;
bytes_in_small = 0;
bytes_in_large = 0;
num_tuples_out = 0;
num_tuples_in_small = 0;
num_tuples_in_large = 0;
just_handed_off= false;
delta = 0;
need_tick = 0;

@@ -91,11 +94,8 @@ class mergeStats {
#if EXTENDED_STATS
stats_merge_count = 0;
stats_bytes_out_with_overhead = 0;
stats_num_tuples_out = 0;
stats_num_datapages_out = 0;
stats_bytes_in_small_delta = 0;
stats_num_tuples_in_small = 0;
stats_num_tuples_in_large = 0;
stats_lifetime_elapsed = 0;
stats_lifetime_active = 0;
stats_elapsed = 0;

@@ -127,15 +127,15 @@ class mergeStats {
bytes_out = 0;
bytes_in_small = 0;
bytes_in_large = 0;
num_tuples_out = 0;
num_tuples_in_small = 0;
num_tuples_in_large = 0;
in_progress = 0;
#if EXTENDED_STATS
stats_merge_count++;
stats_bytes_out_with_overhead = 0;
stats_num_tuples_out = 0;
stats_num_datapages_out = 0;
stats_bytes_in_small_delta = 0;
stats_num_tuples_in_small = 0;
stats_num_tuples_in_large = 0;
#endif
}
void starting_merge() {

@@ -149,6 +149,7 @@ class mergeStats {
}
pageid_t get_current_size() {
if(merge_level == 0) {
printf("base = %lld in_s = %lld in_l = %lld out = %lld\n", base_size, bytes_in_small, bytes_in_large, bytes_out);
return base_size + bytes_in_small - bytes_in_large - bytes_out;
} else {
// s->bytes_out has strange semantics. It's how many bytes our input has written into this tree.

@@ -201,6 +202,10 @@ class mergeStats {
protected:
pageid_t bytes_in_large; /// Bytes from the large input? (for C0, bytes deleted due to updates)

pageid_t num_tuples_out; /// How many tuples did we write? TODO Only used for C0, so not stored on disk.
pageid_t num_tuples_in_small; /// Tuples from the small input? TODO Only used for C0, so not stored on disk.
pageid_t num_tuples_in_large; /// Tuples from large input? TODO Only used for C0, so not stored on disk.

// todo: simplify confusing hand off logic, and remove this field?
bool just_handed_off;

@@ -220,11 +225,8 @@ class mergeStats {
struct timeval stats_done; /// When did we finish merging?
struct timespec stats_last_tick;
pageid_t stats_bytes_out_with_overhead;/// How many bytes did we write (including internal tree nodes)?
pageid_t stats_num_tuples_out; /// How many tuples did we write?
pageid_t stats_num_datapages_out; /// How many datapages?
pageid_t stats_bytes_in_small_delta; /// How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
pageid_t stats_num_tuples_in_small; /// Tuples from the small input?
pageid_t stats_num_tuples_in_large; /// Tuples from large input?
double stats_lifetime_elapsed; /// How long has this tree existed, in seconds?
double stats_lifetime_active; /// How long has this tree been running (i.e.; active = true), in seconds?
double stats_elapsed; /// How long did this merge take, including idle time (not valid until after merge is complete)?

@@ -244,9 +246,9 @@ class mergeStats {
double phys_mb_out = ((double)stats_bytes_out_with_overhead) / (1024.0 * 1024.0);
double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0);
double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0);
double kt_out = ((double)stats_num_tuples_out) /(1024.0);
double kt_ins= ((double)stats_num_tuples_in_small) /(1024.0);
double kt_inl = ((double)stats_num_tuples_in_large) /(1024.0);
double kt_out = ((double)num_tuples_out) /(1024.0);
double kt_ins= ((double)num_tuples_in_small) /(1024.0);
double kt_inl = ((double)num_tuples_in_large) /(1024.0);
double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins);
double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins);
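The C0 branch of get_current_size() above combines four counters; a tiny sketch with made-up numbers, using the field meanings from the /// comments in this header (illustration only, not part of the patch):

// c0_size_sketch.cpp -- the C0 arithmetic from get_current_size(), with invented values.
#include <cstdio>
int main() {
  long long base_size      = 10LL << 20;  // C0 contents at the start of the merge
  long long bytes_in_small = 4LL << 20;   // bytes the application inserted since then
  long long bytes_in_large = 1LL << 20;   // bytes cancelled by updates/deletes
  long long bytes_out      = 6LL << 20;   // bytes the C0-C1 merger has already consumed
  long long current = base_size + bytes_in_small - bytes_in_large - bytes_out;
  printf("current C0 size = %lld MB\n", current >> 20); // 10 + 4 - 1 - 6 = 7 MB
  return 0;
}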
@@ -160,7 +160,8 @@ void * merge_scheduler::memMergeThread() {
printf("\nMerge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size);

assert(*ltable_->R() >= MIN_R);
bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R());
// XXX don't hardcode 1.05, which will break for R > ~20.
bool signal_c2 = (1.05 * new_c1_size / ltable_->mean_c0_run_length > *ltable_->R());
DEBUG("\nc1 size %f R %f\n", new_c1_size, *ltable_->R());
if( signal_c2 )
{
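One way to read the "break for R > ~20" caveat (an assumption on this editor's part, not spelled out in the diff): if C1 is still one C0 run short of its target, i.e. new_c1_size = (R-1) * mean_c0_run_length, the 5% slack makes signal_c2 fire early exactly when 1.05*(R-1) > R, i.e. R > 21. A tiny check of that boundary, not part of the patch:

// slack_check.cpp -- where the hard-coded 1.05 in signal_c2 starts to fire a full
// C0 run early (illustration only; the "one run short" worst case is an assumption).
#include <cstdio>
int main() {
  for(int R = 2; R <= 30; R++) {
    double runs_in_c1 = R - 1.0;              // C1 one C0 run short of its target
    bool exact   = runs_in_c1 > R;            // test without slack: never true here
    bool slacked = 1.05 * runs_in_c1 > R;     // test with 5% slack
    if(slacked != exact) printf("R = %d: slacked test signals C2 a run early\n", R);
  }
  return 0;  // prints R = 22..30, i.e. the breakdown starts just past R ~ 20
}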
@@ -17,25 +17,29 @@
int main(int argc, char *argv[])
{
logtable<datatuple>::init_stasis();

int xid = Tbegin();

recordid table_root = ROOT_RECORD;

int64_t c0_size = 1024 * 1024 * 512 * 1;
stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total

if(argc == 2 && !strcmp(argv[1], "--test")) {
stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 228MB total
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
}

if(argc == 2 && !strcmp(argv[1], "--benchmark")) {
c0_size = 1024 * 1024 * 768 * 1;
stasis_buffer_manager_size = 2L * 1024L * 1024L * 1024L / PAGE_SIZE; // 4GB total
c0_size = 1024L * 1024L * 1024L * 2L;
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
}

logtable<datatuple>::init_stasis();

int xid = Tbegin();

recordid table_root = ROOT_RECORD;

logtable<datatuple> ltable(c0_size);

if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
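For reference, the "total" figures in the comments above combine the buffer pool (a page count) with c0_size (bytes). A small sketch of that arithmetic for the default and --benchmark configurations, assuming a 4KB PAGE_SIZE (the real constant comes from Stasis); not part of the patch:

// sizing_sketch.cpp -- decomposes the "1.5GB total" / "4GB total" comments above.
// PAGE_SIZE = 4096 is an assumption here; the actual value is defined by Stasis.
#include <cstdio>
int main() {
  const long long PAGE_SIZE = 4096;
  long long buf_default = 1LL * 1024 * 1024 * 1024 / PAGE_SIZE;  // pages
  long long c0_default  = 1024LL * 1024 * 512;                   // bytes
  long long buf_bench   = 2LL * 1024 * 1024 * 1024 / PAGE_SIZE;  // pages
  long long c0_bench    = 1024LL * 1024 * 1024 * 2;              // bytes
  printf("default:     %lld MB buffer pool + %lld MB c0\n",
         (buf_default * PAGE_SIZE) >> 20, c0_default >> 20);
  printf("--benchmark: %lld MB buffer pool + %lld MB c0\n",
         (buf_bench * PAGE_SIZE) >> 20, c0_bench >> 20);
  return 0;  // 1024 MB + 512 MB ~= 1.5GB; 2048 MB + 2048 MB = 4GB
}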