diff --git a/benchmarks/multicore/smallLogEntry.c b/benchmarks/multicore/smallLogEntry.c index bb6a85a..95795a8 100644 --- a/benchmarks/multicore/smallLogEntry.c +++ b/benchmarks/multicore/smallLogEntry.c @@ -27,6 +27,7 @@ static void* worker(void* arg) { LogEntry * e = allocUpdateLogEntry(l, -1, -1, OPERATION_NOOP, 0, 0); l->write_entry(l, e); l->write_entry_done(l, e); + l->force_tail(l, LOG_FORCE_COMMIT); // if(! (i & 1023)) { l->force_tail(l, 0);} } return 0; diff --git a/src/stasis/logger/filePool.c b/src/stasis/logger/filePool.c index 88fdc8b..16b37b8 100644 --- a/src/stasis/logger/filePool.c +++ b/src/stasis/logger/filePool.c @@ -199,7 +199,7 @@ void stasis_log_file_pool_chunk_open(stasis_log_file_pool_state * fp, int chunk) static int get_chunk_from_offset(stasis_log_t * log, lsn_t lsn) { stasis_log_file_pool_state * fp = log->impl; int chunk = -1; - if(fp->live_offsets[fp->live_count-1] <= lsn && fp->live_offsets[fp->live_count-1] + fp->target_chunk_size > lsn) { + if(fp->live_offsets[fp->live_count-1] <= lsn && (fp->live_offsets[fp->live_count-1] + fp->target_chunk_size) > lsn) { return fp->live_count - 1; } for(int i = 0; i < fp->live_count; i++) { @@ -212,7 +212,6 @@ static int get_chunk_from_offset(stasis_log_t * log, lsn_t lsn) { */ int stasis_log_file_pool_append_chunk(stasis_log_t * log, off_t new_offset) { stasis_log_file_pool_state * fp = log->impl; - char * old_file = 0; char * new_file = stasis_log_file_pool_build_filename(fp, new_offset); char * new_path = build_path(fp->dirname, new_file); @@ -251,18 +250,28 @@ LogEntry * stasis_log_file_pool_reserve_entry(stasis_log_t * log, size_t szs) { uint64_t framed_size = sz+sizeof(uint32_t)+sizeof(uint32_t); lsn_t off = stasis_ringbuffer_reserve_space(fp->ring, framed_size, handle); + + pthread_mutex_lock(&fp->mut); + int endchunk = get_chunk_from_offset(log, off + sz + 2*sizeof(uint32_t)); + //if(chunk >= fp->live_count || (off + (sz+2*sizeof(uint32_t)) > fp->live_offsets[chunk] + 
fp->target_chunk_size)) { + if(endchunk == -1) { //>= fp->live_count) { + stasis_log_file_pool_append_chunk(log, off); + int chunk = get_chunk_from_offset(log, off); + assert(chunk == fp->live_count-1); + } + lsn_t barrier = fp->live_offsets[fp->live_count - 1]-1; + pthread_mutex_unlock(&fp->mut); + + if(barrier < off) { + stasis_ringbuffer_flush(fp->ring, barrier); + } + byte * buf = stasis_ringbuffer_get_wr_buf(fp->ring, off, framed_size); memcpy(buf, &sz, sizeof(uint32_t)); LogEntry * e = (LogEntry*)(buf + (2 * sizeof(uint32_t))); e->LSN = off; - pthread_mutex_lock(&fp->mut); - int chunk = get_chunk_from_offset(log, off); - if(chunk >= fp->live_count || (off + (sz+2*sizeof(uint32_t)) > fp->live_offsets[chunk] + fp->target_chunk_size)) { - stasis_log_file_pool_append_chunk(log, off); - } - pthread_mutex_unlock(&fp->mut); return e; } /** @@ -484,6 +493,8 @@ int stasis_log_file_pool_close(stasis_log_t * log) { pthread_join(fp->write_thread, 0); // pthread_join(fp->write_thread2, 0); + stasis_ringbuffer_free(fp->ring); + for(int i = 0; i < fp->live_count; i++) { close(fp->ro_fd[i]); free(fp->live_filenames[i]); @@ -506,72 +517,52 @@ void * stasis_log_file_pool_writeback_worker(void * arg) { stasis_log_file_pool_state * fp = log->impl; lsn_t handle; - lsn_t off, next_chunk_off, chunk_len, remaining_len; + lsn_t off; while(1) { lsn_t len = 16*1024*1024; off = stasis_ringbuffer_consume_bytes(fp->ring, &len, &handle); + const byte * buf = stasis_ringbuffer_get_rd_buf(fp->ring, off, len); + if(off == RING_CLOSED) break; pthread_mutex_lock(&fp->mut); int chunk = get_chunk_from_offset(log, off); - assert(chunk != -1); // chunks are created on insertion into the ring buffer... - if(fp->live_count > chunk+1) { - // see if this log record spans chunks. 
- next_chunk_off = fp->live_offsets[chunk+1]; - chunk_len = next_chunk_off - off; - if(chunk_len > len) { - chunk_len = len; - remaining_len = 0; - } else { - remaining_len = len - chunk_len; - } - } else { - // this log record *cannot* span chunks, or the chunk would already - // exist. - chunk_len = len; - remaining_len = 0; + int endchunk = get_chunk_from_offset(log, off + len); + // build vector of write operations. + int* fds = malloc(sizeof(int) * (1 + endchunk-chunk)); + lsn_t* file_offs = malloc(sizeof(lsn_t) * (1 + endchunk-chunk)); + for(int c = chunk; c <= endchunk; c++) { + fds[c-chunk] = fp->ro_fd[c]; + file_offs[c-chunk] = fp->live_offsets[c]; } - int fd = fp->ro_fd[chunk]; - lsn_t file_off = fp->live_offsets[chunk]; - int spill_fd = -1; - lsn_t spill_off = INVALID_LSN; - if(remaining_len) { - spill_fd = fp->ro_fd[chunk+1]; - spill_off = fp->live_offsets[chunk+1]; - } - // As above, the fds cannot change as we are writing. Release mut so that - // we do not hold it while we go to disk. + assert(endchunk != -1); pthread_mutex_unlock(&fp->mut); - const byte * buf = stasis_ringbuffer_get_rd_buf(fp->ring, off, len); - int succ = stasis_log_file_pool_chunk_write_buffer(fd, buf, chunk_len, file_off, off); - if(!succ) { - fprintf(stderr, "A: chunk is %d cnk offset is %lld offset =s %lldn", chunk, file_off, off); + lsn_t bytes_written = 0; + for(int c = chunk; c <= endchunk; c++){ + lsn_t write_len; + if(c < endchunk) { + write_len = fp->live_offsets[c+1] - (off+bytes_written); + } else { + write_len = len - bytes_written; + } + int succ = stasis_log_file_pool_chunk_write_buffer(fds[c-chunk], buf+bytes_written, write_len, file_offs[c-chunk], off+bytes_written); assert(succ); - } - if(remaining_len) { - uint32_t zero = 0; - // Close current log file. This might increase its length - // by a byte or so, but that's the filesystem's problem. 
- succ = stasis_log_file_pool_chunk_write_buffer( - fd, - (const byte*)&zero, - sizeof(zero), - file_off, - off+chunk_len); - if(!succ) { - fprintf(stderr, "B: chunk is %d cnk offset is %lld offset =s %lld\n", chunk, file_off, off); - assert(succ); - } - succ = stasis_log_file_pool_chunk_write_buffer( - spill_fd, - buf + chunk_len, - remaining_len, - spill_off, - off + chunk_len); - if(!succ) { - fprintf(stderr, "C: chunk is %d cnk offset is %lld offset =s %lld\n", chunk, spill_off, off); + bytes_written += write_len; + + if(c < endchunk) { + uint32_t zero = 0; + // Close current log file. This might increase its length + // by a byte or so, but that's the filesystem's problem. + succ = stasis_log_file_pool_chunk_write_buffer( + fds[c-chunk], + (const byte*)&zero, + sizeof(zero), + file_offs[c-chunk], + off+bytes_written); // don't count this write toward bytes written, since it's not logically part of the log. assert(succ); } } + free(file_offs); + free(fds); stasis_ringbuffer_read_done(fp->ring, &handle); } return 0; @@ -661,9 +652,9 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f fp->target_chunk_size = 64 * 1024 * 1024; - fp->filemode = filemode | O_SYNC; /// XXX should not hard-code O_SYNC. + fp->filemode = filemode | O_DSYNC; /// XXX should not hard-code O_SYNC. fp->fileperm = fileperm; - fp->softcommit = !(filemode & O_SYNC); + fp->softcommit = !(filemode & O_DSYNC); off_t current_target = 0; for(int i = 0; i < n; i++) { @@ -690,6 +681,7 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f fp->live_offsets[fp->live_count] = lsn; /*fo->ro_fd=*/ stasis_log_file_pool_chunk_open(fp, fp->live_count); + // XXX check to see if this file contains valid data. If not, then we crashed after creating this file, but before syncing the previous one. 
assert(lsn <= current_target || !current_target); char * full_name = build_path(fp->dirname, fp->live_filenames[fp->live_count]); if(!stat(full_name, &st)) { @@ -712,15 +704,20 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f free(namelist[i]); } - assert(fp->live_count); free(namelist); - printf("Current log segment appears to be %s. Scanning for next available LSN\n", fp->live_filenames[fp->live_count-1]); + lsn_t next_lsn; + if(!fp->live_count) { + next_lsn = 1; + stasis_log_file_pool_append_chunk(ret, 1); + } else { - lsn_t next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]); + printf("Current log segment appears to be %s. Scanning for next available LSN\n", fp->live_filenames[fp->live_count-1]); - printf("Scan returned %lld\n", (long long)next_lsn); + next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]); + printf("Scan returned %lld\n", (long long)next_lsn); + } // The previous segment must have been forced to disk before we created the current one, so we're good to go. fp->ring = stasis_ringbuffer_init(26, next_lsn); // 64mb buffer diff --git a/src/stasis/util/ringbuffer.c b/src/stasis/util/ringbuffer.c index 375fb72..d5f63c9 100644 --- a/src/stasis/util/ringbuffer.c +++ b/src/stasis/util/ringbuffer.c @@ -91,7 +91,10 @@ lsn_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, lsn_t sz, lsn_ } lsn_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, lsn_t off, lsn_t* sz) { if(off == RING_NEXT) { off = ring->rf; } - if(*sz == RING_NEXT) { *sz = ring->wt - off; } + if(*sz == RING_NEXT) { + *sz = ring->wt - off; + if(*sz == 0) { return RING_VOLATILE; } + } // has the entire byte range been consumed? (This is "normal".) 
if(off + *sz < ring->rt) { return RING_TRUNCATED; } @@ -117,17 +120,10 @@ lsn_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, lsn_t* sz, lsn lsn_t orig_sz = *sz; if(ring->flush > ring->rf) { -// pthread_mutex_unlock(&ring->mut); -// struct timespec tv; -// tv.tv_sec = 0; -// tv.tv_nsec = 100000; -// nanosleep(&tv, 0); -// pthread_mutex_lock(&ring->mut); if(ring->flush > ring->rf) { *sz = RING_NEXT; } } if(ring->shutdown) { if(ring->rt == ring->wf) { - fprintf(stderr, "Shutting down, and there are no more bytes. Signaling shutdown thread.\n"); pthread_cond_signal(&ring->read_done); pthread_mutex_unlock(&ring->mut); return RING_CLOSED; @@ -260,7 +256,7 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset // Allocate the memory region using mmap black magic. - char* name = strdup("/dev/shm/stasis-ringbuffer-XXXXXX"); + char* name = strdup("/tmp/stasis-ringbuffer-XXXXXX"); ring->fd = mkstemp(name); if(ring->fd == -1) { perror("Couldn't mkstemp\n"); abort(); } @@ -320,14 +316,15 @@ void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) { pthread_mutex_lock(&ring->mut); ring->shutdown = 1; do { - fprintf(stderr, "%lld < %lld signaling readers for shutdown and sleeping\n", ring->rt, ring->wf); +// fprintf(stderr, "%lld < %lld signaling readers for shutdown and sleeping\n", ring->rt, ring->wf); pthread_cond_signal(&ring->write_done); pthread_cond_wait(&ring->read_done,&ring->mut); - fprintf(stderr, "readers done\n"); +// fprintf(stderr, "readers done\n"); } while (ring->rt < ring->wf); pthread_mutex_unlock(&ring->mut); - +} +void stasis_ringbuffer_free(stasis_ringbuffer_t * ring) { lsn_t size = ring->mask+1; int err = munmap(((char*)ring->mem), size * 2); if(err == -1) { perror("could not munmap first half of ringbuffer"); } diff --git a/stasis/util/ringbuffer.h b/stasis/util/ringbuffer.h index 3b0664f..116878b 100644 --- a/stasis/util/ringbuffer.h +++ b/stasis/util/ringbuffer.h @@ -31,7 +31,10 @@ lsn_t 
stasis_ringbuffer_get_write_frontier(stasis_ringbuffer_t * ring); void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off); typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_CLOSED = -6, RING_MINERR = -7 } stasis_ringbuffer_error_t; void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off); -void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring); +// Causes ringbuffer requests to stop blocking, and return RING_CLOSED +void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring); +// Deallocates the ringbuffer (call after any threads using the ringbuffer have shutdown). +void stasis_ringbuffer_free(stasis_ringbuffer_t * ring); END_C_DECLS #endif /* RINGBUFFER_H_ */ diff --git a/utilities/README.log b/utilities/README.log new file mode 100644 index 0000000..1645863 --- /dev/null +++ b/utilities/README.log @@ -0,0 +1,26 @@ +Stasis' default log implementation ("filePool") creates a directory +full of 64MB log "chunk" files. Synchronously extending these files +one entry at a time is extremely expensive, as the filesystem must +repeatedly update its metadata. The "preallocate_log" utility in this +directory creates a Stasis log directory, and fills it with some +number of empty log chunks. It is safe to run this utility against an +existing Stasis log directory, but Stasis will not use the new chunks +until it is restarted. Also, although it is safe to mix and match log +chunk sizes, Stasis will continue to use its compiled-in size, wasting +the end of overly large chunks, and incrementally extending undersized +ones. + +preallocate_log usage: + +The first parameter is an integer, and specifies the number of log +chunks to be created. The second parameter is the name of the Stasis +log directory, and defaults to "stasis_log", the current default name. +The third parameter is the log chunk size, in the format accepted by +dd's "bs" option (eg: 64M, 1G, and so on).
+
+delete_log usage:
+
+This script renames all of the files in a log directory so that they
+end in "~". This will cause Stasis to reuse the files, but ignore
+their contents (which avoids the cost of re-creating them).
+
diff --git a/utilities/delete_log b/utilities/delete_log
new file mode 100755
index 0000000..9ccbdf8
--- /dev/null
+++ b/utilities/delete_log
@@ -0,0 +1,21 @@
+#!/usr/bin/perl -w
+use strict;
+
+# Marks every chunk in a Stasis log directory as reusable by appending "~"
+# to its name; Stasis then reuses the files but ignores their contents.
+# Usage: delete_log [dir]    (dir defaults to "stasis_log")
+
+my $dir = shift || "stasis_log";
+
+# Read the directory and rename() directly rather than shelling out to
+# ls/mv, so directory names containing spaces or shell metacharacters are
+# handled safely and failures are reported.
+opendir(my $dh, $dir) or die "Could not open $dir: $!\n";
+my @files = sort grep { /\Alog-chunk-.{20}\z/s } readdir($dh);
+closedir($dh);
+
+foreach my $name (@files) {
+  my $f = "$dir/$name";
+  print "$f -> $f~\n";
+  rename($f, "$f~") or die "Could not rename $f: $!\n";
+}
diff --git a/utilities/preallocate_log b/utilities/preallocate_log
new file mode 100755
index 0000000..8f5f68b
--- /dev/null
+++ b/utilities/preallocate_log
@@ -0,0 +1,25 @@
+#!/usr/bin/perl -w
+use strict;
+
+# Creates a Stasis log directory (if needed) and fills it with zero-filled,
+# reusable ("~"-suffixed) log chunk files.
+# Usage: preallocate_log count [dir] [chunk-size]
+#   dir defaults to "stasis_log/"; chunk-size is passed to dd's "bs"
+#   option and defaults to "64M".
+
+my $count = shift || die "Usage: $0 count [dir] [chunk-size]\n";
+$count =~ /\A\d+\z/ or die "Usage: $0 count [dir] [chunk-size]\n";
+my $dir = shift || "stasis_log/";
+my $sz = shift || "64M";
+
+# List-form system() bypasses the shell, so $dir and $sz cannot be
+# misinterpreted as shell syntax; check exit statuses so that failures
+# (e.g. a full disk) are not silently ignored.
+system("mkdir", "-p", "--", $dir) == 0 or die "Could not create $dir\n";
+
+for(my $i = 0; $i < $count; $i++) {
+  my $file = sprintf("log-chunk-%020d~", $i);
+  print("Writing $file\n");
+  system("dd", "if=/dev/zero", "of=$dir/$file", "bs=$sz", "count=1") == 0
+    or die "dd failed while writing $dir/$file\n";
+}