diff --git a/benchmarks/multicore/smallLogEntry.c b/benchmarks/multicore/smallLogEntry.c index 279961e..bb6a85a 100644 --- a/benchmarks/multicore/smallLogEntry.c +++ b/benchmarks/multicore/smallLogEntry.c @@ -7,8 +7,10 @@ #include #include +#include #include #include +#include #include #include @@ -20,17 +22,12 @@ stasis_log_t * l; static void* worker(void* arg) { unsigned long numops = *(unsigned long*) arg; - LogEntry e; - e.LSN = 0; - e.prevLSN = 0; - e.type = UPDATELOG; - e.xid = 0; - e.update.arg_size = 0; - e.update.funcID = 0; - e.update.page = INVALID_PAGE; for(unsigned long i = 0; i < numops; i++) { - l->write_entry(l, &e); + LogEntry * e = allocUpdateLogEntry(l, -1, -1, OPERATION_NOOP, 0, 0); + l->write_entry(l, e); + l->write_entry_done(l, e); +// if(! (i & 1023)) { l->force_tail(l, 0);} } return 0; } @@ -46,13 +43,17 @@ int main(int argc, char * argv[]) { pthread_t workers[numthreads]; if(stasis_log_type == LOG_TO_FILE) { l = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions, 0); + } else if(stasis_log_type == LOG_TO_DIR) { + l = stasis_log_file_pool_open(stasis_log_dir_name, stasis_log_file_mode, stasis_log_file_permissions); } else { l = stasis_log_impl_in_memory_open(); } for(int i = 0; i < numthreads; i++) { - pthread_create(&workers[i], 0, worker, &numops); + int err = pthread_create(&workers[i], 0, worker, &numops); + assert(!err); } for(int i = 0; i < numthreads; i++) { pthread_join(workers[i], 0); } + l->close(l); } diff --git a/benchmarks/sequentialThroughput.c b/benchmarks/sequentialThroughput.c index cbdc13a..89c1d8c 100644 --- a/benchmarks/sequentialThroughput.c +++ b/benchmarks/sequentialThroughput.c @@ -40,7 +40,7 @@ static inline long mb_to_page(long mb) { return (mb * 1024 * 1024) / PAGE_SIZE; } -const char * usage = "./sequentialThroughput [--direct] [--mb mb] [--stake mb]\n [--deprecatedBM|--deprecatedFH|--log_safe_writes|--log_memory|--nb|--file|--pfile|--nb_pfile|--nb_file]\n"; +const char * usage = "./sequentialThroughput [--direct] [--mb mb] [--stake mb]\n [--deprecatedBM|--deprecatedFH|--log_safe_writes|--log_memory|--log_file_pool|--nb|--file|--pfile|--nb_pfile|--nb_file]\n"; int main(int argc, char ** argv) { int direct = 0; @@ -59,6 +59,9 @@ int main(int argc, char ** argv) { } else if(!strcmp(argv[i], "--log_memory")) { stasis_log_type = LOG_TO_MEMORY; log_mode = 1; + } else if(!strcmp(argv[i], "--log_file_pool")) { + stasis_log_type = LOG_TO_DIR; + log_mode = 1; } else if(!strcmp(argv[i], "--deprecatedBM")) { stasis_buffer_manager_factory = stasis_buffer_manager_deprecated_factory; legacyBM = 1; @@ -108,6 +111,7 @@ int main(int argc, char ** argv) { for(long i = 0; i < page_count; i++) { LogEntry * e = allocUpdateLogEntry(l, prevLSN, -1, OPERATION_NOOP, 0, PAGE_SIZE); + l->write_entry(l, e); l->write_entry_done(l, e); } free(arg); diff --git a/src/stasis/logger/filePool.c b/src/stasis/logger/filePool.c index 645c065..f23793e 100644 --- a/src/stasis/logger/filePool.c +++ b/src/stasis/logger/filePool.c @@ -59,7 +59,7 @@ typedef struct { char softcommit; pthread_t write_thread; - + pthread_t write_thread2; stasis_ringbuffer_t * ring; /** Need this because the min aggregate in the ringbuffer doesn't * want to malloc keys, but needs to maintain some sort of state @@ -192,7 +192,7 @@ void stasis_log_file_pool_chunk_open(stasis_log_file_pool_state * fp, int chunk) strcat(full_name, "/"); strcat(full_name, fp->live_filenames[chunk]); - fp->ro_fd[chunk] = open(full_name, fp->filemode | O_SYNC, fp->fileperm); /// XXX should not hard-code O_SYNC. + fp->ro_fd[chunk] = open(full_name, fp->filemode, fp->fileperm); } /** * Does no latching. Relies on stability of fp->live_offsets and fp->live_count. @@ -478,9 +478,12 @@ lsn_t stasis_log_file_pool_chunk_scrub_to_eof(stasis_log_t * log, int fd, lsn_t int stasis_log_file_pool_close(stasis_log_t * log) { stasis_log_file_pool_state * fp = log->impl; + log->force_tail(log, 0); /// xxx use real constant for wal mode.. + stasis_ringbuffer_shutdown(fp->ring); pthread_join(fp->write_thread, 0); +// pthread_join(fp->write_thread2, 0); // XXX need to force log to disk here. for(int i = 0; i < fp->live_count; i++) { @@ -507,7 +510,7 @@ void * stasis_log_file_pool_writeback_worker(void * arg) { int64_t handle; lsn_t off, next_chunk_off, chunk_len, remaining_len; while(1) { - lsn_t len = 4*1024*1024; + lsn_t len = 16*1024*1024; off = stasis_ringbuffer_consume_bytes(fp->ring, &len, &handle); if(off == RING_CLOSED) break; pthread_mutex_lock(&fp->mut); @@ -658,9 +661,9 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f fp->live_count = 0; fp->dead_count = 0; - fp->target_chunk_size = 16 * 1024 * 1024; + fp->target_chunk_size = 64 * 1024 * 1024; - fp->filemode = filemode; + fp->filemode = filemode | O_SYNC; /// XXX should not hard-code O_SYNC. fp->fileperm = fileperm; fp->softcommit = !(filemode & O_SYNC); @@ -722,10 +725,11 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f // The previous segment must have been forced to disk before we created the current one, so we're good to go. - fp->ring = stasis_ringbuffer_init(24, next_lsn); // 16mb buffer + fp->ring = stasis_ringbuffer_init(26, next_lsn); // 64mb buffer pthread_key_create(&fp->handle_key, key_destr); pthread_create(&fp->write_thread, 0, stasis_log_file_pool_writeback_worker, ret); +// pthread_create(&fp->write_thread2, 0, stasis_log_file_pool_writeback_worker, ret); return ret; } diff --git a/src/stasis/util/ringbuffer.c b/src/stasis/util/ringbuffer.c index cf9e18a..d828c9b 100644 --- a/src/stasis/util/ringbuffer.c +++ b/src/stasis/util/ringbuffer.c @@ -109,11 +109,20 @@ int64_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, int64_t o return off; } int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle) { + pthread_mutex_lock(&ring->mut); int64_t ret; int64_t orig_sz = *sz; - *sz = (ring->flush > ring->rf) ? RING_NEXT : orig_sz; + if(ring->flush > ring->rf) { + pthread_mutex_unlock(&ring->mut); + struct timespec tv; + tv.tv_sec = 0; + tv.tv_nsec = 100000; + nanosleep(&tv, 0); + pthread_mutex_lock(&ring->mut); + if(ring->flush > ring->rf) { *sz = RING_NEXT; } + } if(ring->shutdown) { if(ring->rt == ring->wf) { fprintf(stderr, "Shutting down, and there are no more bytes. Signaling shutdown thread.\n");