diff --git a/logstore.cpp b/logstore.cpp index 6ffe6f5..c0963db 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -31,6 +31,8 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio tree_c1 = NULL; tree_c1_mergeable = NULL; tree_c2 = NULL; + // This bool is purely for external code. + this->accepting_new_requests = true; this->still_running_ = true; this->merge_mgr = new mergeManager(this); this->mergedata = 0; diff --git a/logstore.h b/logstore.h index 61158a5..0c022ed 100644 --- a/logstore.h +++ b/logstore.h @@ -113,6 +113,7 @@ public: int64_t max_c0_size; mergeManager * merge_mgr; + bool accepting_new_requests; inline bool is_still_running() { return still_running_; } inline void stop() { rwlc_writelock(header_mut); diff --git a/mergeManager.cpp b/mergeManager.cpp index d68c317..aee6fed 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -24,8 +24,6 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) { mergeManager::~mergeManager() { pthread_mutex_destroy(&throttle_mut); - pthread_mutex_destroy(&dummy_throttle_mut); - pthread_cond_destroy(&dummy_throttle_cond); pthread_cond_destroy(&throttle_wokeup_cond); delete c0; delete c1; @@ -80,12 +78,24 @@ void mergeManager::update_progress(mergeStats * s, int delta) { s->delta = 0; if(!s->need_tick) { s->need_tick = 1; } } - if(s->merge_level != 0) { + if(s->merge_level == 2 +#ifdef NO_SNOWSHOVEL + || s->merge_level == 1 +#endif + ) { if(s->active) { s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size); } else { s->in_progress = 0; } + } else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...) + if(s->active) { + s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->max_c0_size); + if(s->in_progress > 0.95) { s->in_progress = 0.95; } + assert(s->in_progress > -0.01 && s->in_progress < 1.02); + } else { + s->in_progress = 0; + } } if(s->merge_level != 2) { if(s->mergeable_size) { @@ -153,20 +163,28 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } if(force || s->need_tick) { - if(block -#ifndef NO_SNOWSHOVEL - && s->merge_level == 0 -#endif - ) { - pthread_mutex_lock(<able->tick_mut); - rwlc_readlock(ltable->header_mut); + if(block) { + if(s->merge_level == 0) { + pthread_mutex_lock(<able->tick_mut); + rwlc_writelock(ltable->header_mut); - while(sleeping[s->merge_level]) { - rwlc_unlock(ltable->header_mut); - pthread_cond_wait(&throttle_wokeup_cond, <able->tick_mut); - rwlc_readlock(ltable->header_mut); + while(sleeping[s->merge_level]) { + rwlc_unlock(ltable->header_mut); + pthread_cond_wait(&throttle_wokeup_cond, <able->tick_mut); + rwlc_writelock(ltable->header_mut); + } + } else { + rwlc_writelock(ltable->header_mut); + while(sleeping[s->merge_level]) { + rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut); + } } #ifdef NO_SNOWSHOVEL + bool snowshovel = false; +#else + bool snowshovel = true; +#endif + if((!snowshovel) || s->merge_level == 1) { // apply backpressure based on merge progress. int64_t overshoot = 0; int64_t overshoot2 = 0; int64_t raw_overshoot = 0; @@ -196,6 +214,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { double bps; // This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep. if(s->merge_level == 0) { + abort(); if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; overshoot_fudge2 = 0; } raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress)); overshoot = raw_overshoot + overshoot_fudge; @@ -251,9 +270,10 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); sleeping[s->merge_level] = true; - rwlc_unlock(ltable->header_mut); - pthread_cond_timedwait(&dummy_throttle_cond, <able->tick_mut, &sleep_until); - rwlc_readlock(ltable->header_mut); + if(s->merge_level == 0) abort(); + struct timespec ts; + double_to_ts(&ts, sleeptime); + nanosleep(&ts, 0); sleeping[s->merge_level] = false; pthread_cond_broadcast(&throttle_wokeup_cond); gettimeofday(&now, 0); @@ -269,7 +289,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { break; } } while(1); -#else + } else if(s->merge_level == 0) { while(/*s->current_size*/ltable->tree_bytes > ltable->max_c0_size) { rwlc_unlock(ltable->header_mut); printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n"); @@ -285,17 +305,17 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { nanosleep(&sleeptime, 0); rwlc_readlock(ltable->header_mut); } -#endif + } rwlc_unlock(ltable->header_mut); - pthread_mutex_unlock(<able->tick_mut); + if(s->merge_level == 0) pthread_mutex_unlock(<able->tick_mut); // XXX can this even be the case? } else { if(!force) { if(s->print_skipped == PRINT_SKIP) { - pthread_mutex_lock(<able->tick_mut); - rwlc_readlock(ltable->header_mut); + if(s->merge_level == 0) pthread_mutex_lock(<able->tick_mut); + rwlc_writelock(ltable->header_mut); pretty_print(stdout); rwlc_unlock(ltable->header_mut); - pthread_mutex_unlock(<able->tick_mut); + if(s->merge_level == 0) pthread_mutex_unlock(<able->tick_mut); s->print_skipped = 0; } else { s->print_skipped++; @@ -355,8 +375,6 @@ mergeManager::mergeManager(logtable *ltable): c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )), c2(new mergeStats(2, 0)) { pthread_mutex_init(&throttle_mut, 0); - pthread_mutex_init(&dummy_throttle_mut, 0); - pthread_cond_init(&dummy_throttle_cond, 0); pthread_cond_init(&throttle_wokeup_cond, 0); struct timeval tv; gettimeofday(&tv, 0); @@ -391,8 +409,8 @@ void mergeManager::pretty_print(FILE * out) { #ifdef NO_SNOWSHOVEL assert((!c1->active) || (c0_c1_in_progress >= -1 && c0_c1_in_progress < 102)); - assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102)); #endif + assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102)); fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ", c0->active ? "RUN" : "---", (long long)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed), diff --git a/mergeManager.h b/mergeManager.h index 112cb13..f5ff26f 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -62,8 +62,6 @@ private: mergeStats * c1; mergeStats * c2; pthread_mutex_t throttle_mut; - pthread_mutex_t dummy_throttle_mut; - pthread_cond_t dummy_throttle_cond; pthread_cond_t throttle_wokeup_cond; bool sleeping[3]; diff --git a/requestDispatch.cpp b/requestDispatch.cpp index 2f6a89b..78c9901 100644 --- a/requestDispatch.cpp +++ b/requestDispatch.cpp @@ -91,8 +91,8 @@ inline int requestDispatch::op_flush(logtable * ltable, HANDL } template inline int requestDispatch::op_shutdown(logtable * ltable, HANDLE fd) { - // XXX - return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR); + ltable->accepting_new_requests = false; + return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template inline int requestDispatch::op_stat_space_usage(logtable * ltable, HANDLE fd) { diff --git a/simpleServer.cpp b/simpleServer.cpp index 98e4e11..efe713f 100644 --- a/simpleServer.cpp +++ b/simpleServer.cpp @@ -34,7 +34,7 @@ void * simpleServer::worker(int self) { pthread_mutex_lock(&thread_mut[self]); while(true) { while(thread_fd[self] == -1) { - if(!running) { + if(!ltable->accepting_new_requests) { pthread_mutex_unlock(&thread_mut[self]); return 0; } @@ -56,8 +56,7 @@ simpleServer::simpleServer(logtable * ltable, int max_threads, int po thread_fd((int*)malloc(sizeof(*thread_fd)*max_threads)), thread_cond((pthread_cond_t*)malloc(sizeof(*thread_cond)*max_threads)), thread_mut((pthread_mutex_t*)malloc(sizeof(*thread_mut)*max_threads)), - thread((pthread_t*)malloc(sizeof(*thread)*max_threads)), - running(true) { + thread((pthread_t*)malloc(sizeof(*thread)*max_threads)) { for(int i = 0; i < max_threads; i++) { thread_fd[i] = -2; pthread_cond_init(&thread_cond[i], 0); @@ -95,7 +94,7 @@ bool simpleServer::acceptLoop() { // *(sdata->server_socket) = sockfd; int flag, result; - while(true) { + while(ltable->accepting_new_requests) { socklen_t clilen = sizeof(cli_addr); newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); @@ -138,9 +137,9 @@ bool simpleServer::acceptLoop() { } } } + return true; } simpleServer::~simpleServer() { - running = false; for(int i = 0; i < max_threads; i++) { pthread_cond_signal(&thread_cond[i]); pthread_mutex_lock(&thread_mut[i]); diff --git a/simpleServer.h b/simpleServer.h index d48504d..095a18b 100644 --- a/simpleServer.h +++ b/simpleServer.h @@ -26,7 +26,6 @@ private: pthread_cond_t * thread_cond; pthread_mutex_t * thread_mut; pthread_t * thread; - bool running; }; #endif /* SIMPLESERVER_H_ */