fix long-standing bug in pfile wrapper. racy eof handling was leading to dropped writes
This commit is contained in:
parent
e7280de51f
commit
55a9436438
1 changed files with 18 additions and 72 deletions
|
@ -36,19 +36,11 @@ DECLARE_HISTOGRAM_64(force_range_hist)
|
||||||
Per-handle information for pfile
|
Per-handle information for pfile
|
||||||
*/
|
*/
|
||||||
typedef struct pfile_impl {
|
typedef struct pfile_impl {
|
||||||
/**
|
|
||||||
This should be held whenever end_pos is accessed.
|
|
||||||
*/
|
|
||||||
pthread_mutex_t mut;
|
|
||||||
/**
|
/**
|
||||||
The logical offset of the file. Once the file is open, this will
|
The logical offset of the file. Once the file is open, this will
|
||||||
never change, as pfile doesn't support truncation.
|
never change, as pfile doesn't support truncation.
|
||||||
*/
|
*/
|
||||||
lsn_t start_pos;
|
lsn_t start_pos;
|
||||||
/**
|
|
||||||
The logical end of the file.
|
|
||||||
*/
|
|
||||||
lsn_t end_pos;
|
|
||||||
/**
|
/**
|
||||||
File descriptor
|
File descriptor
|
||||||
*/
|
*/
|
||||||
|
@ -110,9 +102,7 @@ static lsn_t pfile_start_position(stasis_handle_t *h) {
|
||||||
|
|
||||||
static lsn_t pfile_end_position(stasis_handle_t *h) {
|
static lsn_t pfile_end_position(stasis_handle_t *h) {
|
||||||
pfile_impl *impl = (pfile_impl*)h->impl;
|
pfile_impl *impl = (pfile_impl*)h->impl;
|
||||||
pthread_mutex_lock(&impl->mut);
|
lsn_t ret = lseek(impl->fd, 0, SEEK_END);
|
||||||
lsn_t ret = impl->end_pos;
|
|
||||||
pthread_mutex_unlock(&impl->mut);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,19 +145,8 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
|
||||||
pfile_impl *impl = (pfile_impl*)(h->impl);
|
pfile_impl *impl = (pfile_impl*)(h->impl);
|
||||||
int error = 0;
|
int error = 0;
|
||||||
|
|
||||||
// reading impl->end_pos is probably atomic on most hardware, but
|
|
||||||
// this is safer.
|
|
||||||
pthread_mutex_lock(&impl->mut);
|
|
||||||
lsn_t end_pos = impl->end_pos;
|
|
||||||
pthread_mutex_unlock(&impl->mut);
|
|
||||||
|
|
||||||
// checking end_pos is adequate, (we assume this is the only handle
|
|
||||||
// touching the file)
|
|
||||||
|
|
||||||
if (off < impl->start_pos) {
|
if (off < impl->start_pos) {
|
||||||
error = EDOM;
|
error = EDOM;
|
||||||
} else if (off + len > end_pos) {
|
|
||||||
error = EDOM;
|
|
||||||
} else {
|
} else {
|
||||||
ssize_t bytes_read = 0;
|
ssize_t bytes_read = 0;
|
||||||
TICK(read_hist);
|
TICK(read_hist);
|
||||||
|
@ -192,14 +171,22 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) {
|
||||||
error = errno;
|
error = errno;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
} else if(count == 0) {
|
||||||
bytes_read += count;
|
// EOF
|
||||||
if (bytes_read != len) {
|
if(bytes_read != 0) {
|
||||||
DEBUG("pread spinning\n");
|
fprintf(stderr, "short read at end of storefile. Assuming that this is due to strange recovery scenario, and continuing.\n");
|
||||||
|
}
|
||||||
|
error = EDOM;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
bytes_read += count;
|
||||||
|
if (bytes_read != len) {
|
||||||
|
DEBUG("pread spinning\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TOCK(read_hist);
|
TOCK(read_hist);
|
||||||
assert(bytes_read == len);
|
assert(error || bytes_read == len);
|
||||||
}
|
}
|
||||||
return error;
|
return error;
|
||||||
}
|
}
|
||||||
|
@ -212,14 +199,6 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat,
|
||||||
if (impl->start_pos > off) {
|
if (impl->start_pos > off) {
|
||||||
error = EDOM;
|
error = EDOM;
|
||||||
} else {
|
} else {
|
||||||
pthread_mutex_lock(&impl->mut);
|
|
||||||
if (impl->end_pos < off+len) {
|
|
||||||
// update end_pos now; the caller is not allowed to look at this
|
|
||||||
// part of the file until we return, so if they notice that the
|
|
||||||
// file hasn't been extended yet, it's a bug on their end.
|
|
||||||
impl->end_pos = off+len;
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&impl->mut);
|
|
||||||
phys_off = off - impl->start_pos;
|
phys_off = off - impl->start_pos;
|
||||||
error = pfile_write_unlocked(impl->fd, phys_off, dat, len);
|
error = pfile_write_unlocked(impl->fd, phys_off, dat, len);
|
||||||
}
|
}
|
||||||
|
@ -229,11 +208,8 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat,
|
||||||
static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat,
|
static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat,
|
||||||
lsn_t len) {
|
lsn_t len) {
|
||||||
pfile_impl *impl = (pfile_impl*)(h->impl);
|
pfile_impl *impl = (pfile_impl*)(h->impl);
|
||||||
pthread_mutex_lock(&impl->mut);
|
|
||||||
*off = impl->end_pos;
|
|
||||||
impl->end_pos += len;
|
|
||||||
pthread_mutex_unlock(&impl->mut);
|
|
||||||
lsn_t phys_off = *off - impl->start_pos;
|
lsn_t phys_off = *off - impl->start_pos;
|
||||||
|
abort(); // xxx o append is harder than I thought. Should support via distinct API.
|
||||||
return pfile_write_unlocked(impl->fd, phys_off, dat,len);
|
return pfile_write_unlocked(impl->fd, phys_off, dat,len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,14 +230,6 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h,
|
||||||
error = EDOM;
|
error = EDOM;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_mutex_lock(&impl->mut);
|
|
||||||
|
|
||||||
// @todo Come up with a reasonable way to avoid sparse files.
|
|
||||||
if (off + len > impl->end_pos) {
|
|
||||||
impl->end_pos = off+len;
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&impl->mut);
|
|
||||||
|
|
||||||
byte *buf;
|
byte *buf;
|
||||||
if (!error) {
|
if (!error) {
|
||||||
buf = malloc(len);
|
buf = malloc(len);
|
||||||
|
@ -294,11 +262,7 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h,
|
||||||
pfile_impl *impl = (pfile_impl*)h->impl;
|
pfile_impl *impl = (pfile_impl*)h->impl;
|
||||||
|
|
||||||
// Obtain an appropriate offset
|
// Obtain an appropriate offset
|
||||||
pthread_mutex_lock(&(impl->mut));
|
off_t off; abort(); // XXX need O_APPEND handle.
|
||||||
|
|
||||||
off_t off = impl->end_pos;
|
|
||||||
impl->end_pos += len;
|
|
||||||
pthread_mutex_unlock(&(impl->mut));
|
|
||||||
|
|
||||||
// Allocate the buffer
|
// Allocate the buffer
|
||||||
byte *buf = malloc(len);
|
byte *buf = malloc(len);
|
||||||
|
@ -328,18 +292,9 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h,
|
||||||
static int pfile_release_write_buffer(stasis_write_buffer_t *w) {
|
static int pfile_release_write_buffer(stasis_write_buffer_t *w) {
|
||||||
pfile_impl *impl = (pfile_impl*)(w->h->impl);
|
pfile_impl *impl = (pfile_impl*)(w->h->impl);
|
||||||
|
|
||||||
pthread_mutex_lock(&(impl->mut));
|
int error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf,
|
||||||
int error = 0;
|
|
||||||
if (impl->end_pos < w->off + w->len ||
|
|
||||||
impl->start_pos > w->off) {
|
|
||||||
error = EDOM;
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&(impl->mut));
|
|
||||||
|
|
||||||
if (!error) {
|
|
||||||
error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf,
|
|
||||||
w->len);
|
w->len);
|
||||||
}
|
|
||||||
if (w->buf) {
|
if (w->buf) {
|
||||||
free(w->buf);
|
free(w->buf);
|
||||||
}
|
}
|
||||||
|
@ -487,7 +442,6 @@ stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset,
|
||||||
if (!impl) { free(ret); return NULL; }
|
if (!impl) { free(ret); return NULL; }
|
||||||
|
|
||||||
ret->impl = impl;
|
ret->impl = impl;
|
||||||
pthread_mutex_init(&(impl->mut), 0);
|
|
||||||
impl->fd = open(filename, flags, mode);
|
impl->fd = open(filename, flags, mode);
|
||||||
assert(sizeof(off_t) >= (64/8));
|
assert(sizeof(off_t) >= (64/8));
|
||||||
if (impl->fd == -1) {
|
if (impl->fd == -1) {
|
||||||
|
@ -496,14 +450,6 @@ stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset,
|
||||||
|
|
||||||
impl->start_pos = start_offset;
|
impl->start_pos = start_offset;
|
||||||
|
|
||||||
off_t file_len = lseek(impl->fd,0,SEEK_END);
|
|
||||||
if (file_len == (off_t)-1) {
|
|
||||||
ret->error = errno;
|
|
||||||
}
|
|
||||||
impl->end_pos = file_len + start_offset;
|
|
||||||
DEBUG("file len = %lld, start_off = %lld, end = %lld\n",
|
|
||||||
(long long) file_len, start_offset, impl->end_pos);
|
|
||||||
|
|
||||||
impl->filename = strdup(filename);
|
impl->filename = strdup(filename);
|
||||||
impl->file_flags = flags;
|
impl->file_flags = flags;
|
||||||
impl->file_mode = mode;
|
impl->file_mode = mode;
|
||||||
|
|
Loading…
Reference in a new issue