fix bug where filePool was not syncing to disk, update log related benchmarks; add tweak to ringbuffer to prevent extremely high contention from breaking group commit.
This commit is contained in:
parent
84bbeca165
commit
ad2b6a62ae
4 changed files with 36 additions and 18 deletions
|
@ -7,8 +7,10 @@
|
||||||
|
|
||||||
#include <stasis/logger/logger2.h>
|
#include <stasis/logger/logger2.h>
|
||||||
#include <stasis/logger/safeWrites.h>
|
#include <stasis/logger/safeWrites.h>
|
||||||
|
#include <stasis/logger/filePool.h>
|
||||||
#include <stasis/logger/inMemoryLog.h>
|
#include <stasis/logger/inMemoryLog.h>
|
||||||
#include <stasis/flags.h>
|
#include <stasis/flags.h>
|
||||||
|
#include <stasis/constants.h>
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
@ -20,17 +22,12 @@ stasis_log_t * l;
|
||||||
|
|
||||||
static void* worker(void* arg) {
|
static void* worker(void* arg) {
|
||||||
unsigned long numops = *(unsigned long*) arg;
|
unsigned long numops = *(unsigned long*) arg;
|
||||||
LogEntry e;
|
|
||||||
e.LSN = 0;
|
|
||||||
e.prevLSN = 0;
|
|
||||||
e.type = UPDATELOG;
|
|
||||||
e.xid = 0;
|
|
||||||
e.update.arg_size = 0;
|
|
||||||
e.update.funcID = 0;
|
|
||||||
e.update.page = INVALID_PAGE;
|
|
||||||
|
|
||||||
for(unsigned long i = 0; i < numops; i++) {
|
for(unsigned long i = 0; i < numops; i++) {
|
||||||
l->write_entry(l, &e);
|
LogEntry * e = allocUpdateLogEntry(l, -1, -1, OPERATION_NOOP, 0, 0);
|
||||||
|
l->write_entry(l, e);
|
||||||
|
l->write_entry_done(l, e);
|
||||||
|
// if(! (i & 1023)) { l->force_tail(l, 0);}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -46,13 +43,17 @@ int main(int argc, char * argv[]) {
|
||||||
pthread_t workers[numthreads];
|
pthread_t workers[numthreads];
|
||||||
if(stasis_log_type == LOG_TO_FILE) {
|
if(stasis_log_type == LOG_TO_FILE) {
|
||||||
l = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions, 0);
|
l = stasis_log_safe_writes_open(stasis_log_file_name, stasis_log_file_mode, stasis_log_file_permissions, 0);
|
||||||
|
} else if(stasis_log_type == LOG_TO_DIR) {
|
||||||
|
l = stasis_log_file_pool_open(stasis_log_dir_name, stasis_log_file_mode, stasis_log_file_permissions);
|
||||||
} else {
|
} else {
|
||||||
l = stasis_log_impl_in_memory_open();
|
l = stasis_log_impl_in_memory_open();
|
||||||
}
|
}
|
||||||
for(int i = 0; i < numthreads; i++) {
|
for(int i = 0; i < numthreads; i++) {
|
||||||
pthread_create(&workers[i], 0, worker, &numops);
|
int err = pthread_create(&workers[i], 0, worker, &numops);
|
||||||
|
assert(!err);
|
||||||
}
|
}
|
||||||
for(int i = 0; i < numthreads; i++) {
|
for(int i = 0; i < numthreads; i++) {
|
||||||
pthread_join(workers[i], 0);
|
pthread_join(workers[i], 0);
|
||||||
}
|
}
|
||||||
|
l->close(l);
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ static inline long mb_to_page(long mb) {
|
||||||
return (mb * 1024 * 1024) / PAGE_SIZE;
|
return (mb * 1024 * 1024) / PAGE_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char * usage = "./sequentialThroughput [--direct] [--mb mb] [--stake mb]\n [--deprecatedBM|--deprecatedFH|--log_safe_writes|--log_memory|--nb|--file|--pfile|--nb_pfile|--nb_file]\n";
|
const char * usage = "./sequentialThroughput [--direct] [--mb mb] [--stake mb]\n [--deprecatedBM|--deprecatedFH|--log_safe_writes|--log_memory|--log_file_pool|--nb|--file|--pfile|--nb_pfile|--nb_file]\n";
|
||||||
|
|
||||||
int main(int argc, char ** argv) {
|
int main(int argc, char ** argv) {
|
||||||
int direct = 0;
|
int direct = 0;
|
||||||
|
@ -59,6 +59,9 @@ int main(int argc, char ** argv) {
|
||||||
} else if(!strcmp(argv[i], "--log_memory")) {
|
} else if(!strcmp(argv[i], "--log_memory")) {
|
||||||
stasis_log_type = LOG_TO_MEMORY;
|
stasis_log_type = LOG_TO_MEMORY;
|
||||||
log_mode = 1;
|
log_mode = 1;
|
||||||
|
} else if(!strcmp(argv[i], "--log_file_pool")) {
|
||||||
|
stasis_log_type = LOG_TO_DIR;
|
||||||
|
log_mode = 1;
|
||||||
} else if(!strcmp(argv[i], "--deprecatedBM")) {
|
} else if(!strcmp(argv[i], "--deprecatedBM")) {
|
||||||
stasis_buffer_manager_factory = stasis_buffer_manager_deprecated_factory;
|
stasis_buffer_manager_factory = stasis_buffer_manager_deprecated_factory;
|
||||||
legacyBM = 1;
|
legacyBM = 1;
|
||||||
|
@ -108,6 +111,7 @@ int main(int argc, char ** argv) {
|
||||||
for(long i = 0; i < page_count; i++) {
|
for(long i = 0; i < page_count; i++) {
|
||||||
LogEntry * e = allocUpdateLogEntry(l, prevLSN, -1, OPERATION_NOOP,
|
LogEntry * e = allocUpdateLogEntry(l, prevLSN, -1, OPERATION_NOOP,
|
||||||
0, PAGE_SIZE);
|
0, PAGE_SIZE);
|
||||||
|
l->write_entry(l, e);
|
||||||
l->write_entry_done(l, e);
|
l->write_entry_done(l, e);
|
||||||
}
|
}
|
||||||
free(arg);
|
free(arg);
|
||||||
|
|
|
@ -59,7 +59,7 @@ typedef struct {
|
||||||
char softcommit;
|
char softcommit;
|
||||||
|
|
||||||
pthread_t write_thread;
|
pthread_t write_thread;
|
||||||
|
pthread_t write_thread2;
|
||||||
stasis_ringbuffer_t * ring;
|
stasis_ringbuffer_t * ring;
|
||||||
/** Need this because the min aggregate in the ringbuffer doesn't
|
/** Need this because the min aggregate in the ringbuffer doesn't
|
||||||
* want to malloc keys, but needs to maintain some sort of state
|
* want to malloc keys, but needs to maintain some sort of state
|
||||||
|
@ -192,7 +192,7 @@ void stasis_log_file_pool_chunk_open(stasis_log_file_pool_state * fp, int chunk)
|
||||||
strcat(full_name, "/");
|
strcat(full_name, "/");
|
||||||
strcat(full_name, fp->live_filenames[chunk]);
|
strcat(full_name, fp->live_filenames[chunk]);
|
||||||
|
|
||||||
fp->ro_fd[chunk] = open(full_name, fp->filemode | O_SYNC, fp->fileperm); /// XXX should not hard-code O_SYNC.
|
fp->ro_fd[chunk] = open(full_name, fp->filemode, fp->fileperm);
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Does no latching. Relies on stability of fp->live_offsets and fp->live_count.
|
* Does no latching. Relies on stability of fp->live_offsets and fp->live_count.
|
||||||
|
@ -478,9 +478,12 @@ lsn_t stasis_log_file_pool_chunk_scrub_to_eof(stasis_log_t * log, int fd, lsn_t
|
||||||
int stasis_log_file_pool_close(stasis_log_t * log) {
|
int stasis_log_file_pool_close(stasis_log_t * log) {
|
||||||
stasis_log_file_pool_state * fp = log->impl;
|
stasis_log_file_pool_state * fp = log->impl;
|
||||||
|
|
||||||
|
log->force_tail(log, 0); /// xxx use real constant for wal mode..
|
||||||
|
|
||||||
stasis_ringbuffer_shutdown(fp->ring);
|
stasis_ringbuffer_shutdown(fp->ring);
|
||||||
|
|
||||||
pthread_join(fp->write_thread, 0);
|
pthread_join(fp->write_thread, 0);
|
||||||
|
// pthread_join(fp->write_thread2, 0);
|
||||||
|
|
||||||
// XXX need to force log to disk here.
|
// XXX need to force log to disk here.
|
||||||
for(int i = 0; i < fp->live_count; i++) {
|
for(int i = 0; i < fp->live_count; i++) {
|
||||||
|
@ -507,7 +510,7 @@ void * stasis_log_file_pool_writeback_worker(void * arg) {
|
||||||
int64_t handle;
|
int64_t handle;
|
||||||
lsn_t off, next_chunk_off, chunk_len, remaining_len;
|
lsn_t off, next_chunk_off, chunk_len, remaining_len;
|
||||||
while(1) {
|
while(1) {
|
||||||
lsn_t len = 4*1024*1024;
|
lsn_t len = 16*1024*1024;
|
||||||
off = stasis_ringbuffer_consume_bytes(fp->ring, &len, &handle);
|
off = stasis_ringbuffer_consume_bytes(fp->ring, &len, &handle);
|
||||||
if(off == RING_CLOSED) break;
|
if(off == RING_CLOSED) break;
|
||||||
pthread_mutex_lock(&fp->mut);
|
pthread_mutex_lock(&fp->mut);
|
||||||
|
@ -658,9 +661,9 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
|
||||||
fp->live_count = 0;
|
fp->live_count = 0;
|
||||||
fp->dead_count = 0;
|
fp->dead_count = 0;
|
||||||
|
|
||||||
fp->target_chunk_size = 16 * 1024 * 1024;
|
fp->target_chunk_size = 64 * 1024 * 1024;
|
||||||
|
|
||||||
fp->filemode = filemode;
|
fp->filemode = filemode | O_SYNC; /// XXX should not hard-code O_SYNC.
|
||||||
fp->fileperm = fileperm;
|
fp->fileperm = fileperm;
|
||||||
fp->softcommit = !(filemode & O_SYNC);
|
fp->softcommit = !(filemode & O_SYNC);
|
||||||
|
|
||||||
|
@ -722,10 +725,11 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
|
||||||
|
|
||||||
// The previous segment must have been forced to disk before we created the current one, so we're good to go.
|
// The previous segment must have been forced to disk before we created the current one, so we're good to go.
|
||||||
|
|
||||||
fp->ring = stasis_ringbuffer_init(24, next_lsn); // 16mb buffer
|
fp->ring = stasis_ringbuffer_init(26, next_lsn); // 64mb buffer
|
||||||
pthread_key_create(&fp->handle_key, key_destr);
|
pthread_key_create(&fp->handle_key, key_destr);
|
||||||
|
|
||||||
pthread_create(&fp->write_thread, 0, stasis_log_file_pool_writeback_worker, ret);
|
pthread_create(&fp->write_thread, 0, stasis_log_file_pool_writeback_worker, ret);
|
||||||
|
// pthread_create(&fp->write_thread2, 0, stasis_log_file_pool_writeback_worker, ret);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,11 +109,20 @@ int64_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, int64_t o
|
||||||
return off;
|
return off;
|
||||||
}
|
}
|
||||||
int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle) {
|
int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle) {
|
||||||
|
|
||||||
pthread_mutex_lock(&ring->mut);
|
pthread_mutex_lock(&ring->mut);
|
||||||
int64_t ret;
|
int64_t ret;
|
||||||
int64_t orig_sz = *sz;
|
int64_t orig_sz = *sz;
|
||||||
|
|
||||||
*sz = (ring->flush > ring->rf) ? RING_NEXT : orig_sz;
|
if(ring->flush > ring->rf) {
|
||||||
|
pthread_mutex_unlock(&ring->mut);
|
||||||
|
struct timespec tv;
|
||||||
|
tv.tv_sec = 0;
|
||||||
|
tv.tv_nsec = 100000;
|
||||||
|
nanosleep(&tv, 0);
|
||||||
|
pthread_mutex_lock(&ring->mut);
|
||||||
|
if(ring->flush > ring->rf) { *sz = RING_NEXT; }
|
||||||
|
}
|
||||||
if(ring->shutdown) {
|
if(ring->shutdown) {
|
||||||
if(ring->rt == ring->wf) {
|
if(ring->rt == ring->wf) {
|
||||||
fprintf(stderr, "Shutting down, and there are no more bytes. Signaling shutdown thread.\n");
|
fprintf(stderr, "Shutting down, and there are no more bytes. Signaling shutdown thread.\n");
|
||||||
|
|
Loading…
Reference in a new issue