From 55a94364382a3fe1d6470a0b525c5c1afc8c4513 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Fri, 25 Mar 2011 20:00:29 +0000 Subject: [PATCH] fix long-standing bug in pfile wrapper. racy eof handling was leading to dropped writes --- src/stasis/io/pfile.c | 90 +++++++++---------------------------------- 1 file changed, 18 insertions(+), 72 deletions(-) diff --git a/src/stasis/io/pfile.c b/src/stasis/io/pfile.c index d0af97b..70c68e4 100644 --- a/src/stasis/io/pfile.c +++ b/src/stasis/io/pfile.c @@ -36,19 +36,11 @@ DECLARE_HISTOGRAM_64(force_range_hist) Per-handle information for pfile */ typedef struct pfile_impl { - /** - This should be held whenever end_pos is accessed. - */ - pthread_mutex_t mut; /** The logical offset of the file. Once the file is open, this will never change, as pfile doesn't support truncation. */ lsn_t start_pos; - /** - The logical end of the file. - */ - lsn_t end_pos; /** File descriptor */ @@ -110,9 +102,7 @@ static lsn_t pfile_start_position(stasis_handle_t *h) { static lsn_t pfile_end_position(stasis_handle_t *h) { pfile_impl *impl = (pfile_impl*)h->impl; - pthread_mutex_lock(&impl->mut); - lsn_t ret = impl->end_pos; - pthread_mutex_unlock(&impl->mut); + lsn_t ret = lseek(impl->fd, 0, SEEK_END); return ret; } @@ -155,19 +145,8 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { pfile_impl *impl = (pfile_impl*)(h->impl); int error = 0; - // reading impl->end_pos is probably atomic on most hardware, but - // this is safer. - pthread_mutex_lock(&impl->mut); - lsn_t end_pos = impl->end_pos; - pthread_mutex_unlock(&impl->mut); - - // checking end_pos is adequate, (we assume this is the only handle - // touching the file) - if (off < impl->start_pos) { error = EDOM; - } else if (off + len > end_pos) { - error = EDOM; } else { ssize_t bytes_read = 0; TICK(read_hist); @@ -192,14 +171,22 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { error = errno; break; } - } - bytes_read += count; - if (bytes_read != len) { - DEBUG("pread spinning\n"); + } else if(count == 0) { + // EOF + if(bytes_read != 0) { + fprintf(stderr, "short read at end of storefile. Assuming that this is due to strange recovery scenario, and continuing.\n"); + } + error = EDOM; + break; + } else { + bytes_read += count; + if (bytes_read != len) { + DEBUG("pread spinning\n"); + } } } TOCK(read_hist); - assert(bytes_read == len); + assert(error || bytes_read == len); } return error; } @@ -212,14 +199,6 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, if (impl->start_pos > off) { error = EDOM; } else { - pthread_mutex_lock(&impl->mut); - if (impl->end_pos < off+len) { - // update end_pos now; the caller is not allowed to look at this - // part of the file until we return, so if they notice that the - // file hasn't been extended yet, it's a bug on their end. - impl->end_pos = off+len; - } - pthread_mutex_unlock(&impl->mut); phys_off = off - impl->start_pos; error = pfile_write_unlocked(impl->fd, phys_off, dat, len); } @@ -229,11 +208,8 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat, lsn_t len) { pfile_impl *impl = (pfile_impl*)(h->impl); - pthread_mutex_lock(&impl->mut); - *off = impl->end_pos; - impl->end_pos += len; - pthread_mutex_unlock(&impl->mut); lsn_t phys_off = *off - impl->start_pos; + abort(); // xxx o append is harder than I thought. Should support via distinct API. return pfile_write_unlocked(impl->fd, phys_off, dat,len); } @@ -254,14 +230,6 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h, error = EDOM; } - pthread_mutex_lock(&impl->mut); - - // @todo Come up with a reasonable way to avoid sparse files. - if (off + len > impl->end_pos) { - impl->end_pos = off+len; - } - pthread_mutex_unlock(&impl->mut); - byte *buf; if (!error) { buf = malloc(len); @@ -294,11 +262,7 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h, pfile_impl *impl = (pfile_impl*)h->impl; // Obtain an appropriate offset - pthread_mutex_lock(&(impl->mut)); - - off_t off = impl->end_pos; - impl->end_pos += len; - pthread_mutex_unlock(&(impl->mut)); + off_t off; abort(); // XXX need O_APPEND handle. // Allocate the buffer byte *buf = malloc(len); @@ -328,18 +292,9 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h, static int pfile_release_write_buffer(stasis_write_buffer_t *w) { pfile_impl *impl = (pfile_impl*)(w->h->impl); - pthread_mutex_lock(&(impl->mut)); - int error = 0; - if (impl->end_pos < w->off + w->len || - impl->start_pos > w->off) { - error = EDOM; - } - pthread_mutex_unlock(&(impl->mut)); - - if (!error) { - error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, + int error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, w->len); - } + if (w->buf) { free(w->buf); } @@ -487,7 +442,6 @@ stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, if (!impl) { free(ret); return NULL; } ret->impl = impl; - pthread_mutex_init(&(impl->mut), 0); impl->fd = open(filename, flags, mode); assert(sizeof(off_t) >= (64/8)); if (impl->fd == -1) { @@ -496,14 +450,6 @@ stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, impl->start_pos = start_offset; - off_t file_len = lseek(impl->fd,0,SEEK_END); - if (file_len == (off_t)-1) { - ret->error = errno; - } - impl->end_pos = file_len + start_offset; - DEBUG("file len = %lld, start_off = %lld, end = %lld\n", - (long long) file_len, start_offset, impl->end_pos); - impl->filename = strdup(filename); impl->file_flags = flags; impl->file_mode = mode;