diff --git a/CMakeLists.txt b/CMakeLists.txt index 1ec5796..c211a29 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ cmake_minimum_required(VERSION 2.4) # For all I know, 2.0 works too... # The new behavior seems preferable, though it shouldn't affect us either way. if(COMMAND cmake_policy) - cmake_policy(SET CMP0014 OLD) ## This breaks builds on older cmake... +# cmake_policy(SET CMP0014 OLD) ## This breaks builds on older cmake... cmake_policy(SET CMP0003 NEW) endif(COMMAND cmake_policy) diff --git a/src/stasis/flags.c b/src/stasis/flags.c index a2d2aad..c165d83 100644 --- a/src/stasis/flags.c +++ b/src/stasis/flags.c @@ -55,14 +55,14 @@ stasis_handle_t* (*stasis_handle_factory)() = #else stasis_handle_default_factory; #endif -stasis_handle_t* (*stasis_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms) = +stasis_handle_t* (*stasis_handle_file_factory)(const char* filename, int open_mode, int creat_perms) = #ifdef STASIS_FILE_HANDLE_FACTORY STASIS_FILE_HANDLE_FACTORY #else stasis_handle_open_pfile; #endif -stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms) = +stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(const char* filename, int open_mode, int creat_perms) = #ifdef STASIS_NON_BLOCKING_HANDLE_FILE_FACTORY STASIS_NON_BLOCKING_HANDLE_FILE_FACTORY #else diff --git a/src/stasis/io/debug.c b/src/stasis/io/debug.c index 9dadfb0..69ff3f1 100644 --- a/src/stasis/io/debug.c +++ b/src/stasis/io/debug.c @@ -48,14 +48,6 @@ static void debug_enable_sequential_optimizations(struct stasis_handle_t *h) { hh->enable_sequential_optimizations(hh); printf("tid=%9ld retn enable_sequential_optimizations(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); } - -static lsn_t debug_start_position(stasis_handle_t *h) { - stasis_handle_t * hh = ((debug_impl*)h->impl)->h; - printf("tid=%9ld call start_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); - int ret = hh->start_position(hh); - printf("tid=%9ld retn start_position(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout); - return ret; -} static lsn_t debug_end_position(stasis_handle_t *h) { stasis_handle_t * hh = ((debug_impl*)h->impl)->h; printf("tid=%9ld call end_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); @@ -77,21 +69,6 @@ static stasis_write_buffer_t * debug_write_buffer(stasis_handle_t * h, (long)(intptr_t)pthread_self(), (unsigned long)hh, off, len, (unsigned long)retWrap); fflush(stdout); return retWrap; } -static stasis_write_buffer_t * debug_append_buffer(stasis_handle_t * h, - lsn_t len) { - stasis_handle_t * hh = ((debug_impl*)h->impl)->h; - printf("tid=%9ld call append_buffer(%lx, %lld)\n", - (long)(intptr_t)pthread_self(), (unsigned long)hh, len); fflush(stdout); - stasis_write_buffer_t * ret = hh->append_buffer(hh,len); - stasis_write_buffer_t * retWrap = malloc(sizeof(stasis_write_buffer_t)); - *retWrap = *ret; - retWrap->h = h; - retWrap->impl = ret; - printf("tid=%9ld retn append_buffer(%lx, %lld) = %lx (off=%lld)\n", - (long)(intptr_t)pthread_self(), (unsigned long)hh, len, (unsigned long)retWrap, ret->off); fflush(stdout); - return retWrap; - -} static int debug_release_write_buffer(stasis_write_buffer_t * w_wrap) { stasis_write_buffer_t * w = (stasis_write_buffer_t*)w_wrap->impl; stasis_handle_t * hh = w->h; @@ -137,19 +114,6 @@ static int debug_write(stasis_handle_t * h, lsn_t off, printf("tid=%9ld retn write(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout); return ret; } -static int debug_append(stasis_handle_t * h, lsn_t * off, - const byte * dat, lsn_t len) { - stasis_handle_t * hh = ((debug_impl*)h->impl)->h; - printf("tid=%9ld call append(%lx, ??, %lx, %lld)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, (unsigned long)dat, len); fflush(stdout); - lsn_t tmpOff; - if(!off) { - off = &tmpOff; - } - int ret = hh->append(hh, off, dat, len); - printf("tid=%9ld retn append(%lx, %lld, %lx, %lld) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, *off, (unsigned long) dat, len, ret); fflush(stdout); - return ret; - -} static int debug_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) { stasis_handle_t * hh = ((debug_impl*)h->impl)->h; @@ -172,33 +136,21 @@ static int debug_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { printf("tid=%9ld retn force(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout); return ret; } -static int debug_truncate_start(stasis_handle_t * h, lsn_t new_start) { - stasis_handle_t * hh = ((debug_impl*)h->impl)->h; - printf("tid=%9ld call truncate_start(%lx, %lld)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, new_start); fflush(stdout); - int ret = hh->truncate_start(hh, new_start); - printf("tid=%9ld retn truncate_start(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout); - return ret; -} - struct stasis_handle_t debug_func = { .num_copies = debug_num_copies, .num_copies_buffer = debug_num_copies_buffer, .close = debug_close, .dup = debug_dup, .enable_sequential_optimizations = debug_enable_sequential_optimizations, - .start_position = debug_start_position, .end_position = debug_end_position, .write = debug_write, - .append = debug_append, .write_buffer = debug_write_buffer, - .append_buffer = debug_append_buffer, .release_write_buffer = debug_release_write_buffer, .read = debug_read, .read_buffer = debug_read_buffer, .release_read_buffer = debug_release_read_buffer, .force = debug_force, .force_range = debug_force_range, - .truncate_start = debug_truncate_start, .error = 0 }; diff --git a/src/stasis/io/file.c b/src/stasis/io/file.c index fafb322..266302d 100644 --- a/src/stasis/io/file.c +++ b/src/stasis/io/file.c @@ -10,7 +10,6 @@ typedef struct file_impl { pthread_mutex_t mut; - lsn_t start_pos; lsn_t end_pos; int fd; int file_flags; @@ -24,7 +23,7 @@ static int updateEOF(stasis_handle_t * h) { if(pos == (off_t)-1) { return errno; } else { - impl->end_pos = impl->start_pos + pos; + impl->end_pos = pos; return 0; } } @@ -48,19 +47,11 @@ static int file_close(stasis_handle_t * h) { } static stasis_handle_t* file_dup(stasis_handle_t * h) { file_impl * impl = h->impl; - return stasis_handle_open_file(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode); + return stasis_handle_open_file(impl->filename, impl->file_flags, impl->file_mode); } static void file_enable_sequential_optimizations(stasis_handle_t * h) { // TODO enable_sequential_optimizations is a no-op in file.c } -static lsn_t file_start_position(stasis_handle_t *h) { - file_impl * impl = (file_impl*)h->impl; - pthread_mutex_lock(&(impl->mut)); - lsn_t ret = impl->start_pos; - pthread_mutex_unlock(&(impl->mut)); - return ret; -} - static lsn_t file_end_position(stasis_handle_t *h) { file_impl * impl = (file_impl*)h->impl; pthread_mutex_lock(&(impl->mut)); @@ -82,12 +73,12 @@ inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off, int error = 0; // These should have been checked by the caller. - assert(impl->start_pos <= off); + assert(0 <= off); assert(impl->end_pos >= off+len); // @todo need a test harness that gets read(), write() and lseek() to misbehave. - off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET); + off_t lseek_offset = lseek(impl->fd, off, SEEK_SET); if(lseek_offset == (off_t)-1) { error = errno; @@ -114,7 +105,7 @@ inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off, // Try again. - lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET); + lseek_offset = lseek(impl->fd, off + bytes_written, SEEK_SET); if(lseek_offset == (off_t)-1) { error = errno; if(error == EBADF || error == ESPIPE) { @@ -155,7 +146,7 @@ static int file_read(stasis_handle_t * h, file_impl * impl = (file_impl*)(h->impl); pthread_mutex_lock(&(impl->mut)); int error = 0; - if(off < impl->start_pos) { + if(off < 0) { error = EDOM; } else if(off + len > impl->end_pos) { error = updateEOF(h); @@ -165,7 +156,7 @@ static int file_read(stasis_handle_t * h, } if(!error) { - off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET); + off_t lseek_offset = lseek(impl->fd, off, SEEK_SET); if(lseek_offset == (off_t)-1) { error = errno; @@ -193,7 +184,7 @@ static int file_read(stasis_handle_t * h, // Try again. - lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET); + lseek_offset = lseek(impl->fd, off + bytes_written, SEEK_SET); if(lseek_offset == (off_t)-1) { error = errno; if(error == EBADF || error == ESPIPE) { @@ -237,7 +228,7 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len file_impl * impl = (file_impl*)(h->impl); pthread_mutex_lock(&(impl->mut)); int error = 0; - if(impl->start_pos > off) { + if(off < 0) { error = EDOM; } @@ -251,19 +242,6 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len return error; } -static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) { - file_impl * impl = (file_impl*)(h->impl); - pthread_mutex_lock(&impl->mut); - updateEOF(h); - *off = impl->end_pos; - impl->end_pos += len; - int error = file_write_unlocked(h, *off, dat, len); - pthread_mutex_unlock(&impl->mut); - return error; -} - - - static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, lsn_t off, lsn_t len) { // Allocate the handle @@ -275,7 +253,7 @@ static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, pthread_mutex_lock(&(impl->mut)); - if(impl->start_pos > off) { + if(off < 0) { error = EDOM; } if(off + len > impl->end_pos) { @@ -311,46 +289,6 @@ static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, return ret; } -static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h, - lsn_t len) { - // Allocate the handle - stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - if(!ret) { return NULL; } - - file_impl * impl = (file_impl*)h->impl; - - // Obtain an appropriate offset - pthread_mutex_lock(&(impl->mut)); - updateEOF(h); - off_t off = impl->end_pos; - impl->end_pos += len; - pthread_mutex_unlock(&(impl->mut)); - - // Allocate the buffer - byte * buf = malloc(len); - int error = 0; - if(!buf) { - error = ENOMEM; - } - - if(error) { - ret->h = h; - ret->off = 0; - ret->buf = 0; - ret->len = 0; - ret->impl = 0; - ret->error = error; - } else { - ret->h = h; - ret->off = off; - ret->buf = buf; - ret->len = len; - ret->impl = 0; - ret->error = 0; - } - return ret; - -} static int file_release_write_buffer(stasis_write_buffer_t * w) { file_impl * impl = (file_impl*)(w->h->impl); @@ -358,7 +296,7 @@ static int file_release_write_buffer(stasis_write_buffer_t * w) { pthread_mutex_lock(&(impl->mut)); int error = 0; if(impl->end_pos < w->off + w->len || - impl->start_pos > w->off) { + 0 > w->off) { error = EDOM; } @@ -413,20 +351,7 @@ static int file_force(stasis_handle_t * h) { file_impl * impl = h->impl; if(!(impl->file_flags & O_SYNC)) { - pthread_mutex_lock(&impl->mut); // must latch because of truncate... :( int fd = impl->fd; - pthread_mutex_unlock(&impl->mut); - { - static int warned = 0; - if(!warned) { - printf("Warning: There is a race condition between force() and " - " truncate() in file.c (This shouldn't matter in practice, " - "as the logger hasn't moved over to use file.c yet.\n"); - warned = 1; - } - } - // XXX there is a race here; the file handle could have been invalidated - // by truncate. #ifdef HAVE_FDATASYNC DEBUG("file_force() is calling fdatasync()\n"); fdatasync(fd); @@ -446,8 +371,6 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { // not opened synchronously; we need to explicitly sync. pthread_mutex_lock(&impl->mut); int fd = impl->fd; - lsn_t off = impl->start_pos; - (void)off; pthread_mutex_unlock(&impl->mut); { static int warned = 0; @@ -487,69 +410,6 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { } return ret; } -static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) { - file_impl * impl = h->impl; - pthread_mutex_lock(&impl->mut); - int error = 0; - - if(new_start > impl->end_pos) { - updateEOF(h); - if(!error && new_start > impl->end_pos) { - error = EDOM; - } - } - - if(!error && new_start > impl->start_pos) { - char * tmpfile = malloc(strlen(impl->filename)+2); - strcpy(tmpfile, impl->filename); - tmpfile[strlen(tmpfile)+1] = '\0'; - tmpfile[strlen(tmpfile)] = '~'; - - int fd = open(tmpfile, impl->file_flags, impl->file_mode); - - lseek(fd, 0, SEEK_SET); - lseek(impl->fd, new_start-impl->start_pos, SEEK_SET); - int count; - int buf_size = 1024 * 1024; - char * buf = malloc(buf_size); - while((count = read(impl->fd, buf, buf_size))) { - if(count == -1) { - error = errno; - perror("truncate failed to read"); - } - ssize_t bytes_written = 0; - while(bytes_written < count) { - ssize_t write_ret = write(fd, buf+bytes_written, count-bytes_written); - if(write_ret == -1) { - error = errno; - perror("truncate failed to write"); - break; - } - bytes_written += write_ret; - } - if(error) break; - } - free(buf); - if(!error) { - if(-1 == close(impl->fd)) { - error = errno; - } - impl->fd = fd; - impl->start_pos = new_start; - fsync(impl->fd); - int rename_ret = rename(tmpfile, impl->filename); - if(rename_ret) { - error = errno; - } - free(tmpfile); - } else { - close(fd); - } - } - - pthread_mutex_unlock(&impl->mut); - return error; -} struct stasis_handle_t file_func = { .num_copies = file_num_copies, @@ -557,23 +417,19 @@ struct stasis_handle_t file_func = { .close = file_close, .dup = file_dup, .enable_sequential_optimizations = file_enable_sequential_optimizations, - .start_position = file_start_position, .end_position = file_end_position, .write = file_write, - .append = file_append, .write_buffer = file_write_buffer, - .append_buffer = file_append_buffer, .release_write_buffer = file_release_write_buffer, .read = file_read, .read_buffer = file_read_buffer, .release_read_buffer = file_release_read_buffer, .force = file_force, .force_range = file_force_range, - .truncate_start = file_truncate_start, .error = 0 }; -stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * filename, int flags, int mode) { +stasis_handle_t * stasis_handle(open_file)(const char * filename, int flags, int mode) { stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); if(!ret) { return NULL; } *ret = file_func; @@ -581,8 +437,6 @@ stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * file file_impl * impl = malloc(sizeof(file_impl)); ret->impl = impl; pthread_mutex_init(&(impl->mut), 0); - impl->start_pos = start_offset; - impl->end_pos = start_offset; assert(sizeof(off_t) >= (64/8)); impl->fd = open(filename, flags, mode); if(impl->fd == -1) { @@ -591,5 +445,6 @@ stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * file impl->filename = strdup(filename); impl->file_flags = flags; impl->file_mode = mode; + updateEOF(ret); return ret; } diff --git a/src/stasis/io/handle.c b/src/stasis/io/handle.c index e233c71..211eb6f 100644 --- a/src/stasis/io/handle.c +++ b/src/stasis/io/handle.c @@ -19,5 +19,5 @@ #include stasis_handle_t* stasis_handle_default_factory() { - return stasis_handle_file_factory(0, stasis_store_file_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); + return stasis_handle_file_factory(stasis_store_file_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); } diff --git a/src/stasis/io/memory.c b/src/stasis/io/memory.c index dd52ff1..0d76a80 100644 --- a/src/stasis/io/memory.c +++ b/src/stasis/io/memory.c @@ -3,7 +3,6 @@ typedef struct mem_impl { pthread_mutex_t mut; - lsn_t start_pos; lsn_t end_pos; byte * buf; int refcount; @@ -32,16 +31,6 @@ static void mem_enable_sequential_optimizations(stasis_handle_t * h) { // No-op } -static lsn_t mem_start_position(stasis_handle_t *h) { - lsn_t ret; - mem_impl* impl = (mem_impl*)(h->impl); - - pthread_mutex_lock(&impl->mut); - ret = impl->start_pos; - pthread_mutex_unlock(&impl->mut); - - return ret; -} static lsn_t mem_end_position(stasis_handle_t *h) { lsn_t ret; mem_impl* impl = (mem_impl*)(h->impl); @@ -63,14 +52,14 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h, int error = 0; - if(impl->start_pos > off) { + if(off < 0) { error = EDOM; } else if(impl->end_pos > off+len) { // Just need to return buffer; h's state is unchanged. } else { byte * newbuf; - if(off+len-impl->start_pos) { - newbuf = realloc(impl->buf, off+len - impl->start_pos); + if(off+len) { + newbuf = realloc(impl->buf, off+len); } else { free(impl->buf); newbuf = malloc(0); @@ -93,7 +82,7 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h, } else { ret->h = h; ret->off = off; - ret->buf = &(impl->buf[off-impl->start_pos]); + ret->buf = &(impl->buf[off]); ret->len = len; ret->impl = 0; ret->error = 0; @@ -101,46 +90,6 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h, return ret; } - -static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h, - lsn_t len) { - mem_impl * impl = (mem_impl*)(h->impl); - - stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - if(!ret) { return 0; } - - pthread_mutex_lock(&(impl->mut)); - - lsn_t off = impl->end_pos; - impl->end_pos += len; - size_t newlen = impl->end_pos - impl->start_pos; - byte * newbuf; - if(newlen == 0) { - free(impl->buf); - newbuf = malloc(0); - } else { - newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos); - } - if(newbuf) { - impl->buf = newbuf; - - ret->h = h; - ret->off = off; - ret->buf = &(impl->buf[off-impl->start_pos]); - ret->len = len; - ret->impl = 0; - ret->error = 0; - } else { - // if we requested a zero length buffer, this is OK. - ret->h = h; - ret->off = 0; - ret->buf = 0; - ret->len = 0; - ret->impl = 0; - ret->error = ENOMEM; - } - return ret; -} static int mem_release_write_buffer(stasis_write_buffer_t * w) { mem_impl * impl = (mem_impl*)(w->h->impl); pthread_mutex_unlock(&(impl->mut)); @@ -156,7 +105,7 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h, stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); if(!ret) { return NULL; } - if(off < impl->start_pos || off + len > impl->end_pos) { + if(off < 0 || off + len > impl->end_pos) { ret->h = h; ret->buf = 0; ret->len = 0; @@ -165,7 +114,7 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h, ret->error = EDOM; } else { ret->h = h; - ret->buf = &(impl->buf[off-impl->start_pos]); + ret->buf = &(impl->buf[off]); ret->off = off; ret->len = len; ret->impl = 0; @@ -195,20 +144,6 @@ static int mem_write(stasis_handle_t * h, lsn_t off, return ret; } -static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) { - stasis_write_buffer_t * w = mem_append_buffer(h, len); - int ret; - if(w->error) { - ret = w->error; - } else { - memcpy(w->buf, dat, len); - ret = 0; - } - *off = w->off; - mem_release_write_buffer(w); - return ret; -} - static int mem_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) { stasis_read_buffer_t * r = mem_read_buffer(h, off, len); @@ -228,30 +163,6 @@ static int mem_force(stasis_handle_t *h) { static int mem_force_range(stasis_handle_t *h,lsn_t start, lsn_t stop) { return 0; } -static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) { - mem_impl* impl = (mem_impl*) h->impl; - pthread_mutex_lock(&(impl->mut)); - if(new_start < impl->start_pos) { - pthread_mutex_unlock(&impl->mut); - return 0; - } - if(new_start > impl->end_pos) { - pthread_mutex_unlock(&impl->mut); - return EDOM; - } - - byte * new_buf = malloc(impl->end_pos -new_start); - - memcpy(new_buf, &(impl->buf[new_start - impl->start_pos]), impl->end_pos - new_start); - - free(impl->buf); - - impl->buf = new_buf; - impl->start_pos = new_start; - - pthread_mutex_unlock(&(impl->mut)); - return 0; -} struct stasis_handle_t mem_func = { .num_copies = mem_num_copies, @@ -259,23 +170,19 @@ struct stasis_handle_t mem_func = { .close = mem_close, .dup = mem_dup, .enable_sequential_optimizations = mem_enable_sequential_optimizations, - .start_position = mem_start_position, .end_position = mem_end_position, .write = mem_write, - .append = mem_append, .write_buffer = mem_write_buffer, - .append_buffer = mem_append_buffer, .release_write_buffer = mem_release_write_buffer, .read = mem_read, .read_buffer = mem_read_buffer, .release_read_buffer = mem_release_read_buffer, .force = mem_force, .force_range = mem_force_range, - .truncate_start = mem_truncate_start, .error = 0 }; -stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) { +stasis_handle_t * stasis_handle(open_memory)() { stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); if(!ret) { return NULL; } *ret = mem_func; @@ -283,8 +190,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) { mem_impl * impl = malloc(sizeof(mem_impl)); ret->impl = impl; pthread_mutex_init(&(impl->mut), 0); - impl->start_pos = start_offset; - impl->end_pos = start_offset; + impl->end_pos = 0; impl->buf = malloc(0); impl->refcount = 1; diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index 1151d33..4f7d38c 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -142,7 +142,6 @@ typedef struct nbw_impl { pthread_mutex_t mut; // Handle state - lsn_t start_pos; lsn_t end_pos; // Fields to manage slow handles @@ -396,13 +395,6 @@ static stasis_handle_t * nbw_dup(stasis_handle_t *h) { static void nbw_enable_sequential_optimizations(stasis_handle_t *h) { // TODO non blocking should pass sequential optimizations down to underlying handles. } -static lsn_t nbw_start_position(stasis_handle_t *h) { - nbw_impl * impl = h->impl; - pthread_mutex_lock(&impl->mut); - lsn_t ret = impl->start_pos; - pthread_mutex_unlock(&impl->mut); - return ret; -} static lsn_t nbw_end_position(stasis_handle_t *h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); @@ -430,8 +422,8 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, if(!ret->error) { pthread_mutex_lock(&impl->mut); - assert(impl->start_pos <= impl->end_pos); - if(off < impl->start_pos) { + assert(0 <= impl->end_pos); + if(off < 0) { // Note: We're returning a valid write buffer to space before // the handle's truncation point. Spooky. ret->error = EDOM; @@ -444,18 +436,6 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, return ret; } -static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, - lsn_t len) { - nbw_impl * impl = h->impl; - - pthread_mutex_lock(&impl->mut); - lsn_t off = impl->end_pos; - impl->end_pos += len; - impl->requested_bytes_written += len; - pthread_mutex_unlock(&impl->mut); - - return nbw_write_buffer(h, off, len); -} static int nbw_release_write_buffer(stasis_write_buffer_t * w) { nbw_impl * impl = w->h->impl; write_buffer_impl * w_impl = w->impl; @@ -514,8 +494,8 @@ static int nbw_write(stasis_handle_t * h, lsn_t off, releaseFastHandle(impl, n, DIRTY); if(!ret) { pthread_mutex_lock(&impl->mut); - assert(impl->start_pos <= impl->end_pos); - if(off < impl->start_pos) { + assert(0 <= impl->end_pos); + if(off < 0) { ret = EDOM; } else if(off + len > impl->end_pos) { impl->end_pos = off+len; @@ -525,19 +505,6 @@ static int nbw_write(stasis_handle_t * h, lsn_t off, } return ret; } -static int nbw_append(stasis_handle_t * h, lsn_t * off, - const byte * dat, lsn_t len) { - nbw_impl * impl = h->impl; - - pthread_mutex_lock(&impl->mut); - *off = impl->end_pos; - impl->end_pos+= len; - impl->requested_bytes_written += len; - pthread_mutex_unlock(&impl->mut); - - int ret = nbw_write(h, *off, dat, len); - return ret; -} static int nbw_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) { nbw_impl * impl = h->impl; @@ -603,7 +570,7 @@ static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) { static int nbw_force(stasis_handle_t * h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); - int ret = nbw_force_range_impl(h, impl->start_pos, impl->end_pos); + int ret = nbw_force_range_impl(h, 0, impl->end_pos); pthread_mutex_unlock(&impl->mut); return ret; } @@ -616,21 +583,6 @@ static int nbw_force_range(stasis_handle_t * h, pthread_mutex_unlock(&impl->mut); return ret; } -static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) { - nbw_impl * impl = h->impl; - int error = 0; - pthread_mutex_lock(&impl->mut); - if(new_start <= impl->end_pos && new_start > impl->start_pos) { - impl->start_pos = new_start; - } else { - error = EDOM; - } - pthread_mutex_unlock(&impl->mut); - if(!error) { - // @todo close all slow handles; truncate them. (ie: implement truncate) - } - return error; -} struct stasis_handle_t nbw_func = { .num_copies = nbw_num_copies, @@ -638,19 +590,15 @@ struct stasis_handle_t nbw_func = { .close = nbw_close, .dup = nbw_dup, .enable_sequential_optimizations = nbw_enable_sequential_optimizations, - .start_position = nbw_start_position, .end_position = nbw_end_position, .write = nbw_write, - .append = nbw_append, .write_buffer = nbw_write_buffer, - .append_buffer = nbw_append_buffer, .release_write_buffer = nbw_release_write_buffer, .read = nbw_read, .read_buffer = nbw_read_buffer, .release_read_buffer = nbw_release_read_buffer, .force = nbw_force, .force_range = nbw_force_range, - .truncate_start = nbw_truncate_start, .error = 0 }; @@ -691,7 +639,7 @@ static void * nbw_worker(void * handle) { writes++; stasis_handle_t * fast = node->h; - lsn_t off = fast->start_position(fast); + lsn_t off = 0; lsn_t len = fast->end_position(fast) - off; stasis_read_buffer_t * r = fast->read_buffer(fast, off, len); @@ -829,7 +777,6 @@ stasis_handle_t * stasis_handle(open_non_blocking) nbw_impl * impl = malloc(sizeof(nbw_impl)); pthread_mutex_init(&impl->mut, 0); - impl->start_pos = 0; impl->end_pos = 0; impl->slow_factory = slow_factory; diff --git a/src/stasis/io/pfile.c b/src/stasis/io/pfile.c index 2cfde23..5d392d8 100644 --- a/src/stasis/io/pfile.c +++ b/src/stasis/io/pfile.c @@ -36,11 +36,6 @@ DECLARE_HISTOGRAM_64(force_range_hist) Per-handle information for pfile */ typedef struct pfile_impl { - /** - The logical offset of the file. Once the file is open, this will - never change, as pfile doesn't support truncation. - */ - lsn_t start_pos; /** File descriptor */ @@ -86,7 +81,7 @@ static int pfile_close(stasis_handle_t *h) { static stasis_handle_t * pfile_dup(stasis_handle_t *h) { pfile_impl *impl = h->impl; - return stasis_handle_open_pfile(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode); + return stasis_handle_open_pfile(impl->filename, impl->file_flags, impl->file_mode); } static void pfile_enable_sequential_optimizations(stasis_handle_t *h) { pfile_impl *impl = h->impl; @@ -94,11 +89,6 @@ static void pfile_enable_sequential_optimizations(stasis_handle_t *h) { int err = posix_fadvise(impl->fd, 0, 0, POSIX_FADV_SEQUENTIAL); if(err) perror("Attempt to pass POSIX_FADV_SEQUENTIAL to kernel failed"); } -static lsn_t pfile_start_position(stasis_handle_t *h) { - pfile_impl *impl = (pfile_impl*)h->impl; - return impl->start_pos; -} - static lsn_t pfile_end_position(stasis_handle_t *h) { pfile_impl *impl = (pfile_impl*)h->impl; lsn_t ret = lseek(impl->fd, 0, SEEK_END); @@ -144,7 +134,7 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { pfile_impl *impl = (pfile_impl*)(h->impl); int error = 0; - if (off < impl->start_pos) { + if (off < 0) { error = EDOM; } else { ssize_t bytes_read = 0; @@ -153,7 +143,7 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { ssize_t count = pread(impl->fd, buf + bytes_read, len - bytes_read, - off + bytes_read - impl->start_pos); + off + bytes_read); if (count == -1) { if (errno == EAGAIN || errno == EINTR) { count = 0; @@ -162,10 +152,12 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { h->error = EBADF; } else { int err = errno; - // XXX Why is sys_errlist[] is unavailable here? + // The other errors either involve memory bugs (EFAULT), logic bugs + // (EISDIR, EFIFO, EOVERFLOW), or bad hardware (EIO), so print + // something to console, and uncleanly crash. perror("pfile_read encountered an unknown error code."); fprintf(stderr, "pread() returned -1; errno is %d\n",err); - abort(); // XXX other errors? + abort(); } error = errno; break; @@ -194,24 +186,14 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len) { pfile_impl *impl = (pfile_impl*)(h->impl); int error = 0; - lsn_t phys_off; - if (impl->start_pos > off) { + if (off < 0) { error = EDOM; } else { - phys_off = off - impl->start_pos; - error = pfile_write_unlocked(impl->fd, phys_off, dat, len); + error = pfile_write_unlocked(impl->fd, off, dat, len); } return error; } -static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat, - lsn_t len) { - pfile_impl *impl = (pfile_impl*)(h->impl); - lsn_t phys_off = *off - impl->start_pos; - abort(); // xxx o append is harder than I thought. Should support via distinct API. - return pfile_write_unlocked(impl->fd, phys_off, dat,len); -} - static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h, lsn_t off, lsn_t len) { stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t)); @@ -221,11 +203,9 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h, return NULL; } - pfile_impl *impl = (pfile_impl*)h->impl; - int error = 0; - if (impl->start_pos > off) { + if (off < 0) { error = EDOM; } @@ -252,44 +232,10 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h, return ret; } -static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h, - lsn_t len) { - // Allocate the handle - stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t)); - if (!ret) { return NULL; } - - // Obtain an appropriate offset - off_t off; abort(); // XXX need O_APPEND handle. - - // Allocate the buffer - byte *buf = malloc(len); - int error = 0; - if (!buf) { - error = ENOMEM; - } - - if (error) { - ret->h = h; - ret->off = 0; - ret->buf = 0; - ret->len = 0; - ret->impl = 0; - ret->error = error; - } else { - ret->h = h; - ret->off = off; - ret->buf = buf; - ret->len = len; - ret->impl = 0; - ret->error = 0; - } - return ret; -} - static int pfile_release_write_buffer(stasis_write_buffer_t *w) { pfile_impl *impl = (pfile_impl*)(w->h->impl); - int error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, + int error = pfile_write_unlocked(impl->fd, w->off, w->buf, w->len); if (w->buf) { @@ -390,21 +336,12 @@ static int pfile_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { int ret = 0; #endif if(impl->sequential) { - int err = posix_fadvise(impl->fd, start-impl->start_pos, stop-start, POSIX_FADV_DONTNEED); + int err = posix_fadvise(impl->fd, start, stop-start, POSIX_FADV_DONTNEED); if(err) perror("Attempt to pass POSIX_FADV_SEQUENTIAL (for a range of a file) to kernel failed"); } TOCK(force_range_hist); return ret; } -static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) { - static int truncate_warned = 0; - if (!truncate_warned) { - printf("\nWarning: pfile doesn't support truncation; " - "ignoring truncation request\n"); - truncate_warned = 1; - } - return 0; -} static struct stasis_handle_t pfile_func = { .num_copies = pfile_num_copies, @@ -412,25 +349,20 @@ static struct stasis_handle_t pfile_func = { .close = pfile_close, .dup = pfile_dup, .enable_sequential_optimizations = pfile_enable_sequential_optimizations, - .start_position = pfile_start_position, .end_position = pfile_end_position, .write = pfile_write, - .append = pfile_append, .write_buffer = pfile_write_buffer, - .append_buffer = pfile_append_buffer, .release_write_buffer = pfile_release_write_buffer, .read = pfile_read, .read_buffer = pfile_read_buffer, .release_read_buffer = pfile_release_read_buffer, .force = pfile_force, .force_range = pfile_force_range, - .truncate_start = pfile_truncate_start, .error = 0 }; -stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, - const char *filename, - int flags, int mode) { +stasis_handle_t *stasis_handle(open_pfile)(const char *filename, + int flags, int mode) { stasis_handle_t *ret = malloc(sizeof(stasis_handle_t)); if (!ret) { return NULL; } *ret = pfile_func; @@ -445,8 +377,6 @@ stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, ret->error = errno; } - impl->start_pos = start_offset; - impl->filename = strdup(filename); impl->file_flags = flags; impl->file_mode = mode; diff --git a/src/stasis/io/raid1.c b/src/stasis/io/raid1.c index c4a6003..1cb9f43 100644 --- a/src/stasis/io/raid1.c +++ b/src/stasis/io/raid1.c @@ -52,10 +52,6 @@ static void raid1_enable_sequential_optimizations(stasis_handle_t *h) { i->a->enable_sequential_optimizations(i->a); i->b->enable_sequential_optimizations(i->b); } -static lsn_t raid1_start_position(stasis_handle_t *h) { - raid1_impl *i = h->impl; - return i->a->start_position(i->a); -} static lsn_t raid1_end_position(stasis_handle_t *h) { raid1_impl *i = h->impl; return i->a->end_position(i->a); @@ -77,25 +73,12 @@ static int raid1_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len int retB = i->b->write(i->b, off, dat, len); return retA ? retA : retB; } -static int raid1_append(stasis_handle_t *h, lsn_t *off, const byte *dat, lsn_t len) { - raid1_impl *i = h->impl; - int retA = i->a->append(i->a, off, dat, len); - int retB = i->b->write (i->b, *off, dat, len); - assert((retA && retB) || !(retA || retB)); - return retA ? retA : retB; -} static stasis_write_buffer_t * raid1_write_buffer(stasis_handle_t *h, lsn_t off, lsn_t len) { raid1_impl *i = h->impl; stasis_write_buffer_t * ret = i->a->write_buffer(i->a, off, len); ret->h = h; return ret; } -static stasis_write_buffer_t * raid1_append_buffer(stasis_handle_t *h, lsn_t len) { - raid1_impl *i = h->impl; - stasis_write_buffer_t * ret = i->a->append_buffer(i->a, len); - ret->h = h; - return ret; -} static int raid1_release_write_buffer(stasis_write_buffer_t *w) { raid1_impl *i = w->h->impl; w->h = i->a; @@ -132,31 +115,21 @@ static int raid1_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { int retB = i->b->force_range(i->b, start, stop); return retA ? retA : retB; } -static int raid1_truncate_start(stasis_handle_t *h, lsn_t new_start) { - raid1_impl *i = h->impl; - int retA = i->a->truncate_start(i->a, new_start); - int retB = i->b->truncate_start(i->b, new_start); - return retA ? retA : retB; -} struct stasis_handle_t raid1_func = { .num_copies = raid1_num_copies, .num_copies_buffer = raid1_num_copies_buffer, .close = raid1_close, .dup = raid1_dup, .enable_sequential_optimizations = raid1_enable_sequential_optimizations, - .start_position = raid1_start_position, .end_position = raid1_end_position, .write = raid1_write, - .append = raid1_append, .write_buffer = raid1_write_buffer, - .append_buffer = raid1_append_buffer, .release_write_buffer = raid1_release_write_buffer, .read = raid1_read, .read_buffer = raid1_read_buffer, .release_read_buffer = raid1_release_read_buffer, .force = raid1_force, .force_range = raid1_force_range, - .truncate_start = raid1_truncate_start, .error = 0 }; @@ -170,8 +143,8 @@ stasis_handle_t * stasis_handle_open_raid1(stasis_handle_t* a, stasis_handle_t* } stasis_handle_t * stasis_handle_raid1_factory() { - stasis_handle_t * a = stasis_handle_file_factory(0, stasis_store_file_1_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); - stasis_handle_t * b = stasis_handle_file_factory(0, stasis_store_file_2_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); + stasis_handle_t * a = stasis_handle_file_factory(stasis_store_file_1_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); + stasis_handle_t * b = stasis_handle_file_factory(stasis_store_file_2_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); return stasis_handle_open_raid1(a, b); } #endif diff --git a/stasis/flags.h b/stasis/flags.h index 579a608..f08b416 100644 --- a/stasis/flags.h +++ b/stasis/flags.h @@ -69,14 +69,14 @@ extern stasis_handle_t* (*stasis_handle_factory)(); Valid options: stasis_handle_open_file(), stasis_handle_open_pfile(), and stasis_handle_non_blocking_factory. */ -extern stasis_handle_t* (*stasis_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms); +extern stasis_handle_t* (*stasis_handle_file_factory)(const char* filename, int open_mode, int creat_perms); /** The factory that non_blocking handles will use for slow handles. (Only used if stasis_buffer_manager_io_handle_default_factory is set to stasis_non_blocking_factory.) */ -extern stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms); +extern stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(const char* filename, int open_mode, int creat_perms); /** If true, the buffer manager will use O_DIRECT, O_SYNC and so on (Mandatory diff --git a/stasis/io/handle.h b/stasis/io/handle.h index b091897..7f72afa 100644 --- a/stasis/io/handle.h +++ b/stasis/io/handle.h @@ -117,8 +117,6 @@ typedef struct stasis_handle_t { * Optimize the handle for sequential reads and writes. */ void (*enable_sequential_optimizations)(struct stasis_handle_t *h); - /** The offset of the handle's first byte */ - lsn_t (*start_position)(struct stasis_handle_t * h); /** The offset of the byte after the end of the handle's data. */ lsn_t (*end_position)(struct stasis_handle_t * h); @@ -134,20 +132,20 @@ typedef struct stasis_handle_t { */ struct stasis_write_buffer_t * (*write_buffer)(struct stasis_handle_t * h, lsn_t off, lsn_t len); - /** - Increase the size of the file, and obtain a write buffer at the - beginning of the new space. - - The behavior of calls that attempt to access this region before - release_write_buffer() returns is undefined. Calls to append - that are made before the buffer is released are legal, and will - append data starting at the new end of file. - - @param h The handle - @param len The length, in bytes, of the write buffer. - */ - struct stasis_write_buffer_t * (*append_buffer)(struct stasis_handle_t * h, - lsn_t len); +// /** +// Increase the size of the file, and obtain a write buffer at the +// beginning of the new space. +// +// The behavior of calls that attempt to access this region before +// release_write_buffer() returns is undefined. Calls to append +// that are made before the buffer is released are legal, and will +// append data starting at the new end of file. +// +// @param h The handle +// @param len The length, in bytes, of the write buffer. +// */ +// struct stasis_write_buffer_t * (*append_buffer)(struct stasis_handle_t * h, +// lsn_t len); /** Release a write buffer and associated resources. */ @@ -182,17 +180,17 @@ typedef struct stasis_handle_t { */ int (*write)(struct stasis_handle_t * h, lsn_t off, const byte * dat, lsn_t len); - /** - Append data to the end of the file. Once append returns, future - calls to the handle will reflect the update. - - @param h The handle - @param off The position of the first byte to be written - @param dat A buffer containin the data to be written - @param len The number of bytes to be written - */ - int (*append)(struct stasis_handle_t * h, lsn_t * off, const byte * dat, - lsn_t len); +// /** +// Append data to the end of the file. Once append returns, future +// calls to the handle will reflect the update. +// +// @param h The handle +// @param off The position of the first byte to be written +// @param dat A buffer containin the data to be written +// @param len The number of bytes to be written +// */ +// int (*append)(struct stasis_handle_t * h, lsn_t * off, const byte * dat, +// lsn_t len); /** Read data from the file. The region may be safely written to once read returns. @@ -211,17 +209,6 @@ typedef struct stasis_handle_t { */ int (*force)(struct stasis_handle_t * h); int (*force_range)(struct stasis_handle_t * h, lsn_t start, lsn_t stop); - /** - Truncate bytes from the beginning of the file. This is needed by - the log manager. - - @param h The handle - - @param new_start The offest of the first byte in the handle that - must be preserved. Bytes before this point may or may not - be retained after this function returns. - */ - int (*truncate_start)(struct stasis_handle_t * h, lsn_t new_start); /** The handle's error flag; this passes errors to the caller when they can't be returned directly. @@ -257,7 +244,7 @@ typedef struct stasis_read_buffer_t { @param start_offset The logical offset of the first byte in the handle */ -stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset); +stasis_handle_t * stasis_handle(open_memory)(void); /** Open a handle that is backed by a file. This handle uses the unix read(),write() I/O interfaces. Due to limitations in read() and @@ -274,7 +261,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset); @param perm The file permissions to be passed to open() */ stasis_handle_t * stasis_handle(open_file) - (lsn_t start_offset, const char * path, int flags, int perm); + (const char * path, int flags, int perm); /** Open a handle that is backed by a file. This handle uses pread() and pwrite(). It never holds a mutex while perfoming I/O. @@ -289,7 +276,7 @@ stasis_handle_t * stasis_handle(open_file) @param perm The file permissions to be passed to open() */ stasis_handle_t * stasis_handle(open_pfile) - (lsn_t start_offset, const char * path, int flags, int perm); + (const char * path, int flags, int perm); /** Given a factory for creating "fast" and "slow" handles, provide a handle that never makes callers wait for write requests to diff --git a/test/stasis/check_io.c b/test/stasis/check_io.c index 75ecb46..27c1619 100644 --- a/test/stasis/check_io.c +++ b/test/stasis/check_io.c @@ -51,24 +51,13 @@ terms specified in this license. #define LOG_NAME "check_io.log" -int handle_truncate_is_supported = 1; - void handle_smoketest(stasis_handle_t * h) { const int one = 0x11111111; const int two = 0x22222222; - const int three = 0x33333333; - const int four = 0x44444444; - - lsn_t off; - h->append(h, &off, 0, 0); - assert(off == 0); assert((!h->num_copies(h)) || (!h->num_copies_buffer(h))); - assert(0 == h->start_position(h) || - 0 == h->end_position(h)); - assert(! h->write(h, 0, (byte*)&one, sizeof(int))); int one_read = 0; @@ -88,26 +77,6 @@ void handle_smoketest(stasis_handle_t * h) { assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int))); assert(two == two_read); - assert(! h->append(h, &off, (byte*)&three, sizeof(int))); - - w = h->append_buffer(h, sizeof(int)); - memcpy(w->buf, &four, sizeof(int)); - w->h->release_write_buffer(w); - if(handle_truncate_is_supported) { - h->truncate_start(h, 2 * sizeof(int)); - } - int three_read = 0; - int four_read = 0; - ret = h->read(h, 2*sizeof(int), (byte*)&three_read, sizeof(int)); - assert(!ret); - - r = h->read_buffer(h, 3*sizeof(int), sizeof(int)); - memcpy(&four_read, r->buf, sizeof(int)); - r->h->release_read_buffer(r); - - assert(three == three_read); - assert(four == four_read); - } @@ -123,20 +92,13 @@ typedef struct { lsn_t trunc_val; pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER; - void load_handle(thread_arg* t) { lsn_t * offsets = malloc(t->count * sizeof(lsn_t)); stasis_handle_t * h = t->h; - if(handle_truncate_is_supported) { - for(int i = 0; i < t->count; i++) { - offsets[i] = -1; - } - } else { - for(int i = 0; i < t->count; i++) { - offsets[i] = i * sizeof(int); - } + for(int i = 0; i < t->count; i++) { + offsets[i] = i * sizeof(int); } for(int i = 0; i < OPS_PER_THREAD; i++) { int val = myrandom(t->count); @@ -144,67 +106,46 @@ void load_handle(thread_arg* t) { if(offsets[val] == -1) { // Need to write it somewhere. - long choice = myrandom(4); + long choice = myrandom(2); switch(choice) { case 0: { // overwrite old entry with write() - long val2 = myrandom(t->count); - offsets[val] = offsets[val2]; - offsets[val2] = -1; - if(offsets[val] != -1) { - int ret = h->write(h, offsets[val], (const byte*)&(t->values[val]), sizeof(int)); - if(ret) { - assert(ret == EDOM); - offsets[val] = -1; - i--; - } - } else { - i--; - } - } break; - case 1: { // overwrite old entry with write_buffer() - long val2 = myrandom(t->count); - offsets[val] = offsets[val2]; - offsets[val2] = -1; - if(offsets[val] != -1) { - stasis_write_buffer_t * w = h->write_buffer(h, offsets[val], sizeof(int)); - if(!w->error) { - *((int*)w->buf) = t->values[val]; - assert(w->len == sizeof(int)); - assert(w->off == offsets[val]); - } else { - assert(w->error == EDOM); - offsets[val] = -1; - i--; - } - w->h->release_write_buffer(w); - } else { - i--; - } - } break; - case 2: { // append - if(handle_truncate_is_supported) { - lsn_t oldend = h->end_position(h); - int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int)); - assert(!ret || oldend < h->start_position(h)); + long val2 = myrandom(t->count); + offsets[val] = offsets[val2]; + offsets[val2] = -1; + if(offsets[val] != -1) { + int ret = h->write(h, offsets[val], (const byte*)&(t->values[val]), sizeof(int)); + if(ret) { + assert(ret == EDOM); + offsets[val] = -1; + i--; + } + } else { + i--; } } break; - case 3: { // append_buffer - if(handle_truncate_is_supported) { - lsn_t oldend = h->end_position(h); - stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int)); + case 1: { // overwrite old entry with write_buffer() + long val2 = myrandom(t->count); + offsets[val] = offsets[val2]; + offsets[val2] = -1; + if(offsets[val] != -1) { + stasis_write_buffer_t * w = h->write_buffer(h, offsets[val], sizeof(int)); if(!w->error) { *((int*)w->buf) = t->values[val]; assert(w->len == sizeof(int)); - offsets[val] = w->off; + assert(w->off == offsets[val]); } else { - assert(oldend < h->start_position(h)); + assert(w->error == EDOM); + offsets[val] = -1; + i--; } w->h->release_write_buffer(w); + } else { + i--; } } break; default: { - abort(); + abort(); } } @@ -228,8 +169,6 @@ void load_handle(thread_arg* t) { assert(j == t->values[val]); } else { assert(ret == EDOM); - if(handle_truncate_is_supported) - assert(h->start_position(h) > offsets[val]); } } break; case 1: { // read_buffer @@ -241,8 +180,6 @@ void load_handle(thread_arg* t) { } else { assert(r->error == EDOM); r->h->release_read_buffer(r); - if(handle_truncate_is_supported) - assert(h->start_position(h) > offsets[val]); } } break; default: @@ -254,26 +191,6 @@ void load_handle(thread_arg* t) { if(!myrandom(100)) { h->force(h); } - - // Truncate 1% of the time. - if(!myrandom(100) && handle_truncate_is_supported) { - lsn_t pre_start = h->start_position(h); - - pthread_mutex_lock(&trunc_mut); - lsn_t start = trunc_val; - lsn_t stop = start - 100 + myrandom(200); - if(stop > trunc_val) { - trunc_val = stop; - } - pthread_mutex_unlock(&trunc_mut); - - assert(pre_start <= start); - int ret = h->truncate_start(h, stop); - if(!ret) { - lsn_t post_stop = h->start_position(h); - assert(stop <= post_stop); - } - } } free(offsets); } @@ -341,15 +258,15 @@ void handle_concurrencytest(stasis_handle_t * h) { START_TEST(io_memoryTest) { printf("io_memoryTest\n"); fflush(stdout); stasis_handle_t * h; - h = stasis_handle(open_memory)(0); + h = stasis_handle(open_memory)(); // h = stasis_handle(open_debug)(h); handle_smoketest(h); h->close(h); - h = stasis_handle(open_memory)(0); + h = stasis_handle(open_memory)(); // h = stasis_handle(open_debug)(h); handle_sequentialtest(h); h->close(h); - h = stasis_handle(open_memory)(0); + h = stasis_handle(open_memory)(); // h = stasis_handle(open_debug)(h); handle_concurrencytest(h); h->close(h); @@ -358,21 +275,21 @@ START_TEST(io_memoryTest) { START_TEST(io_fileTest) { printf("io_fileTest\n"); fflush(stdout); stasis_handle_t * h; - h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); // h = stasis_handle(open_debug)(h); handle_smoketest(h); h->close(h); remove("logfile.txt"); - h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); //h = stasis_handle(open_debug)(h); handle_sequentialtest(h); h->close(h); remove("logfile.txt"); - h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); // handle_concurrencytest(h); // fails by design h->close(h); @@ -382,35 +299,32 @@ START_TEST(io_fileTest) { START_TEST(io_pfileTest) { printf("io_pfileTest\n"); fflush(stdout); - handle_truncate_is_supported = 0; stasis_handle_t * h; - h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_pfile)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); // h = stasis_handle(open_debug)(h); handle_smoketest(h); h->close(h); remove("logfile.txt"); - h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_pfile)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); //h = stasis_handle(open_debug)(h); handle_sequentialtest(h); h->close(h); remove("logfile.txt"); - h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_pfile)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); handle_concurrencytest(h); h->close(h); remove("logfile.txt"); - handle_truncate_is_supported = 1; } END_TEST START_TEST(io_raid1pfileTest) { printf("io_raid1pfileTest\n"); fflush(stdout); - handle_truncate_is_supported = 0; const char * A = "vol1.txt"; const char * B = "vol2.txt"; @@ -419,8 +333,8 @@ START_TEST(io_raid1pfileTest) { remove(B); stasis_handle_t *h, *a, *b; - a = stasis_handle(open_pfile)(0, A, O_CREAT | O_RDWR, FILE_PERM); - b = stasis_handle(open_pfile)(0, B, O_CREAT | O_RDWR, FILE_PERM); + a = stasis_handle(open_pfile)(A, O_CREAT | O_RDWR, FILE_PERM); + b = stasis_handle(open_pfile)(B, O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle_open_raid1(a, b); // h = stasis_handle(open_debug)(h); @@ -430,8 +344,8 @@ START_TEST(io_raid1pfileTest) { remove(A); remove(B); - a = stasis_handle(open_pfile)(0, A, O_CREAT | O_RDWR, FILE_PERM); - b = stasis_handle(open_pfile)(0, B, O_CREAT | O_RDWR, FILE_PERM); + a = stasis_handle(open_pfile)(A, O_CREAT | O_RDWR, FILE_PERM); + b = stasis_handle(open_pfile)(B, O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle_open_raid1(a, b); // h = stasis_handle(open_debug)(h); @@ -441,8 +355,8 @@ START_TEST(io_raid1pfileTest) { remove(A); remove(B); - a = stasis_handle(open_pfile)(0, A, O_CREAT | O_RDWR, FILE_PERM); - b = stasis_handle(open_pfile)(0, B, O_CREAT | O_RDWR, FILE_PERM); + a = stasis_handle(open_pfile)(A, O_CREAT | O_RDWR, FILE_PERM); + b = stasis_handle(open_pfile)(B, O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle_open_raid1(a, b); // h = stasis_handle(open_debug)(h); @@ -452,8 +366,6 @@ START_TEST(io_raid1pfileTest) { remove(A); remove(B); - - handle_truncate_is_supported = 1; } END_TEST /* static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { diff --git a/utilities/logfile_dump.c b/utilities/logfile_dump.c index f1aaf51..a766550 100644 --- a/utilities/logfile_dump.c +++ b/utilities/logfile_dump.c @@ -11,7 +11,7 @@ static char * logEntryToString(const LogEntry * le) { char * ret = NULL; - int err; + int err = -1; switch(le->type) { case UPDATELOG: {