bugfixes and optimizations for the new log format; log preallocation and truncation utilities

This commit is contained in:
Sears Russell 2011-05-14 03:49:38 +00:00
parent 92f9b307f1
commit 801fb6de26
7 changed files with 130 additions and 81 deletions

View file

@ -27,6 +27,7 @@ static void* worker(void* arg) {
LogEntry * e = allocUpdateLogEntry(l, -1, -1, OPERATION_NOOP, 0, 0);
l->write_entry(l, e);
l->write_entry_done(l, e);
l->force_tail(l, LOG_FORCE_COMMIT);
// if(! (i & 1023)) { l->force_tail(l, 0);}
}
return 0;

View file

@ -199,7 +199,7 @@ void stasis_log_file_pool_chunk_open(stasis_log_file_pool_state * fp, int chunk)
static int get_chunk_from_offset(stasis_log_t * log, lsn_t lsn) {
stasis_log_file_pool_state * fp = log->impl;
int chunk = -1;
if(fp->live_offsets[fp->live_count-1] <= lsn && fp->live_offsets[fp->live_count-1] + fp->target_chunk_size > lsn) {
if(fp->live_offsets[fp->live_count-1] <= lsn && (fp->live_offsets[fp->live_count-1] + fp->target_chunk_size) > lsn) {
return fp->live_count - 1;
}
for(int i = 0; i < fp->live_count; i++) {
@ -212,7 +212,6 @@ static int get_chunk_from_offset(stasis_log_t * log, lsn_t lsn) {
*/
int stasis_log_file_pool_append_chunk(stasis_log_t * log, off_t new_offset) {
stasis_log_file_pool_state * fp = log->impl;
char * old_file = 0;
char * new_file = stasis_log_file_pool_build_filename(fp, new_offset);
char * new_path = build_path(fp->dirname, new_file);
@ -251,18 +250,28 @@ LogEntry * stasis_log_file_pool_reserve_entry(stasis_log_t * log, size_t szs) {
uint64_t framed_size = sz+sizeof(uint32_t)+sizeof(uint32_t);
lsn_t off = stasis_ringbuffer_reserve_space(fp->ring, framed_size, handle);
pthread_mutex_lock(&fp->mut);
int endchunk = get_chunk_from_offset(log, off + sz + 2*sizeof(uint32_t));
//if(chunk >= fp->live_count || (off + (sz+2*sizeof(uint32_t)) > fp->live_offsets[chunk] + fp->target_chunk_size)) {
if(endchunk == -1) { //>= fp->live_count) {
stasis_log_file_pool_append_chunk(log, off);
int chunk = get_chunk_from_offset(log, off);
assert(chunk == fp->live_count-1);
}
lsn_t barrier = fp->live_offsets[fp->live_count - 1]-1;
pthread_mutex_unlock(&fp->mut);
if(barrier < off) {
stasis_ringbuffer_flush(fp->ring, barrier);
}
byte * buf = stasis_ringbuffer_get_wr_buf(fp->ring, off, framed_size);
memcpy(buf, &sz, sizeof(uint32_t));
LogEntry * e = (LogEntry*)(buf + (2 * sizeof(uint32_t)));
e->LSN = off;
pthread_mutex_lock(&fp->mut);
int chunk = get_chunk_from_offset(log, off);
if(chunk >= fp->live_count || (off + (sz+2*sizeof(uint32_t)) > fp->live_offsets[chunk] + fp->target_chunk_size)) {
stasis_log_file_pool_append_chunk(log, off);
}
pthread_mutex_unlock(&fp->mut);
return e;
}
/**
@ -484,6 +493,8 @@ int stasis_log_file_pool_close(stasis_log_t * log) {
pthread_join(fp->write_thread, 0);
// pthread_join(fp->write_thread2, 0);
stasis_ringbuffer_free(fp->ring);
for(int i = 0; i < fp->live_count; i++) {
close(fp->ro_fd[i]);
free(fp->live_filenames[i]);
@ -506,72 +517,52 @@ void * stasis_log_file_pool_writeback_worker(void * arg) {
stasis_log_file_pool_state * fp = log->impl;
lsn_t handle;
lsn_t off, next_chunk_off, chunk_len, remaining_len;
lsn_t off;
while(1) {
lsn_t len = 16*1024*1024;
off = stasis_ringbuffer_consume_bytes(fp->ring, &len, &handle);
const byte * buf = stasis_ringbuffer_get_rd_buf(fp->ring, off, len);
if(off == RING_CLOSED) break;
pthread_mutex_lock(&fp->mut);
int chunk = get_chunk_from_offset(log, off);
assert(chunk != -1); // chunks are created on insertion into the ring buffer...
if(fp->live_count > chunk+1) {
// see if this log record spans chunks.
next_chunk_off = fp->live_offsets[chunk+1];
chunk_len = next_chunk_off - off;
if(chunk_len > len) {
chunk_len = len;
remaining_len = 0;
} else {
remaining_len = len - chunk_len;
int endchunk = get_chunk_from_offset(log, off + len);
// build vector of write operations.
int* fds = malloc(sizeof(int) * (1 + endchunk-chunk));
lsn_t* file_offs = malloc(sizeof(lsn_t) * (1 + endchunk-chunk));
for(int c = chunk; c <= endchunk; c++) {
fds[c-chunk] = fp->ro_fd[c];
file_offs[c-chunk] = fp->live_offsets[c];
}
} else {
// this log record *cannot* span chunks, or the chunk would already
// exist.
chunk_len = len;
remaining_len = 0;
}
int fd = fp->ro_fd[chunk];
lsn_t file_off = fp->live_offsets[chunk];
int spill_fd = -1;
lsn_t spill_off = INVALID_LSN;
if(remaining_len) {
spill_fd = fp->ro_fd[chunk+1];
spill_off = fp->live_offsets[chunk+1];
}
// As above, the fds cannot change as we are writing. Release mut so that
// we do not hold it while we go to disk.
assert(endchunk != -1);
pthread_mutex_unlock(&fp->mut);
const byte * buf = stasis_ringbuffer_get_rd_buf(fp->ring, off, len);
int succ = stasis_log_file_pool_chunk_write_buffer(fd, buf, chunk_len, file_off, off);
if(!succ) {
fprintf(stderr, "A: chunk is %d cnk offset is %lld offset =s %lldn", chunk, file_off, off);
assert(succ);
lsn_t bytes_written = 0;
for(int c = chunk; c <= endchunk; c++){
lsn_t write_len;
if(c < endchunk) {
write_len = fp->live_offsets[c+1] - (off+bytes_written);
} else {
write_len = len - bytes_written;
}
if(remaining_len) {
int succ = stasis_log_file_pool_chunk_write_buffer(fds[c-chunk], buf+bytes_written, write_len, file_offs[c-chunk], off+bytes_written);
assert(succ);
bytes_written += write_len;
if(c < endchunk) {
uint32_t zero = 0;
// Close current log file. This might increase its length
// by a byte or so, but that's the filesystem's problem.
succ = stasis_log_file_pool_chunk_write_buffer(
fd,
fds[c-chunk],
(const byte*)&zero,
sizeof(zero),
file_off,
off+chunk_len);
if(!succ) {
fprintf(stderr, "B: chunk is %d cnk offset is %lld offset =s %lld\n", chunk, file_off, off);
assert(succ);
}
succ = stasis_log_file_pool_chunk_write_buffer(
spill_fd,
buf + chunk_len,
remaining_len,
spill_off,
off + chunk_len);
if(!succ) {
fprintf(stderr, "C: chunk is %d cnk offset is %lld offset =s %lld\n", chunk, spill_off, off);
file_offs[c-chunk],
off+bytes_written); // don't count this write toward bytes written, since it's not logically part of the log.
assert(succ);
}
}
free(file_offs);
free(fds);
stasis_ringbuffer_read_done(fp->ring, &handle);
}
return 0;
@ -661,9 +652,9 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
fp->target_chunk_size = 64 * 1024 * 1024;
fp->filemode = filemode | O_SYNC; /// XXX should not hard-code O_SYNC.
fp->filemode = filemode | O_DSYNC; /// XXX should not hard-code O_SYNC.
fp->fileperm = fileperm;
fp->softcommit = !(filemode & O_SYNC);
fp->softcommit = !(filemode & O_DSYNC);
off_t current_target = 0;
for(int i = 0; i < n; i++) {
@ -690,6 +681,7 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
fp->live_offsets[fp->live_count] = lsn;
/*fo->ro_fd=*/ stasis_log_file_pool_chunk_open(fp, fp->live_count);
// XXX check to see if this file contains valid data. If not, then we crashed after creating this file, but before syncing the previous one.
assert(lsn <= current_target || !current_target);
char * full_name = build_path(fp->dirname, fp->live_filenames[fp->live_count]);
if(!stat(full_name, &st)) {
@ -712,15 +704,20 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
free(namelist[i]);
}
assert(fp->live_count);
free(namelist);
lsn_t next_lsn;
if(!fp->live_count) {
next_lsn = 1;
stasis_log_file_pool_append_chunk(ret, 1);
} else {
printf("Current log segment appears to be %s. Scanning for next available LSN\n", fp->live_filenames[fp->live_count-1]);
lsn_t next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]);
next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]);
printf("Scan returned %lld\n", (long long)next_lsn);
}
// The previous segment must have been forced to disk before we created the current one, so we're good to go.
fp->ring = stasis_ringbuffer_init(26, next_lsn); // 64mb buffer

View file

@ -91,7 +91,10 @@ lsn_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, lsn_t sz, lsn_
}
lsn_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, lsn_t off, lsn_t* sz) {
if(off == RING_NEXT) { off = ring->rf; }
if(*sz == RING_NEXT) { *sz = ring->wt - off; }
if(*sz == RING_NEXT) {
*sz = ring->wt - off;
if(*sz == 0) { return RING_VOLATILE; }
}
// has the entire byte range been consumed? (This is "normal".)
if(off + *sz < ring->rt) { return RING_TRUNCATED; }
@ -117,17 +120,10 @@ lsn_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, lsn_t* sz, lsn
lsn_t orig_sz = *sz;
if(ring->flush > ring->rf) {
// pthread_mutex_unlock(&ring->mut);
// struct timespec tv;
// tv.tv_sec = 0;
// tv.tv_nsec = 100000;
// nanosleep(&tv, 0);
// pthread_mutex_lock(&ring->mut);
if(ring->flush > ring->rf) { *sz = RING_NEXT; }
}
if(ring->shutdown) {
if(ring->rt == ring->wf) {
fprintf(stderr, "Shutting down, and there are no more bytes. Signaling shutdown thread.\n");
pthread_cond_signal(&ring->read_done);
pthread_mutex_unlock(&ring->mut);
return RING_CLOSED;
@ -260,7 +256,7 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset
// Allocate the memory region using mmap black magic.
char* name = strdup("/dev/shm/stasis-ringbuffer-XXXXXX");
char* name = strdup("/tmp/stasis-ringbuffer-XXXXXX");
ring->fd = mkstemp(name);
if(ring->fd == -1) { perror("Couldn't mkstemp\n"); abort(); }
@ -320,14 +316,15 @@ void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) {
pthread_mutex_lock(&ring->mut);
ring->shutdown = 1;
do {
fprintf(stderr, "%lld < %lld signaling readers for shutdown and sleeping\n", ring->rt, ring->wf);
// fprintf(stderr, "%lld < %lld signaling readers for shutdown and sleeping\n", ring->rt, ring->wf);
pthread_cond_signal(&ring->write_done);
pthread_cond_wait(&ring->read_done,&ring->mut);
fprintf(stderr, "readers done\n");
// fprintf(stderr, "readers done\n");
} while (ring->rt < ring->wf);
pthread_mutex_unlock(&ring->mut);
}
void stasis_ringbuffer_free(stasis_ringbuffer_t * ring) {
lsn_t size = ring->mask+1;
int err = munmap(((char*)ring->mem), size * 2);
if(err == -1) { perror("could not munmap first half of ringbuffer"); }

View file

@ -31,7 +31,10 @@ lsn_t stasis_ringbuffer_get_write_frontier(stasis_ringbuffer_t * ring);
void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off);
typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_CLOSED = -6, RING_MINERR = -7 } stasis_ringbuffer_error_t;
void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off);
void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring);
// Causes ringbuffer requests to stop blocking, and return RING_CLOSED
void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring);
// Deallocates the ringbuffer (call after any threads using the ringbuffer have shutdown).
void stasis_ringbuffer_free(stasis_ringbuffer_t * ring);
END_C_DECLS
#endif /* RINGBUFFER_H_ */

26
utilities/README.log Normal file
View file

@ -0,0 +1,26 @@
Stasis' default log implementation ("filePool") creates a directory
full of 64MB log "chunk" files. Synchronously extending these files
one entry at a time is extremely expensive, as the filesystem must
repeatedly update its metadata. The "preallocate_log" utility in this
directory creates a Stasis log directory, and fills it with some
number of empty log chunks. It is safe to run this utility against an
existing Stasis log directory, but Stasis will not use the new chunks
until it is restarted. Also, although it is safe to mix and match log
chunk sizes, Stasis will continue to use its compiled-in size, wasting
the end of overly large chunks, and incrementally extending undersized
ones.
preallocate_log usage:
The first parameter is an integer, and specifies the number of log
chunks to be created. The second parameter is the name of the Stasis
log directory, and defaults to "stasis_log", the current default name.
The third parameter is the log chunk size, in the format extended by
dd's "bs" option (eg: 64MB, 1GB, and so on).
delete_log usage:
This script renames all of the files in a log directory so that they
end in "~". This will cause Stasis to reuse the files, but ignore
their contents (which avoids the cost of re-creating them).

11
utilities/delete_log Executable file
View file

@ -0,0 +1,11 @@
#!/usr/bin/perl -w
use strict;
my $dir = shift || "stasis_log";
my @files = `ls $dir/log-chunk-????????????????????`;
foreach my $f (@files) {
chomp $f;
print "$f -> $f~\n";
system("mv $f $f~");
}

14
utilities/preallocate_log Executable file
View file

@ -0,0 +1,14 @@
#!/usr/bin/perl -w
use strict;
my $count = shift || die "Usage: $0 count [dir] [chunk-size]\n";
my $dir = shift || "stasis_log/";
my $sz = shift || "64M";
system("mkdir -p $dir");
for(my $i = 0; $i < $count; $i++) {
my $file = sprintf("log-chunk-%020d~", $i);
print("Writing $file\n");
system("dd if=/dev/zero of=$dir/$file bs=$sz count=1\n");
}