re-enable old merge policy for c1-c2 merger; implement shutdown (sort of)

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1006 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-08-17 21:23:39 +00:00
parent b4f80aaa7d
commit 1e487bbc54
7 changed files with 53 additions and 36 deletions

View file

@ -31,6 +31,8 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
tree_c1 = NULL;
tree_c1_mergeable = NULL;
tree_c2 = NULL;
// This bool is purely for external code.
this->accepting_new_requests = true;
this->still_running_ = true;
this->merge_mgr = new mergeManager(this);
this->mergedata = 0;

View file

@ -113,6 +113,7 @@ public:
int64_t max_c0_size;
mergeManager * merge_mgr;
bool accepting_new_requests;
inline bool is_still_running() { return still_running_; }
inline void stop() {
rwlc_writelock(header_mut);

View file

@ -24,8 +24,6 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
mergeManager::~mergeManager() {
pthread_mutex_destroy(&throttle_mut);
pthread_mutex_destroy(&dummy_throttle_mut);
pthread_cond_destroy(&dummy_throttle_cond);
pthread_cond_destroy(&throttle_wokeup_cond);
delete c0;
delete c1;
@ -80,12 +78,24 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
s->delta = 0;
if(!s->need_tick) { s->need_tick = 1; }
}
if(s->merge_level != 0) {
if(s->merge_level == 2
#ifdef NO_SNOWSHOVEL
|| s->merge_level == 1
#endif
) {
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size);
} else {
s->in_progress = 0;
}
} else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...)
if(s->active) {
s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->max_c0_size);
if(s->in_progress > 0.95) { s->in_progress = 0.95; }
assert(s->in_progress > -0.01 && s->in_progress < 1.02);
} else {
s->in_progress = 0;
}
}
if(s->merge_level != 2) {
if(s->mergeable_size) {
@ -153,20 +163,28 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
}
if(force || s->need_tick) {
if(block
#ifndef NO_SNOWSHOVEL
&& s->merge_level == 0
#endif
) {
pthread_mutex_lock(&ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
if(block) {
if(s->merge_level == 0) {
pthread_mutex_lock(&ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
while(sleeping[s->merge_level]) {
rwlc_unlock(ltable->header_mut);
pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
while(sleeping[s->merge_level]) {
rwlc_unlock(ltable->header_mut);
pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
}
} else {
rwlc_writelock(ltable->header_mut);
while(sleeping[s->merge_level]) {
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
}
}
#ifdef NO_SNOWSHOVEL
bool snowshovel = false;
#else
bool snowshovel = true;
#endif
if((!snowshovel) || s->merge_level == 1) { // apply backpressure based on merge progress.
int64_t overshoot = 0;
int64_t overshoot2 = 0;
int64_t raw_overshoot = 0;
@ -196,6 +214,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
double bps;
// This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep.
if(s->merge_level == 0) {
abort();
if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; overshoot_fudge2 = 0; }
raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress));
overshoot = raw_overshoot + overshoot_fudge;
@ -251,9 +270,10 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
sleeping[s->merge_level] = true;
rwlc_unlock(ltable->header_mut);
pthread_cond_timedwait(&dummy_throttle_cond, &ltable->tick_mut, &sleep_until);
rwlc_readlock(ltable->header_mut);
if(s->merge_level == 0) abort();
struct timespec ts;
double_to_ts(&ts, sleeptime);
nanosleep(&ts, 0);
sleeping[s->merge_level] = false;
pthread_cond_broadcast(&throttle_wokeup_cond);
gettimeofday(&now, 0);
@ -269,7 +289,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
break;
}
} while(1);
#else
} else if(s->merge_level == 0) {
while(/*s->current_size*/ltable->tree_bytes > ltable->max_c0_size) {
rwlc_unlock(ltable->header_mut);
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
@ -285,17 +305,17 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
nanosleep(&sleeptime, 0);
rwlc_readlock(ltable->header_mut);
}
#endif
}
rwlc_unlock(ltable->header_mut);
pthread_mutex_unlock(&ltable->tick_mut);
if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut); // XXX can this even be the case?
} else {
if(!force) {
if(s->print_skipped == PRINT_SKIP) {
pthread_mutex_lock(&ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
if(s->merge_level == 0) pthread_mutex_lock(&ltable->tick_mut);
rwlc_writelock(ltable->header_mut);
pretty_print(stdout);
rwlc_unlock(ltable->header_mut);
pthread_mutex_unlock(&ltable->tick_mut);
if(s->merge_level == 0) pthread_mutex_unlock(&ltable->tick_mut);
s->print_skipped = 0;
} else {
s->print_skipped++;
@ -355,8 +375,6 @@ mergeManager::mergeManager(logtable<datatuple> *ltable):
c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),
c2(new mergeStats(2, 0)) {
pthread_mutex_init(&throttle_mut, 0);
pthread_mutex_init(&dummy_throttle_mut, 0);
pthread_cond_init(&dummy_throttle_cond, 0);
pthread_cond_init(&throttle_wokeup_cond, 0);
struct timeval tv;
gettimeofday(&tv, 0);
@ -391,8 +409,8 @@ void mergeManager::pretty_print(FILE * out) {
#ifdef NO_SNOWSHOVEL
assert((!c1->active) || (c0_c1_in_progress >= -1 && c0_c1_in_progress < 102));
assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102));
#endif
assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102));
fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ",
c0->active ? "RUN" : "---", (long long)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed),

View file

@ -62,8 +62,6 @@ private:
mergeStats * c1;
mergeStats * c2;
pthread_mutex_t throttle_mut;
pthread_mutex_t dummy_throttle_mut;
pthread_cond_t dummy_throttle_cond;
pthread_cond_t throttle_wokeup_cond;
bool sleeping[3];

View file

@ -91,8 +91,8 @@ inline int requestDispatch<HANDLE>::op_flush(logtable<datatuple> * ltable, HANDL
}
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_shutdown(logtable<datatuple> * ltable, HANDLE fd) {
// XXX
return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR);
ltable->accepting_new_requests = false;
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_stat_space_usage(logtable<datatuple> * ltable, HANDLE fd) {

View file

@ -34,7 +34,7 @@ void * simpleServer::worker(int self) {
pthread_mutex_lock(&thread_mut[self]);
while(true) {
while(thread_fd[self] == -1) {
if(!running) {
if(!ltable->accepting_new_requests) {
pthread_mutex_unlock(&thread_mut[self]);
return 0;
}
@ -56,8 +56,7 @@ simpleServer::simpleServer(logtable<datatuple> * ltable, int max_threads, int po
thread_fd((int*)malloc(sizeof(*thread_fd)*max_threads)),
thread_cond((pthread_cond_t*)malloc(sizeof(*thread_cond)*max_threads)),
thread_mut((pthread_mutex_t*)malloc(sizeof(*thread_mut)*max_threads)),
thread((pthread_t*)malloc(sizeof(*thread)*max_threads)),
running(true) {
thread((pthread_t*)malloc(sizeof(*thread)*max_threads)) {
for(int i = 0; i < max_threads; i++) {
thread_fd[i] = -2;
pthread_cond_init(&thread_cond[i], 0);
@ -95,7 +94,7 @@ bool simpleServer::acceptLoop() {
// *(sdata->server_socket) = sockfd;
int flag, result;
while(true) {
while(ltable->accepting_new_requests) {
socklen_t clilen = sizeof(cli_addr);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
@ -138,9 +137,9 @@ bool simpleServer::acceptLoop() {
}
}
}
return true;
}
simpleServer::~simpleServer() {
running = false;
for(int i = 0; i < max_threads; i++) {
pthread_cond_signal(&thread_cond[i]);
pthread_mutex_lock(&thread_mut[i]);

View file

@ -26,7 +26,6 @@ private:
pthread_cond_t * thread_cond;
pthread_mutex_t * thread_mut;
pthread_t * thread;
bool running;
};
#endif /* SIMPLESERVER_H_ */