remove questionable truncate and append api from handle.c

This commit is contained in:
Sears Russell 2011-07-26 19:15:14 +00:00
parent 7f332f85cc
commit e692682cf8
13 changed files with 121 additions and 659 deletions

View file

@ -4,7 +4,7 @@ cmake_minimum_required(VERSION 2.4) # For all I know, 2.0 works too...
# The new behavior seems preferable, though it shouldn't affect us either way. # The new behavior seems preferable, though it shouldn't affect us either way.
if(COMMAND cmake_policy) if(COMMAND cmake_policy)
cmake_policy(SET CMP0014 OLD) ## This breaks builds on older cmake... # cmake_policy(SET CMP0014 OLD) ## This breaks builds on older cmake...
cmake_policy(SET CMP0003 NEW) cmake_policy(SET CMP0003 NEW)
endif(COMMAND cmake_policy) endif(COMMAND cmake_policy)

View file

@ -55,14 +55,14 @@ stasis_handle_t* (*stasis_handle_factory)() =
#else #else
stasis_handle_default_factory; stasis_handle_default_factory;
#endif #endif
stasis_handle_t* (*stasis_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms) = stasis_handle_t* (*stasis_handle_file_factory)(const char* filename, int open_mode, int creat_perms) =
#ifdef STASIS_FILE_HANDLE_FACTORY #ifdef STASIS_FILE_HANDLE_FACTORY
STASIS_FILE_HANDLE_FACTORY STASIS_FILE_HANDLE_FACTORY
#else #else
stasis_handle_open_pfile; stasis_handle_open_pfile;
#endif #endif
stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms) = stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(const char* filename, int open_mode, int creat_perms) =
#ifdef STASIS_NON_BLOCKING_HANDLE_FILE_FACTORY #ifdef STASIS_NON_BLOCKING_HANDLE_FILE_FACTORY
STASIS_NON_BLOCKING_HANDLE_FILE_FACTORY STASIS_NON_BLOCKING_HANDLE_FILE_FACTORY
#else #else

View file

@ -48,14 +48,6 @@ static void debug_enable_sequential_optimizations(struct stasis_handle_t *h) {
hh->enable_sequential_optimizations(hh); hh->enable_sequential_optimizations(hh);
printf("tid=%9ld retn enable_sequential_optimizations(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); printf("tid=%9ld retn enable_sequential_optimizations(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout);
} }
static lsn_t debug_start_position(stasis_handle_t *h) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call start_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout);
int ret = hh->start_position(hh);
printf("tid=%9ld retn start_position(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout);
return ret;
}
static lsn_t debug_end_position(stasis_handle_t *h) { static lsn_t debug_end_position(stasis_handle_t *h) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h; stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call end_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); printf("tid=%9ld call end_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout);
@ -77,21 +69,6 @@ static stasis_write_buffer_t * debug_write_buffer(stasis_handle_t * h,
(long)(intptr_t)pthread_self(), (unsigned long)hh, off, len, (unsigned long)retWrap); fflush(stdout); (long)(intptr_t)pthread_self(), (unsigned long)hh, off, len, (unsigned long)retWrap); fflush(stdout);
return retWrap; return retWrap;
} }
static stasis_write_buffer_t * debug_append_buffer(stasis_handle_t * h,
lsn_t len) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call append_buffer(%lx, %lld)\n",
(long)(intptr_t)pthread_self(), (unsigned long)hh, len); fflush(stdout);
stasis_write_buffer_t * ret = hh->append_buffer(hh,len);
stasis_write_buffer_t * retWrap = malloc(sizeof(stasis_write_buffer_t));
*retWrap = *ret;
retWrap->h = h;
retWrap->impl = ret;
printf("tid=%9ld retn append_buffer(%lx, %lld) = %lx (off=%lld)\n",
(long)(intptr_t)pthread_self(), (unsigned long)hh, len, (unsigned long)retWrap, ret->off); fflush(stdout);
return retWrap;
}
static int debug_release_write_buffer(stasis_write_buffer_t * w_wrap) { static int debug_release_write_buffer(stasis_write_buffer_t * w_wrap) {
stasis_write_buffer_t * w = (stasis_write_buffer_t*)w_wrap->impl; stasis_write_buffer_t * w = (stasis_write_buffer_t*)w_wrap->impl;
stasis_handle_t * hh = w->h; stasis_handle_t * hh = w->h;
@ -137,19 +114,6 @@ static int debug_write(stasis_handle_t * h, lsn_t off,
printf("tid=%9ld retn write(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout); printf("tid=%9ld retn write(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout);
return ret; return ret;
} }
static int debug_append(stasis_handle_t * h, lsn_t * off,
const byte * dat, lsn_t len) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call append(%lx, ??, %lx, %lld)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, (unsigned long)dat, len); fflush(stdout);
lsn_t tmpOff;
if(!off) {
off = &tmpOff;
}
int ret = hh->append(hh, off, dat, len);
printf("tid=%9ld retn append(%lx, %lld, %lx, %lld) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, *off, (unsigned long) dat, len, ret); fflush(stdout);
return ret;
}
static int debug_read(stasis_handle_t * h, static int debug_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) { lsn_t off, byte * buf, lsn_t len) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h; stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
@ -172,33 +136,21 @@ static int debug_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
printf("tid=%9ld retn force(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout); printf("tid=%9ld retn force(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout);
return ret; return ret;
} }
static int debug_truncate_start(stasis_handle_t * h, lsn_t new_start) {
stasis_handle_t * hh = ((debug_impl*)h->impl)->h;
printf("tid=%9ld call truncate_start(%lx, %lld)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, new_start); fflush(stdout);
int ret = hh->truncate_start(hh, new_start);
printf("tid=%9ld retn truncate_start(%lx) = %d\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, ret); fflush(stdout);
return ret;
}
struct stasis_handle_t debug_func = { struct stasis_handle_t debug_func = {
.num_copies = debug_num_copies, .num_copies = debug_num_copies,
.num_copies_buffer = debug_num_copies_buffer, .num_copies_buffer = debug_num_copies_buffer,
.close = debug_close, .close = debug_close,
.dup = debug_dup, .dup = debug_dup,
.enable_sequential_optimizations = debug_enable_sequential_optimizations, .enable_sequential_optimizations = debug_enable_sequential_optimizations,
.start_position = debug_start_position,
.end_position = debug_end_position, .end_position = debug_end_position,
.write = debug_write, .write = debug_write,
.append = debug_append,
.write_buffer = debug_write_buffer, .write_buffer = debug_write_buffer,
.append_buffer = debug_append_buffer,
.release_write_buffer = debug_release_write_buffer, .release_write_buffer = debug_release_write_buffer,
.read = debug_read, .read = debug_read,
.read_buffer = debug_read_buffer, .read_buffer = debug_read_buffer,
.release_read_buffer = debug_release_read_buffer, .release_read_buffer = debug_release_read_buffer,
.force = debug_force, .force = debug_force,
.force_range = debug_force_range, .force_range = debug_force_range,
.truncate_start = debug_truncate_start,
.error = 0 .error = 0
}; };

View file

@ -10,7 +10,6 @@
typedef struct file_impl { typedef struct file_impl {
pthread_mutex_t mut; pthread_mutex_t mut;
lsn_t start_pos;
lsn_t end_pos; lsn_t end_pos;
int fd; int fd;
int file_flags; int file_flags;
@ -24,7 +23,7 @@ static int updateEOF(stasis_handle_t * h) {
if(pos == (off_t)-1) { if(pos == (off_t)-1) {
return errno; return errno;
} else { } else {
impl->end_pos = impl->start_pos + pos; impl->end_pos = pos;
return 0; return 0;
} }
} }
@ -48,19 +47,11 @@ static int file_close(stasis_handle_t * h) {
} }
static stasis_handle_t* file_dup(stasis_handle_t * h) { static stasis_handle_t* file_dup(stasis_handle_t * h) {
file_impl * impl = h->impl; file_impl * impl = h->impl;
return stasis_handle_open_file(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode); return stasis_handle_open_file(impl->filename, impl->file_flags, impl->file_mode);
} }
static void file_enable_sequential_optimizations(stasis_handle_t * h) { static void file_enable_sequential_optimizations(stasis_handle_t * h) {
// TODO enable_sequential_optimizations is a no-op in file.c // TODO enable_sequential_optimizations is a no-op in file.c
} }
static lsn_t file_start_position(stasis_handle_t *h) {
file_impl * impl = (file_impl*)h->impl;
pthread_mutex_lock(&(impl->mut));
lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&(impl->mut));
return ret;
}
static lsn_t file_end_position(stasis_handle_t *h) { static lsn_t file_end_position(stasis_handle_t *h) {
file_impl * impl = (file_impl*)h->impl; file_impl * impl = (file_impl*)h->impl;
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
@ -82,12 +73,12 @@ inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off,
int error = 0; int error = 0;
// These should have been checked by the caller. // These should have been checked by the caller.
assert(impl->start_pos <= off); assert(0 <= off);
assert(impl->end_pos >= off+len); assert(impl->end_pos >= off+len);
// @todo need a test harness that gets read(), write() and lseek() to misbehave. // @todo need a test harness that gets read(), write() and lseek() to misbehave.
off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET); off_t lseek_offset = lseek(impl->fd, off, SEEK_SET);
if(lseek_offset == (off_t)-1) { if(lseek_offset == (off_t)-1) {
error = errno; error = errno;
@ -114,7 +105,7 @@ inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off,
// Try again. // Try again.
lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET); lseek_offset = lseek(impl->fd, off + bytes_written, SEEK_SET);
if(lseek_offset == (off_t)-1) { if(lseek_offset == (off_t)-1) {
error = errno; error = errno;
if(error == EBADF || error == ESPIPE) { if(error == EBADF || error == ESPIPE) {
@ -155,7 +146,7 @@ static int file_read(stasis_handle_t * h,
file_impl * impl = (file_impl*)(h->impl); file_impl * impl = (file_impl*)(h->impl);
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
int error = 0; int error = 0;
if(off < impl->start_pos) { if(off < 0) {
error = EDOM; error = EDOM;
} else if(off + len > impl->end_pos) { } else if(off + len > impl->end_pos) {
error = updateEOF(h); error = updateEOF(h);
@ -165,7 +156,7 @@ static int file_read(stasis_handle_t * h,
} }
if(!error) { if(!error) {
off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET); off_t lseek_offset = lseek(impl->fd, off, SEEK_SET);
if(lseek_offset == (off_t)-1) { if(lseek_offset == (off_t)-1) {
error = errno; error = errno;
@ -193,7 +184,7 @@ static int file_read(stasis_handle_t * h,
// Try again. // Try again.
lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET); lseek_offset = lseek(impl->fd, off + bytes_written, SEEK_SET);
if(lseek_offset == (off_t)-1) { if(lseek_offset == (off_t)-1) {
error = errno; error = errno;
if(error == EBADF || error == ESPIPE) { if(error == EBADF || error == ESPIPE) {
@ -237,7 +228,7 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len
file_impl * impl = (file_impl*)(h->impl); file_impl * impl = (file_impl*)(h->impl);
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
int error = 0; int error = 0;
if(impl->start_pos > off) { if(off < 0) {
error = EDOM; error = EDOM;
} }
@ -251,19 +242,6 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len
return error; return error;
} }
static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
file_impl * impl = (file_impl*)(h->impl);
pthread_mutex_lock(&impl->mut);
updateEOF(h);
*off = impl->end_pos;
impl->end_pos += len;
int error = file_write_unlocked(h, *off, dat, len);
pthread_mutex_unlock(&impl->mut);
return error;
}
static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) { lsn_t off, lsn_t len) {
// Allocate the handle // Allocate the handle
@ -275,7 +253,7 @@ static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
if(impl->start_pos > off) { if(off < 0) {
error = EDOM; error = EDOM;
} }
if(off + len > impl->end_pos) { if(off + len > impl->end_pos) {
@ -311,46 +289,6 @@ static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
return ret; return ret;
} }
static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h,
lsn_t len) {
// Allocate the handle
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(!ret) { return NULL; }
file_impl * impl = (file_impl*)h->impl;
// Obtain an appropriate offset
pthread_mutex_lock(&(impl->mut));
updateEOF(h);
off_t off = impl->end_pos;
impl->end_pos += len;
pthread_mutex_unlock(&(impl->mut));
// Allocate the buffer
byte * buf = malloc(len);
int error = 0;
if(!buf) {
error = ENOMEM;
}
if(error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
} else {
ret->h = h;
ret->off = off;
ret->buf = buf;
ret->len = len;
ret->impl = 0;
ret->error = 0;
}
return ret;
}
static int file_release_write_buffer(stasis_write_buffer_t * w) { static int file_release_write_buffer(stasis_write_buffer_t * w) {
file_impl * impl = (file_impl*)(w->h->impl); file_impl * impl = (file_impl*)(w->h->impl);
@ -358,7 +296,7 @@ static int file_release_write_buffer(stasis_write_buffer_t * w) {
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&(impl->mut));
int error = 0; int error = 0;
if(impl->end_pos < w->off + w->len || if(impl->end_pos < w->off + w->len ||
impl->start_pos > w->off) { 0 > w->off) {
error = EDOM; error = EDOM;
} }
@ -413,20 +351,7 @@ static int file_force(stasis_handle_t * h) {
file_impl * impl = h->impl; file_impl * impl = h->impl;
if(!(impl->file_flags & O_SYNC)) { if(!(impl->file_flags & O_SYNC)) {
pthread_mutex_lock(&impl->mut); // must latch because of truncate... :(
int fd = impl->fd; int fd = impl->fd;
pthread_mutex_unlock(&impl->mut);
{
static int warned = 0;
if(!warned) {
printf("Warning: There is a race condition between force() and "
" truncate() in file.c (This shouldn't matter in practice, "
"as the logger hasn't moved over to use file.c yet.\n");
warned = 1;
}
}
// XXX there is a race here; the file handle could have been invalidated
// by truncate.
#ifdef HAVE_FDATASYNC #ifdef HAVE_FDATASYNC
DEBUG("file_force() is calling fdatasync()\n"); DEBUG("file_force() is calling fdatasync()\n");
fdatasync(fd); fdatasync(fd);
@ -446,8 +371,6 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
// not opened synchronously; we need to explicitly sync. // not opened synchronously; we need to explicitly sync.
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
int fd = impl->fd; int fd = impl->fd;
lsn_t off = impl->start_pos;
(void)off;
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
{ {
static int warned = 0; static int warned = 0;
@ -487,69 +410,6 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
} }
return ret; return ret;
} }
static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) {
file_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
int error = 0;
if(new_start > impl->end_pos) {
updateEOF(h);
if(!error && new_start > impl->end_pos) {
error = EDOM;
}
}
if(!error && new_start > impl->start_pos) {
char * tmpfile = malloc(strlen(impl->filename)+2);
strcpy(tmpfile, impl->filename);
tmpfile[strlen(tmpfile)+1] = '\0';
tmpfile[strlen(tmpfile)] = '~';
int fd = open(tmpfile, impl->file_flags, impl->file_mode);
lseek(fd, 0, SEEK_SET);
lseek(impl->fd, new_start-impl->start_pos, SEEK_SET);
int count;
int buf_size = 1024 * 1024;
char * buf = malloc(buf_size);
while((count = read(impl->fd, buf, buf_size))) {
if(count == -1) {
error = errno;
perror("truncate failed to read");
}
ssize_t bytes_written = 0;
while(bytes_written < count) {
ssize_t write_ret = write(fd, buf+bytes_written, count-bytes_written);
if(write_ret == -1) {
error = errno;
perror("truncate failed to write");
break;
}
bytes_written += write_ret;
}
if(error) break;
}
free(buf);
if(!error) {
if(-1 == close(impl->fd)) {
error = errno;
}
impl->fd = fd;
impl->start_pos = new_start;
fsync(impl->fd);
int rename_ret = rename(tmpfile, impl->filename);
if(rename_ret) {
error = errno;
}
free(tmpfile);
} else {
close(fd);
}
}
pthread_mutex_unlock(&impl->mut);
return error;
}
struct stasis_handle_t file_func = { struct stasis_handle_t file_func = {
.num_copies = file_num_copies, .num_copies = file_num_copies,
@ -557,23 +417,19 @@ struct stasis_handle_t file_func = {
.close = file_close, .close = file_close,
.dup = file_dup, .dup = file_dup,
.enable_sequential_optimizations = file_enable_sequential_optimizations, .enable_sequential_optimizations = file_enable_sequential_optimizations,
.start_position = file_start_position,
.end_position = file_end_position, .end_position = file_end_position,
.write = file_write, .write = file_write,
.append = file_append,
.write_buffer = file_write_buffer, .write_buffer = file_write_buffer,
.append_buffer = file_append_buffer,
.release_write_buffer = file_release_write_buffer, .release_write_buffer = file_release_write_buffer,
.read = file_read, .read = file_read,
.read_buffer = file_read_buffer, .read_buffer = file_read_buffer,
.release_read_buffer = file_release_read_buffer, .release_read_buffer = file_release_read_buffer,
.force = file_force, .force = file_force,
.force_range = file_force_range, .force_range = file_force_range,
.truncate_start = file_truncate_start,
.error = 0 .error = 0
}; };
stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * filename, int flags, int mode) { stasis_handle_t * stasis_handle(open_file)(const char * filename, int flags, int mode) {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; } if(!ret) { return NULL; }
*ret = file_func; *ret = file_func;
@ -581,8 +437,6 @@ stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * file
file_impl * impl = malloc(sizeof(file_impl)); file_impl * impl = malloc(sizeof(file_impl));
ret->impl = impl; ret->impl = impl;
pthread_mutex_init(&(impl->mut), 0); pthread_mutex_init(&(impl->mut), 0);
impl->start_pos = start_offset;
impl->end_pos = start_offset;
assert(sizeof(off_t) >= (64/8)); assert(sizeof(off_t) >= (64/8));
impl->fd = open(filename, flags, mode); impl->fd = open(filename, flags, mode);
if(impl->fd == -1) { if(impl->fd == -1) {
@ -591,5 +445,6 @@ stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * file
impl->filename = strdup(filename); impl->filename = strdup(filename);
impl->file_flags = flags; impl->file_flags = flags;
impl->file_mode = mode; impl->file_mode = mode;
updateEOF(ret);
return ret; return ret;
} }

View file

@ -19,5 +19,5 @@
#include <stdio.h> #include <stdio.h>
stasis_handle_t* stasis_handle_default_factory() { stasis_handle_t* stasis_handle_default_factory() {
return stasis_handle_file_factory(0, stasis_store_file_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); return stasis_handle_file_factory(stasis_store_file_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM);
} }

View file

@ -3,7 +3,6 @@
typedef struct mem_impl { typedef struct mem_impl {
pthread_mutex_t mut; pthread_mutex_t mut;
lsn_t start_pos;
lsn_t end_pos; lsn_t end_pos;
byte * buf; byte * buf;
int refcount; int refcount;
@ -32,16 +31,6 @@ static void mem_enable_sequential_optimizations(stasis_handle_t * h) {
// No-op // No-op
} }
static lsn_t mem_start_position(stasis_handle_t *h) {
lsn_t ret;
mem_impl* impl = (mem_impl*)(h->impl);
pthread_mutex_lock(&impl->mut);
ret = impl->start_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
static lsn_t mem_end_position(stasis_handle_t *h) { static lsn_t mem_end_position(stasis_handle_t *h) {
lsn_t ret; lsn_t ret;
mem_impl* impl = (mem_impl*)(h->impl); mem_impl* impl = (mem_impl*)(h->impl);
@ -63,14 +52,14 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
int error = 0; int error = 0;
if(impl->start_pos > off) { if(off < 0) {
error = EDOM; error = EDOM;
} else if(impl->end_pos > off+len) { } else if(impl->end_pos > off+len) {
// Just need to return buffer; h's state is unchanged. // Just need to return buffer; h's state is unchanged.
} else { } else {
byte * newbuf; byte * newbuf;
if(off+len-impl->start_pos) { if(off+len) {
newbuf = realloc(impl->buf, off+len - impl->start_pos); newbuf = realloc(impl->buf, off+len);
} else { } else {
free(impl->buf); free(impl->buf);
newbuf = malloc(0); newbuf = malloc(0);
@ -93,7 +82,7 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
} else { } else {
ret->h = h; ret->h = h;
ret->off = off; ret->off = off;
ret->buf = &(impl->buf[off-impl->start_pos]); ret->buf = &(impl->buf[off]);
ret->len = len; ret->len = len;
ret->impl = 0; ret->impl = 0;
ret->error = 0; ret->error = 0;
@ -101,46 +90,6 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
return ret; return ret;
} }
static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
lsn_t len) {
mem_impl * impl = (mem_impl*)(h->impl);
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(!ret) { return 0; }
pthread_mutex_lock(&(impl->mut));
lsn_t off = impl->end_pos;
impl->end_pos += len;
size_t newlen = impl->end_pos - impl->start_pos;
byte * newbuf;
if(newlen == 0) {
free(impl->buf);
newbuf = malloc(0);
} else {
newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos);
}
if(newbuf) {
impl->buf = newbuf;
ret->h = h;
ret->off = off;
ret->buf = &(impl->buf[off-impl->start_pos]);
ret->len = len;
ret->impl = 0;
ret->error = 0;
} else {
// if we requested a zero length buffer, this is OK.
ret->h = h;
ret->off = 0;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = ENOMEM;
}
return ret;
}
static int mem_release_write_buffer(stasis_write_buffer_t * w) { static int mem_release_write_buffer(stasis_write_buffer_t * w) {
mem_impl * impl = (mem_impl*)(w->h->impl); mem_impl * impl = (mem_impl*)(w->h->impl);
pthread_mutex_unlock(&(impl->mut)); pthread_mutex_unlock(&(impl->mut));
@ -156,7 +105,7 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h,
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
if(!ret) { return NULL; } if(!ret) { return NULL; }
if(off < impl->start_pos || off + len > impl->end_pos) { if(off < 0 || off + len > impl->end_pos) {
ret->h = h; ret->h = h;
ret->buf = 0; ret->buf = 0;
ret->len = 0; ret->len = 0;
@ -165,7 +114,7 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h,
ret->error = EDOM; ret->error = EDOM;
} else { } else {
ret->h = h; ret->h = h;
ret->buf = &(impl->buf[off-impl->start_pos]); ret->buf = &(impl->buf[off]);
ret->off = off; ret->off = off;
ret->len = len; ret->len = len;
ret->impl = 0; ret->impl = 0;
@ -195,20 +144,6 @@ static int mem_write(stasis_handle_t * h, lsn_t off,
return ret; return ret;
} }
static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
stasis_write_buffer_t * w = mem_append_buffer(h, len);
int ret;
if(w->error) {
ret = w->error;
} else {
memcpy(w->buf, dat, len);
ret = 0;
}
*off = w->off;
mem_release_write_buffer(w);
return ret;
}
static int mem_read(stasis_handle_t * h, static int mem_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) { lsn_t off, byte * buf, lsn_t len) {
stasis_read_buffer_t * r = mem_read_buffer(h, off, len); stasis_read_buffer_t * r = mem_read_buffer(h, off, len);
@ -228,30 +163,6 @@ static int mem_force(stasis_handle_t *h) {
static int mem_force_range(stasis_handle_t *h,lsn_t start, lsn_t stop) { static int mem_force_range(stasis_handle_t *h,lsn_t start, lsn_t stop) {
return 0; return 0;
} }
static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) {
mem_impl* impl = (mem_impl*) h->impl;
pthread_mutex_lock(&(impl->mut));
if(new_start < impl->start_pos) {
pthread_mutex_unlock(&impl->mut);
return 0;
}
if(new_start > impl->end_pos) {
pthread_mutex_unlock(&impl->mut);
return EDOM;
}
byte * new_buf = malloc(impl->end_pos -new_start);
memcpy(new_buf, &(impl->buf[new_start - impl->start_pos]), impl->end_pos - new_start);
free(impl->buf);
impl->buf = new_buf;
impl->start_pos = new_start;
pthread_mutex_unlock(&(impl->mut));
return 0;
}
struct stasis_handle_t mem_func = { struct stasis_handle_t mem_func = {
.num_copies = mem_num_copies, .num_copies = mem_num_copies,
@ -259,23 +170,19 @@ struct stasis_handle_t mem_func = {
.close = mem_close, .close = mem_close,
.dup = mem_dup, .dup = mem_dup,
.enable_sequential_optimizations = mem_enable_sequential_optimizations, .enable_sequential_optimizations = mem_enable_sequential_optimizations,
.start_position = mem_start_position,
.end_position = mem_end_position, .end_position = mem_end_position,
.write = mem_write, .write = mem_write,
.append = mem_append,
.write_buffer = mem_write_buffer, .write_buffer = mem_write_buffer,
.append_buffer = mem_append_buffer,
.release_write_buffer = mem_release_write_buffer, .release_write_buffer = mem_release_write_buffer,
.read = mem_read, .read = mem_read,
.read_buffer = mem_read_buffer, .read_buffer = mem_read_buffer,
.release_read_buffer = mem_release_read_buffer, .release_read_buffer = mem_release_read_buffer,
.force = mem_force, .force = mem_force,
.force_range = mem_force_range, .force_range = mem_force_range,
.truncate_start = mem_truncate_start,
.error = 0 .error = 0
}; };
stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) { stasis_handle_t * stasis_handle(open_memory)() {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; } if(!ret) { return NULL; }
*ret = mem_func; *ret = mem_func;
@ -283,8 +190,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) {
mem_impl * impl = malloc(sizeof(mem_impl)); mem_impl * impl = malloc(sizeof(mem_impl));
ret->impl = impl; ret->impl = impl;
pthread_mutex_init(&(impl->mut), 0); pthread_mutex_init(&(impl->mut), 0);
impl->start_pos = start_offset; impl->end_pos = 0;
impl->end_pos = start_offset;
impl->buf = malloc(0); impl->buf = malloc(0);
impl->refcount = 1; impl->refcount = 1;

View file

@ -142,7 +142,6 @@ typedef struct nbw_impl {
pthread_mutex_t mut; pthread_mutex_t mut;
// Handle state // Handle state
lsn_t start_pos;
lsn_t end_pos; lsn_t end_pos;
// Fields to manage slow handles // Fields to manage slow handles
@ -396,13 +395,6 @@ static stasis_handle_t * nbw_dup(stasis_handle_t *h) {
static void nbw_enable_sequential_optimizations(stasis_handle_t *h) { static void nbw_enable_sequential_optimizations(stasis_handle_t *h) {
// TODO non blocking should pass sequential optimizations down to underlying handles. // TODO non blocking should pass sequential optimizations down to underlying handles.
} }
static lsn_t nbw_start_position(stasis_handle_t *h) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
static lsn_t nbw_end_position(stasis_handle_t *h) { static lsn_t nbw_end_position(stasis_handle_t *h) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
@ -430,8 +422,8 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
if(!ret->error) { if(!ret->error) {
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
assert(impl->start_pos <= impl->end_pos); assert(0 <= impl->end_pos);
if(off < impl->start_pos) { if(off < 0) {
// Note: We're returning a valid write buffer to space before // Note: We're returning a valid write buffer to space before
// the handle's truncation point. Spooky. // the handle's truncation point. Spooky.
ret->error = EDOM; ret->error = EDOM;
@ -444,18 +436,6 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
return ret; return ret;
} }
static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
lsn_t len) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
lsn_t off = impl->end_pos;
impl->end_pos += len;
impl->requested_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
return nbw_write_buffer(h, off, len);
}
static int nbw_release_write_buffer(stasis_write_buffer_t * w) { static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
nbw_impl * impl = w->h->impl; nbw_impl * impl = w->h->impl;
write_buffer_impl * w_impl = w->impl; write_buffer_impl * w_impl = w->impl;
@ -514,8 +494,8 @@ static int nbw_write(stasis_handle_t * h, lsn_t off,
releaseFastHandle(impl, n, DIRTY); releaseFastHandle(impl, n, DIRTY);
if(!ret) { if(!ret) {
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
assert(impl->start_pos <= impl->end_pos); assert(0 <= impl->end_pos);
if(off < impl->start_pos) { if(off < 0) {
ret = EDOM; ret = EDOM;
} else if(off + len > impl->end_pos) { } else if(off + len > impl->end_pos) {
impl->end_pos = off+len; impl->end_pos = off+len;
@ -525,19 +505,6 @@ static int nbw_write(stasis_handle_t * h, lsn_t off,
} }
return ret; return ret;
} }
static int nbw_append(stasis_handle_t * h, lsn_t * off,
const byte * dat, lsn_t len) {
nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
*off = impl->end_pos;
impl->end_pos+= len;
impl->requested_bytes_written += len;
pthread_mutex_unlock(&impl->mut);
int ret = nbw_write(h, *off, dat, len);
return ret;
}
static int nbw_read(stasis_handle_t * h, static int nbw_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) { lsn_t off, byte * buf, lsn_t len) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
@ -603,7 +570,7 @@ static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) {
static int nbw_force(stasis_handle_t * h) { static int nbw_force(stasis_handle_t * h) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
int ret = nbw_force_range_impl(h, impl->start_pos, impl->end_pos); int ret = nbw_force_range_impl(h, 0, impl->end_pos);
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
return ret; return ret;
} }
@ -616,21 +583,6 @@ static int nbw_force_range(stasis_handle_t * h,
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
return ret; return ret;
} }
static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
nbw_impl * impl = h->impl;
int error = 0;
pthread_mutex_lock(&impl->mut);
if(new_start <= impl->end_pos && new_start > impl->start_pos) {
impl->start_pos = new_start;
} else {
error = EDOM;
}
pthread_mutex_unlock(&impl->mut);
if(!error) {
// @todo close all slow handles; truncate them. (ie: implement truncate)
}
return error;
}
struct stasis_handle_t nbw_func = { struct stasis_handle_t nbw_func = {
.num_copies = nbw_num_copies, .num_copies = nbw_num_copies,
@ -638,19 +590,15 @@ struct stasis_handle_t nbw_func = {
.close = nbw_close, .close = nbw_close,
.dup = nbw_dup, .dup = nbw_dup,
.enable_sequential_optimizations = nbw_enable_sequential_optimizations, .enable_sequential_optimizations = nbw_enable_sequential_optimizations,
.start_position = nbw_start_position,
.end_position = nbw_end_position, .end_position = nbw_end_position,
.write = nbw_write, .write = nbw_write,
.append = nbw_append,
.write_buffer = nbw_write_buffer, .write_buffer = nbw_write_buffer,
.append_buffer = nbw_append_buffer,
.release_write_buffer = nbw_release_write_buffer, .release_write_buffer = nbw_release_write_buffer,
.read = nbw_read, .read = nbw_read,
.read_buffer = nbw_read_buffer, .read_buffer = nbw_read_buffer,
.release_read_buffer = nbw_release_read_buffer, .release_read_buffer = nbw_release_read_buffer,
.force = nbw_force, .force = nbw_force,
.force_range = nbw_force_range, .force_range = nbw_force_range,
.truncate_start = nbw_truncate_start,
.error = 0 .error = 0
}; };
@ -691,7 +639,7 @@ static void * nbw_worker(void * handle) {
writes++; writes++;
stasis_handle_t * fast = node->h; stasis_handle_t * fast = node->h;
lsn_t off = fast->start_position(fast); lsn_t off = 0;
lsn_t len = fast->end_position(fast) - off; lsn_t len = fast->end_position(fast) - off;
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len); stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
@ -829,7 +777,6 @@ stasis_handle_t * stasis_handle(open_non_blocking)
nbw_impl * impl = malloc(sizeof(nbw_impl)); nbw_impl * impl = malloc(sizeof(nbw_impl));
pthread_mutex_init(&impl->mut, 0); pthread_mutex_init(&impl->mut, 0);
impl->start_pos = 0;
impl->end_pos = 0; impl->end_pos = 0;
impl->slow_factory = slow_factory; impl->slow_factory = slow_factory;

View file

@ -36,11 +36,6 @@ DECLARE_HISTOGRAM_64(force_range_hist)
Per-handle information for pfile Per-handle information for pfile
*/ */
typedef struct pfile_impl { typedef struct pfile_impl {
/**
The logical offset of the file. Once the file is open, this will
never change, as pfile doesn't support truncation.
*/
lsn_t start_pos;
/** /**
File descriptor File descriptor
*/ */
@ -86,7 +81,7 @@ static int pfile_close(stasis_handle_t *h) {
static stasis_handle_t * pfile_dup(stasis_handle_t *h) { static stasis_handle_t * pfile_dup(stasis_handle_t *h) {
pfile_impl *impl = h->impl; pfile_impl *impl = h->impl;
return stasis_handle_open_pfile(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode); return stasis_handle_open_pfile(impl->filename, impl->file_flags, impl->file_mode);
} }
static void pfile_enable_sequential_optimizations(stasis_handle_t *h) { static void pfile_enable_sequential_optimizations(stasis_handle_t *h) {
pfile_impl *impl = h->impl; pfile_impl *impl = h->impl;
@ -94,11 +89,6 @@ static void pfile_enable_sequential_optimizations(stasis_handle_t *h) {
int err = posix_fadvise(impl->fd, 0, 0, POSIX_FADV_SEQUENTIAL); int err = posix_fadvise(impl->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
if(err) perror("Attempt to pass POSIX_FADV_SEQUENTIAL to kernel failed"); if(err) perror("Attempt to pass POSIX_FADV_SEQUENTIAL to kernel failed");
} }
static lsn_t pfile_start_position(stasis_handle_t *h) {
pfile_impl *impl = (pfile_impl*)h->impl;
return impl->start_pos;
}
static lsn_t pfile_end_position(stasis_handle_t *h) { static lsn_t pfile_end_position(stasis_handle_t *h) {
pfile_impl *impl = (pfile_impl*)h->impl; pfile_impl *impl = (pfile_impl*)h->impl;
lsn_t ret = lseek(impl->fd, 0, SEEK_END); lsn_t ret = lseek(impl->fd, 0, SEEK_END);
@ -144,7 +134,7 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
pfile_impl *impl = (pfile_impl*)(h->impl); pfile_impl *impl = (pfile_impl*)(h->impl);
int error = 0; int error = 0;
if (off < impl->start_pos) { if (off < 0) {
error = EDOM; error = EDOM;
} else { } else {
ssize_t bytes_read = 0; ssize_t bytes_read = 0;
@ -153,7 +143,7 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
ssize_t count = pread(impl->fd, ssize_t count = pread(impl->fd,
buf + bytes_read, buf + bytes_read,
len - bytes_read, len - bytes_read,
off + bytes_read - impl->start_pos); off + bytes_read);
if (count == -1) { if (count == -1) {
if (errno == EAGAIN || errno == EINTR) { if (errno == EAGAIN || errno == EINTR) {
count = 0; count = 0;
@ -162,10 +152,12 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
h->error = EBADF; h->error = EBADF;
} else { } else {
int err = errno; int err = errno;
// XXX Why is sys_errlist[] is unavailable here? // The other errors either involve memory bugs (EFAULT), logic bugs
// (EISDIR, EFIFO, EOVERFLOW), or bad hardware (EIO), so print
// something to console, and uncleanly crash.
perror("pfile_read encountered an unknown error code."); perror("pfile_read encountered an unknown error code.");
fprintf(stderr, "pread() returned -1; errno is %d\n",err); fprintf(stderr, "pread() returned -1; errno is %d\n",err);
abort(); // XXX other errors? abort();
} }
error = errno; error = errno;
break; break;
@ -194,24 +186,14 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat,
lsn_t len) { lsn_t len) {
pfile_impl *impl = (pfile_impl*)(h->impl); pfile_impl *impl = (pfile_impl*)(h->impl);
int error = 0; int error = 0;
lsn_t phys_off; if (off < 0) {
if (impl->start_pos > off) {
error = EDOM; error = EDOM;
} else { } else {
phys_off = off - impl->start_pos; error = pfile_write_unlocked(impl->fd, off, dat, len);
error = pfile_write_unlocked(impl->fd, phys_off, dat, len);
} }
return error; return error;
} }
static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat,
lsn_t len) {
pfile_impl *impl = (pfile_impl*)(h->impl);
lsn_t phys_off = *off - impl->start_pos;
abort(); // xxx o append is harder than I thought. Should support via distinct API.
return pfile_write_unlocked(impl->fd, phys_off, dat,len);
}
static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h, static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h,
lsn_t off, lsn_t len) { lsn_t off, lsn_t len) {
stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t)); stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t));
@ -221,11 +203,9 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h,
return NULL; return NULL;
} }
pfile_impl *impl = (pfile_impl*)h->impl;
int error = 0; int error = 0;
if (impl->start_pos > off) { if (off < 0) {
error = EDOM; error = EDOM;
} }
@ -252,44 +232,10 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h,
return ret; return ret;
} }
static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h,
lsn_t len) {
// Allocate the handle
stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t));
if (!ret) { return NULL; }
// Obtain an appropriate offset
off_t off; abort(); // XXX need O_APPEND handle.
// Allocate the buffer
byte *buf = malloc(len);
int error = 0;
if (!buf) {
error = ENOMEM;
}
if (error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
} else {
ret->h = h;
ret->off = off;
ret->buf = buf;
ret->len = len;
ret->impl = 0;
ret->error = 0;
}
return ret;
}
static int pfile_release_write_buffer(stasis_write_buffer_t *w) { static int pfile_release_write_buffer(stasis_write_buffer_t *w) {
pfile_impl *impl = (pfile_impl*)(w->h->impl); pfile_impl *impl = (pfile_impl*)(w->h->impl);
int error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, int error = pfile_write_unlocked(impl->fd, w->off, w->buf,
w->len); w->len);
if (w->buf) { if (w->buf) {
@ -390,21 +336,12 @@ static int pfile_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
int ret = 0; int ret = 0;
#endif #endif
if(impl->sequential) { if(impl->sequential) {
int err = posix_fadvise(impl->fd, start-impl->start_pos, stop-start, POSIX_FADV_DONTNEED); int err = posix_fadvise(impl->fd, start, stop-start, POSIX_FADV_DONTNEED);
if(err) perror("Attempt to pass POSIX_FADV_SEQUENTIAL (for a range of a file) to kernel failed"); if(err) perror("Attempt to pass POSIX_FADV_SEQUENTIAL (for a range of a file) to kernel failed");
} }
TOCK(force_range_hist); TOCK(force_range_hist);
return ret; return ret;
} }
static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) {
static int truncate_warned = 0;
if (!truncate_warned) {
printf("\nWarning: pfile doesn't support truncation; "
"ignoring truncation request\n");
truncate_warned = 1;
}
return 0;
}
static struct stasis_handle_t pfile_func = { static struct stasis_handle_t pfile_func = {
.num_copies = pfile_num_copies, .num_copies = pfile_num_copies,
@ -412,24 +349,19 @@ static struct stasis_handle_t pfile_func = {
.close = pfile_close, .close = pfile_close,
.dup = pfile_dup, .dup = pfile_dup,
.enable_sequential_optimizations = pfile_enable_sequential_optimizations, .enable_sequential_optimizations = pfile_enable_sequential_optimizations,
.start_position = pfile_start_position,
.end_position = pfile_end_position, .end_position = pfile_end_position,
.write = pfile_write, .write = pfile_write,
.append = pfile_append,
.write_buffer = pfile_write_buffer, .write_buffer = pfile_write_buffer,
.append_buffer = pfile_append_buffer,
.release_write_buffer = pfile_release_write_buffer, .release_write_buffer = pfile_release_write_buffer,
.read = pfile_read, .read = pfile_read,
.read_buffer = pfile_read_buffer, .read_buffer = pfile_read_buffer,
.release_read_buffer = pfile_release_read_buffer, .release_read_buffer = pfile_release_read_buffer,
.force = pfile_force, .force = pfile_force,
.force_range = pfile_force_range, .force_range = pfile_force_range,
.truncate_start = pfile_truncate_start,
.error = 0 .error = 0
}; };
stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, stasis_handle_t *stasis_handle(open_pfile)(const char *filename,
const char *filename,
int flags, int mode) { int flags, int mode) {
stasis_handle_t *ret = malloc(sizeof(stasis_handle_t)); stasis_handle_t *ret = malloc(sizeof(stasis_handle_t));
if (!ret) { return NULL; } if (!ret) { return NULL; }
@ -445,8 +377,6 @@ stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset,
ret->error = errno; ret->error = errno;
} }
impl->start_pos = start_offset;
impl->filename = strdup(filename); impl->filename = strdup(filename);
impl->file_flags = flags; impl->file_flags = flags;
impl->file_mode = mode; impl->file_mode = mode;

View file

@ -52,10 +52,6 @@ static void raid1_enable_sequential_optimizations(stasis_handle_t *h) {
i->a->enable_sequential_optimizations(i->a); i->a->enable_sequential_optimizations(i->a);
i->b->enable_sequential_optimizations(i->b); i->b->enable_sequential_optimizations(i->b);
} }
static lsn_t raid1_start_position(stasis_handle_t *h) {
raid1_impl *i = h->impl;
return i->a->start_position(i->a);
}
static lsn_t raid1_end_position(stasis_handle_t *h) { static lsn_t raid1_end_position(stasis_handle_t *h) {
raid1_impl *i = h->impl; raid1_impl *i = h->impl;
return i->a->end_position(i->a); return i->a->end_position(i->a);
@ -77,25 +73,12 @@ static int raid1_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len
int retB = i->b->write(i->b, off, dat, len); int retB = i->b->write(i->b, off, dat, len);
return retA ? retA : retB; return retA ? retA : retB;
} }
static int raid1_append(stasis_handle_t *h, lsn_t *off, const byte *dat, lsn_t len) {
raid1_impl *i = h->impl;
int retA = i->a->append(i->a, off, dat, len);
int retB = i->b->write (i->b, *off, dat, len);
assert((retA && retB) || !(retA || retB));
return retA ? retA : retB;
}
static stasis_write_buffer_t * raid1_write_buffer(stasis_handle_t *h, lsn_t off, lsn_t len) { static stasis_write_buffer_t * raid1_write_buffer(stasis_handle_t *h, lsn_t off, lsn_t len) {
raid1_impl *i = h->impl; raid1_impl *i = h->impl;
stasis_write_buffer_t * ret = i->a->write_buffer(i->a, off, len); stasis_write_buffer_t * ret = i->a->write_buffer(i->a, off, len);
ret->h = h; ret->h = h;
return ret; return ret;
} }
static stasis_write_buffer_t * raid1_append_buffer(stasis_handle_t *h, lsn_t len) {
raid1_impl *i = h->impl;
stasis_write_buffer_t * ret = i->a->append_buffer(i->a, len);
ret->h = h;
return ret;
}
static int raid1_release_write_buffer(stasis_write_buffer_t *w) { static int raid1_release_write_buffer(stasis_write_buffer_t *w) {
raid1_impl *i = w->h->impl; raid1_impl *i = w->h->impl;
w->h = i->a; w->h = i->a;
@ -132,31 +115,21 @@ static int raid1_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) {
int retB = i->b->force_range(i->b, start, stop); int retB = i->b->force_range(i->b, start, stop);
return retA ? retA : retB; return retA ? retA : retB;
} }
static int raid1_truncate_start(stasis_handle_t *h, lsn_t new_start) {
raid1_impl *i = h->impl;
int retA = i->a->truncate_start(i->a, new_start);
int retB = i->b->truncate_start(i->b, new_start);
return retA ? retA : retB;
}
struct stasis_handle_t raid1_func = { struct stasis_handle_t raid1_func = {
.num_copies = raid1_num_copies, .num_copies = raid1_num_copies,
.num_copies_buffer = raid1_num_copies_buffer, .num_copies_buffer = raid1_num_copies_buffer,
.close = raid1_close, .close = raid1_close,
.dup = raid1_dup, .dup = raid1_dup,
.enable_sequential_optimizations = raid1_enable_sequential_optimizations, .enable_sequential_optimizations = raid1_enable_sequential_optimizations,
.start_position = raid1_start_position,
.end_position = raid1_end_position, .end_position = raid1_end_position,
.write = raid1_write, .write = raid1_write,
.append = raid1_append,
.write_buffer = raid1_write_buffer, .write_buffer = raid1_write_buffer,
.append_buffer = raid1_append_buffer,
.release_write_buffer = raid1_release_write_buffer, .release_write_buffer = raid1_release_write_buffer,
.read = raid1_read, .read = raid1_read,
.read_buffer = raid1_read_buffer, .read_buffer = raid1_read_buffer,
.release_read_buffer = raid1_release_read_buffer, .release_read_buffer = raid1_release_read_buffer,
.force = raid1_force, .force = raid1_force,
.force_range = raid1_force_range, .force_range = raid1_force_range,
.truncate_start = raid1_truncate_start,
.error = 0 .error = 0
}; };
@ -170,8 +143,8 @@ stasis_handle_t * stasis_handle_open_raid1(stasis_handle_t* a, stasis_handle_t*
} }
stasis_handle_t * stasis_handle_raid1_factory() { stasis_handle_t * stasis_handle_raid1_factory() {
stasis_handle_t * a = stasis_handle_file_factory(0, stasis_store_file_1_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); stasis_handle_t * a = stasis_handle_file_factory(stasis_store_file_1_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM);
stasis_handle_t * b = stasis_handle_file_factory(0, stasis_store_file_2_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM); stasis_handle_t * b = stasis_handle_file_factory(stasis_store_file_2_name, O_CREAT | O_RDWR | stasis_buffer_manager_io_handle_flags, FILE_PERM);
return stasis_handle_open_raid1(a, b); return stasis_handle_open_raid1(a, b);
} }
#endif #endif

View file

@ -69,14 +69,14 @@ extern stasis_handle_t* (*stasis_handle_factory)();
Valid options: stasis_handle_open_file(), stasis_handle_open_pfile(), and stasis_handle_non_blocking_factory. Valid options: stasis_handle_open_file(), stasis_handle_open_pfile(), and stasis_handle_non_blocking_factory.
*/ */
extern stasis_handle_t* (*stasis_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms); extern stasis_handle_t* (*stasis_handle_file_factory)(const char* filename, int open_mode, int creat_perms);
/** /**
The factory that non_blocking handles will use for slow handles. (Only The factory that non_blocking handles will use for slow handles. (Only
used if stasis_buffer_manager_io_handle_default_factory is set to used if stasis_buffer_manager_io_handle_default_factory is set to
stasis_non_blocking_factory.) stasis_non_blocking_factory.)
*/ */
extern stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(lsn_t logical_offset, const char* filename, int open_mode, int creat_perms); extern stasis_handle_t* (*stasis_non_blocking_handle_file_factory)(const char* filename, int open_mode, int creat_perms);
/** /**
If true, the buffer manager will use O_DIRECT, O_SYNC and so on (Mandatory If true, the buffer manager will use O_DIRECT, O_SYNC and so on (Mandatory

View file

@ -117,8 +117,6 @@ typedef struct stasis_handle_t {
* Optimize the handle for sequential reads and writes. * Optimize the handle for sequential reads and writes.
*/ */
void (*enable_sequential_optimizations)(struct stasis_handle_t *h); void (*enable_sequential_optimizations)(struct stasis_handle_t *h);
/** The offset of the handle's first byte */
lsn_t (*start_position)(struct stasis_handle_t * h);
/** The offset of the byte after the end of the handle's data. */ /** The offset of the byte after the end of the handle's data. */
lsn_t (*end_position)(struct stasis_handle_t * h); lsn_t (*end_position)(struct stasis_handle_t * h);
@ -134,20 +132,20 @@ typedef struct stasis_handle_t {
*/ */
struct stasis_write_buffer_t * (*write_buffer)(struct stasis_handle_t * h, struct stasis_write_buffer_t * (*write_buffer)(struct stasis_handle_t * h,
lsn_t off, lsn_t len); lsn_t off, lsn_t len);
/** // /**
Increase the size of the file, and obtain a write buffer at the // Increase the size of the file, and obtain a write buffer at the
beginning of the new space. // beginning of the new space.
//
The behavior of calls that attempt to access this region before // The behavior of calls that attempt to access this region before
release_write_buffer() returns is undefined. Calls to append // release_write_buffer() returns is undefined. Calls to append
that are made before the buffer is released are legal, and will // that are made before the buffer is released are legal, and will
append data starting at the new end of file. // append data starting at the new end of file.
//
@param h The handle // @param h The handle
@param len The length, in bytes, of the write buffer. // @param len The length, in bytes, of the write buffer.
*/ // */
struct stasis_write_buffer_t * (*append_buffer)(struct stasis_handle_t * h, // struct stasis_write_buffer_t * (*append_buffer)(struct stasis_handle_t * h,
lsn_t len); // lsn_t len);
/** /**
Release a write buffer and associated resources. Release a write buffer and associated resources.
*/ */
@ -182,17 +180,17 @@ typedef struct stasis_handle_t {
*/ */
int (*write)(struct stasis_handle_t * h, lsn_t off, int (*write)(struct stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len); const byte * dat, lsn_t len);
/** // /**
Append data to the end of the file. Once append returns, future // Append data to the end of the file. Once append returns, future
calls to the handle will reflect the update. // calls to the handle will reflect the update.
//
@param h The handle // @param h The handle
@param off The position of the first byte to be written // @param off The position of the first byte to be written
@param dat A buffer containin the data to be written // @param dat A buffer containin the data to be written
@param len The number of bytes to be written // @param len The number of bytes to be written
*/ // */
int (*append)(struct stasis_handle_t * h, lsn_t * off, const byte * dat, // int (*append)(struct stasis_handle_t * h, lsn_t * off, const byte * dat,
lsn_t len); // lsn_t len);
/** /**
Read data from the file. The region may be safely written to Read data from the file. The region may be safely written to
once read returns. once read returns.
@ -211,17 +209,6 @@ typedef struct stasis_handle_t {
*/ */
int (*force)(struct stasis_handle_t * h); int (*force)(struct stasis_handle_t * h);
int (*force_range)(struct stasis_handle_t * h, lsn_t start, lsn_t stop); int (*force_range)(struct stasis_handle_t * h, lsn_t start, lsn_t stop);
/**
Truncate bytes from the beginning of the file. This is needed by
the log manager.
@param h The handle
@param new_start The offest of the first byte in the handle that
must be preserved. Bytes before this point may or may not
be retained after this function returns.
*/
int (*truncate_start)(struct stasis_handle_t * h, lsn_t new_start);
/** /**
The handle's error flag; this passes errors to the caller when The handle's error flag; this passes errors to the caller when
they can't be returned directly. they can't be returned directly.
@ -257,7 +244,7 @@ typedef struct stasis_read_buffer_t {
@param start_offset The logical offset of the first byte in the handle @param start_offset The logical offset of the first byte in the handle
*/ */
stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset); stasis_handle_t * stasis_handle(open_memory)(void);
/** /**
Open a handle that is backed by a file. This handle uses the unix Open a handle that is backed by a file. This handle uses the unix
read(),write() I/O interfaces. Due to limitations in read() and read(),write() I/O interfaces. Due to limitations in read() and
@ -274,7 +261,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset);
@param perm The file permissions to be passed to open() @param perm The file permissions to be passed to open()
*/ */
stasis_handle_t * stasis_handle(open_file) stasis_handle_t * stasis_handle(open_file)
(lsn_t start_offset, const char * path, int flags, int perm); (const char * path, int flags, int perm);
/** /**
Open a handle that is backed by a file. This handle uses pread() Open a handle that is backed by a file. This handle uses pread()
and pwrite(). It never holds a mutex while perfoming I/O. and pwrite(). It never holds a mutex while perfoming I/O.
@ -289,7 +276,7 @@ stasis_handle_t * stasis_handle(open_file)
@param perm The file permissions to be passed to open() @param perm The file permissions to be passed to open()
*/ */
stasis_handle_t * stasis_handle(open_pfile) stasis_handle_t * stasis_handle(open_pfile)
(lsn_t start_offset, const char * path, int flags, int perm); (const char * path, int flags, int perm);
/** /**
Given a factory for creating "fast" and "slow" handles, provide a Given a factory for creating "fast" and "slow" handles, provide a
handle that never makes callers wait for write requests to handle that never makes callers wait for write requests to

View file

@ -51,24 +51,13 @@ terms specified in this license.
#define LOG_NAME "check_io.log" #define LOG_NAME "check_io.log"
int handle_truncate_is_supported = 1;
void handle_smoketest(stasis_handle_t * h) { void handle_smoketest(stasis_handle_t * h) {
const int one = 0x11111111; const int one = 0x11111111;
const int two = 0x22222222; const int two = 0x22222222;
const int three = 0x33333333;
const int four = 0x44444444;
lsn_t off;
h->append(h, &off, 0, 0);
assert(off == 0);
assert((!h->num_copies(h)) || (!h->num_copies_buffer(h))); assert((!h->num_copies(h)) || (!h->num_copies_buffer(h)));
assert(0 == h->start_position(h) ||
0 == h->end_position(h));
assert(! h->write(h, 0, (byte*)&one, sizeof(int))); assert(! h->write(h, 0, (byte*)&one, sizeof(int)));
int one_read = 0; int one_read = 0;
@ -88,26 +77,6 @@ void handle_smoketest(stasis_handle_t * h) {
assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int))); assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int)));
assert(two == two_read); assert(two == two_read);
assert(! h->append(h, &off, (byte*)&three, sizeof(int)));
w = h->append_buffer(h, sizeof(int));
memcpy(w->buf, &four, sizeof(int));
w->h->release_write_buffer(w);
if(handle_truncate_is_supported) {
h->truncate_start(h, 2 * sizeof(int));
}
int three_read = 0;
int four_read = 0;
ret = h->read(h, 2*sizeof(int), (byte*)&three_read, sizeof(int));
assert(!ret);
r = h->read_buffer(h, 3*sizeof(int), sizeof(int));
memcpy(&four_read, r->buf, sizeof(int));
r->h->release_read_buffer(r);
assert(three == three_read);
assert(four == four_read);
} }
@ -123,28 +92,21 @@ typedef struct {
lsn_t trunc_val; lsn_t trunc_val;
pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER;
void load_handle(thread_arg* t) { void load_handle(thread_arg* t) {
lsn_t * offsets = malloc(t->count * sizeof(lsn_t)); lsn_t * offsets = malloc(t->count * sizeof(lsn_t));
stasis_handle_t * h = t->h; stasis_handle_t * h = t->h;
if(handle_truncate_is_supported) {
for(int i = 0; i < t->count; i++) {
offsets[i] = -1;
}
} else {
for(int i = 0; i < t->count; i++) { for(int i = 0; i < t->count; i++) {
offsets[i] = i * sizeof(int); offsets[i] = i * sizeof(int);
} }
}
for(int i = 0; i < OPS_PER_THREAD; i++) { for(int i = 0; i < OPS_PER_THREAD; i++) {
int val = myrandom(t->count); int val = myrandom(t->count);
if(offsets[val] == -1) { if(offsets[val] == -1) {
// Need to write it somewhere. // Need to write it somewhere.
long choice = myrandom(4); long choice = myrandom(2);
switch(choice) { switch(choice) {
case 0: { // overwrite old entry with write() case 0: { // overwrite old entry with write()
@ -182,27 +144,6 @@ void load_handle(thread_arg* t) {
i--; i--;
} }
} break; } break;
case 2: { // append
if(handle_truncate_is_supported) {
lsn_t oldend = h->end_position(h);
int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int));
assert(!ret || oldend < h->start_position(h));
}
} break;
case 3: { // append_buffer
if(handle_truncate_is_supported) {
lsn_t oldend = h->end_position(h);
stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int));
if(!w->error) {
*((int*)w->buf) = t->values[val];
assert(w->len == sizeof(int));
offsets[val] = w->off;
} else {
assert(oldend < h->start_position(h));
}
w->h->release_write_buffer(w);
}
} break;
default: { default: {
abort(); abort();
} }
@ -228,8 +169,6 @@ void load_handle(thread_arg* t) {
assert(j == t->values[val]); assert(j == t->values[val]);
} else { } else {
assert(ret == EDOM); assert(ret == EDOM);
if(handle_truncate_is_supported)
assert(h->start_position(h) > offsets[val]);
} }
} break; } break;
case 1: { // read_buffer case 1: { // read_buffer
@ -241,8 +180,6 @@ void load_handle(thread_arg* t) {
} else { } else {
assert(r->error == EDOM); assert(r->error == EDOM);
r->h->release_read_buffer(r); r->h->release_read_buffer(r);
if(handle_truncate_is_supported)
assert(h->start_position(h) > offsets[val]);
} }
} break; } break;
default: default:
@ -254,26 +191,6 @@ void load_handle(thread_arg* t) {
if(!myrandom(100)) { if(!myrandom(100)) {
h->force(h); h->force(h);
} }
// Truncate 1% of the time.
if(!myrandom(100) && handle_truncate_is_supported) {
lsn_t pre_start = h->start_position(h);
pthread_mutex_lock(&trunc_mut);
lsn_t start = trunc_val;
lsn_t stop = start - 100 + myrandom(200);
if(stop > trunc_val) {
trunc_val = stop;
}
pthread_mutex_unlock(&trunc_mut);
assert(pre_start <= start);
int ret = h->truncate_start(h, stop);
if(!ret) {
lsn_t post_stop = h->start_position(h);
assert(stop <= post_stop);
}
}
} }
free(offsets); free(offsets);
} }
@ -341,15 +258,15 @@ void handle_concurrencytest(stasis_handle_t * h) {
START_TEST(io_memoryTest) { START_TEST(io_memoryTest) {
printf("io_memoryTest\n"); fflush(stdout); printf("io_memoryTest\n"); fflush(stdout);
stasis_handle_t * h; stasis_handle_t * h;
h = stasis_handle(open_memory)(0); h = stasis_handle(open_memory)();
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_smoketest(h); handle_smoketest(h);
h->close(h); h->close(h);
h = stasis_handle(open_memory)(0); h = stasis_handle(open_memory)();
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_sequentialtest(h); handle_sequentialtest(h);
h->close(h); h->close(h);
h = stasis_handle(open_memory)(0); h = stasis_handle(open_memory)();
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_concurrencytest(h); handle_concurrencytest(h);
h->close(h); h->close(h);
@ -358,21 +275,21 @@ START_TEST(io_memoryTest) {
START_TEST(io_fileTest) { START_TEST(io_fileTest) {
printf("io_fileTest\n"); fflush(stdout); printf("io_fileTest\n"); fflush(stdout);
stasis_handle_t * h; stasis_handle_t * h;
h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_smoketest(h); handle_smoketest(h);
h->close(h); h->close(h);
remove("logfile.txt"); remove("logfile.txt");
h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//h = stasis_handle(open_debug)(h); //h = stasis_handle(open_debug)(h);
handle_sequentialtest(h); handle_sequentialtest(h);
h->close(h); h->close(h);
remove("logfile.txt"); remove("logfile.txt");
h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// handle_concurrencytest(h); // fails by design // handle_concurrencytest(h); // fails by design
h->close(h); h->close(h);
@ -382,35 +299,32 @@ START_TEST(io_fileTest) {
START_TEST(io_pfileTest) { START_TEST(io_pfileTest) {
printf("io_pfileTest\n"); fflush(stdout); printf("io_pfileTest\n"); fflush(stdout);
handle_truncate_is_supported = 0;
stasis_handle_t * h; stasis_handle_t * h;
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_pfile)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_smoketest(h); handle_smoketest(h);
h->close(h); h->close(h);
remove("logfile.txt"); remove("logfile.txt");
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_pfile)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//h = stasis_handle(open_debug)(h); //h = stasis_handle(open_debug)(h);
handle_sequentialtest(h); handle_sequentialtest(h);
h->close(h); h->close(h);
remove("logfile.txt"); remove("logfile.txt");
h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_pfile)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
handle_concurrencytest(h); handle_concurrencytest(h);
h->close(h); h->close(h);
remove("logfile.txt"); remove("logfile.txt");
handle_truncate_is_supported = 1;
} END_TEST } END_TEST
START_TEST(io_raid1pfileTest) { START_TEST(io_raid1pfileTest) {
printf("io_raid1pfileTest\n"); fflush(stdout); printf("io_raid1pfileTest\n"); fflush(stdout);
handle_truncate_is_supported = 0;
const char * A = "vol1.txt"; const char * A = "vol1.txt";
const char * B = "vol2.txt"; const char * B = "vol2.txt";
@ -419,8 +333,8 @@ START_TEST(io_raid1pfileTest) {
remove(B); remove(B);
stasis_handle_t *h, *a, *b; stasis_handle_t *h, *a, *b;
a = stasis_handle(open_pfile)(0, A, O_CREAT | O_RDWR, FILE_PERM); a = stasis_handle(open_pfile)(A, O_CREAT | O_RDWR, FILE_PERM);
b = stasis_handle(open_pfile)(0, B, O_CREAT | O_RDWR, FILE_PERM); b = stasis_handle(open_pfile)(B, O_CREAT | O_RDWR, FILE_PERM);
h = stasis_handle_open_raid1(a, b); h = stasis_handle_open_raid1(a, b);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
@ -430,8 +344,8 @@ START_TEST(io_raid1pfileTest) {
remove(A); remove(A);
remove(B); remove(B);
a = stasis_handle(open_pfile)(0, A, O_CREAT | O_RDWR, FILE_PERM); a = stasis_handle(open_pfile)(A, O_CREAT | O_RDWR, FILE_PERM);
b = stasis_handle(open_pfile)(0, B, O_CREAT | O_RDWR, FILE_PERM); b = stasis_handle(open_pfile)(B, O_CREAT | O_RDWR, FILE_PERM);
h = stasis_handle_open_raid1(a, b); h = stasis_handle_open_raid1(a, b);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
@ -441,8 +355,8 @@ START_TEST(io_raid1pfileTest) {
remove(A); remove(A);
remove(B); remove(B);
a = stasis_handle(open_pfile)(0, A, O_CREAT | O_RDWR, FILE_PERM); a = stasis_handle(open_pfile)(A, O_CREAT | O_RDWR, FILE_PERM);
b = stasis_handle(open_pfile)(0, B, O_CREAT | O_RDWR, FILE_PERM); b = stasis_handle(open_pfile)(B, O_CREAT | O_RDWR, FILE_PERM);
h = stasis_handle_open_raid1(a, b); h = stasis_handle_open_raid1(a, b);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
@ -452,8 +366,6 @@ START_TEST(io_raid1pfileTest) {
remove(A); remove(A);
remove(B); remove(B);
handle_truncate_is_supported = 1;
} END_TEST } END_TEST
/* /*
static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {

View file

@ -11,7 +11,7 @@
static char * logEntryToString(const LogEntry * le) { static char * logEntryToString(const LogEntry * le) {
char * ret = NULL; char * ret = NULL;
int err; int err = -1;
switch(le->type) { switch(le->type) {
case UPDATELOG: case UPDATELOG:
{ {