Simple (blocking, poor truncation performance) file handle implementation.

This commit is contained in:
Sears Russell 2006-10-18 00:57:36 +00:00
parent f2e7a6b3a8
commit 10b77729f5
7 changed files with 529 additions and 8 deletions

View file

@ -223,5 +223,5 @@ extern const short SLOT_TYPE_LENGTHS[];
#define TALLOC_REGION_SIZE 100 // Pages
#define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)
#define LOG_MODE (O_CREAT | O_RDWR | O_SYNC)
#endif

View file

@ -9,9 +9,16 @@
otherwise. read_buffer() and write_buffer() return error codes via
the error field of the handles they produce.
An error that occurs while writing to the handle leaves the region
that was being written in an undefined state.
Errors in num_copies, num_copies_buffer, start_position, and end_position
are always unrecoverable, and return -1.
close returns 0 on success, or an error code otherwise. close
always frees the handle that was passed into it, regardless of
whether an error occurred.
Here are the meanings of the various error codes:
EDOM off is less than the beginning of the file (possibly due to truncation).
@ -19,6 +26,11 @@
that caused this one is stored in the handle's error field.
Handle implementations may return return other errors as appropriate.
Offset:
Negative offsets are reserved for implementation-specific purposes.
*/
typedef struct stasis_handle_t {
/** @return the number of in-memory copies made when the caller
@ -78,7 +90,7 @@ typedef struct stasis_read_buffer_t {
} stasis_read_buffer_t;
stasis_handle_t * stasis_handle(open_memory)(void);
stasis_handle_t * stasis_handle(open_file)(char * path, int mode);
stasis_handle_t * stasis_handle(open_file)(char * path, int flags, int mode);
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * h,
int worker_thread_count);
stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h);

View file

@ -18,7 +18,7 @@ liblladd_a_SOURCES=crc32.c redblack.c lhtable.c common.c stats.c io.c bufferMana
operations/linearHashNTA.c operations/linkedListNTA.c \
operations/pageOrientedListNTA.c operations/bTree.c \
operations/regions.c \
io/memory.c
io/memory.c io/file.c io/debug.c
# page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas
#operations/lladdhash.c
#AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -67,8 +67,8 @@ static stasis_write_buffer_t * debug_append_buffer(stasis_handle_t * h,
*retWrap = *ret;
retWrap->h = h;
retWrap->impl = ret;
printf("tid=%9ld retn append_buffer(%lx, %ld) = %lx\n",
pthread_self(), (unsigned long)hh, len, (unsigned long)retWrap); fflush(stdout);
printf("tid=%9ld retn append_buffer(%lx, %ld) = %lx (off=%ld)\n",
pthread_self(), (unsigned long)hh, len, (unsigned long)retWrap, ret->off); fflush(stdout);
return retWrap;
}

477
src/lladd/io/file.c Normal file
View file

@ -0,0 +1,477 @@
#include <lladd/io/handle.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
typedef struct file_impl {
pthread_mutex_t mut;
lsn_t start_pos;
lsn_t end_pos;
int fd;
int file_flags;
int file_mode;
char * filename;
} file_impl;
static int file_num_copies(stasis_handle_t * h) { return 0; }
static int file_num_copies_buffer(stasis_handle_t * h) { return 0; }
static int file_close(stasis_handle_t * h) {
file_impl * impl = (file_impl*)h->impl;
int fd = impl->fd;
free(impl->filename);
free(impl);
free(h);
int ret = close(fd);
if(!ret) return 0;
else return errno;
}
static lsn_t file_start_position(stasis_handle_t *h) {
file_impl * impl = (file_impl*)h->impl;
pthread_mutex_lock(&(impl->mut));
lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&(impl->mut));
return ret;
}
static lsn_t file_end_position(stasis_handle_t *h) {
file_impl * impl = (file_impl*)h->impl;
pthread_mutex_lock(&(impl->mut));
lsn_t ret = impl->end_pos;
pthread_mutex_unlock(&(impl->mut));
return ret;
}
inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) {
file_impl * impl = (file_impl*)h->impl;
int error = 0;
// These should have been checked by the caller.
assert(impl->start_pos <= off);
assert(impl->end_pos >= off+len);
// @todo need a test harness that gets read(), write() and lseek() to misbehave.
off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET);
if(lseek_offset == (off_t)-1) {
error = errno;
if(error == EBADF || error == ESPIPE) {
h->error = error;
error = EBADF;
}
} else {
ssize_t bytes_written = 0;
// seek succeeded, so attempt to write.
while(bytes_written < len) {
ssize_t ret = write(impl->fd,
dat+bytes_written,
len-bytes_written);
if(ret == -1) {
if(errno == EAGAIN || errno == EINTR) {
// EAGAIN could be returned if the file handle was opened in non-blocking mode.
// we should probably warn instead of spinning.
// EINTR is returned if the write is interrupted by a signal.
// On Linux, it must have been interrupted before any bytes were written.
// On SVr4 it may have been interrupted at any point.
// Try again.
lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET);
if(lseek_offset == (off_t)-1) {
error = errno;
if(error == EBADF || error == ESPIPE) {
h->error = error;
error = EBADF;
}
break;
}
ret = 0;
} else {
// Need to set h->error if an unrecoverable error occured.
// The only unrecoverable error is EBADF. (EINVAL could be
// the caller's fault if O_DIRECT is being used. Otherwise,
// it is unrecoverable.
if(errno == EBADF) {
h->error = EBADF;
}
error = errno;
break;
}
}
bytes_written += ret;
}
}
return error;
}
inline static void print_eof_error(char * file, int line) {
fprintf(stderr, "%s:%d Internal error: attempt to access negative offset, or beyond EOF.\n", file, line);
fflush(stderr);
}
static int file_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) {
file_impl * impl = (file_impl*)(h->impl);
pthread_mutex_lock(&(impl->mut));
int error = 0;
if(off < impl->start_pos || off + len > impl->end_pos) {
error = EDOM;
}
if(!error) {
off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET);
if(lseek_offset == (off_t)-1) {
error = errno;
if(error == EBADF || error == ESPIPE) {
h->error = error;
error = EBADF;
} else if(error == EINVAL) {
print_eof_error(__FILE__, __LINE__);
}
} else {
ssize_t bytes_written = 0;
// seek succeeded, so attempt to read.
while(bytes_written < len) {
ssize_t ret = read(impl->fd,
buf+bytes_written,
len-bytes_written);
if(ret == -1) {
if(errno == EAGAIN || errno == EINTR) {
// EAGAIN could be returned if the file handle was opened in non-blocking mode.
// we should probably warn instead of spinning.
// EINTR is returned if the write is interrupted by a signal.
// On Linux, it must have been interrupted before any bytes were written.
// On SVr4 it may have been interrupted at any point.
// Try again.
lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET);
if(lseek_offset == (off_t)-1) {
error = errno;
if(error == EBADF || error == ESPIPE) {
h->error = error;
error = EBADF;
} else if(error == EINVAL) {
print_eof_error(__FILE__, __LINE__);
}
break;
}
ret = 0;
} else {
// Need to set h->error if an unrecoverable error occured.
// The only unrecoverable error is EBADF. (EINVAL could be
// the caller's fault if O_DIRECT is being used. Otherwise,
// it is unrecoverable.
if(errno == EBADF) {
h->error = EBADF;
}
error = errno;
break;
}
} else if (ret == 0) {
// EOF (!)
print_eof_error(__FILE__, __LINE__);
error = EINVAL;
break;
}
bytes_written += ret;
}
}
}
pthread_mutex_unlock(&(impl->mut));
return error;
}
static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len) {
file_impl * impl = (file_impl*)(h->impl);
pthread_mutex_lock(&(impl->mut));
int error = 0;
if(impl->start_pos > off) {
error = EDOM;
}
if(!error) {
if(impl->end_pos < off+len){
impl->end_pos = off+len;
}
error = file_write_unlocked(h, off, dat, len);
}
pthread_mutex_unlock(&(impl->mut));
return error;
}
static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
file_impl * impl = (file_impl*)(h->impl);
pthread_mutex_lock(&impl->mut);
*off = impl->end_pos;
if(impl->end_pos < *off+len){
impl->end_pos = *off+len;
}
int error = file_write_unlocked(h, *off, dat, len);
pthread_mutex_unlock(&impl->mut);
return error;
}
static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
// Allocate the handle
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(!ret) { return NULL; }
file_impl * impl = (file_impl*)h->impl;
int error = 0;
pthread_mutex_lock(&(impl->mut));
if(impl->start_pos > off) {
error = EDOM;
}
if(off + len > impl->end_pos) {
impl->end_pos = off+len;
}
pthread_mutex_unlock(&(impl->mut));
byte * buf;
if(!error) {
// Allocate the buffer
buf = malloc(len);
if(!buf) {
error = ENOMEM;
}
}
if(error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
} else {
ret->h = h;
ret->off = off;
ret->buf = buf;
ret->len = len;
ret->impl = 0;
ret->error = 0;
}
return ret;
}
static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h,
lsn_t len) {
// Allocate the handle
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(!ret) { return NULL; }
file_impl * impl = (file_impl*)h->impl;
// Obtain an appropriate offset
pthread_mutex_lock(&(impl->mut));
off_t off = impl->end_pos;
impl->end_pos += len;
pthread_mutex_unlock(&(impl->mut));
// Allocate the buffer
byte * buf = malloc(len);
int error = 0;
if(!buf) {
error = ENOMEM;
}
if(error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
} else {
ret->h = h;
ret->off = off;
ret->buf = buf;
ret->len = len;
ret->impl = 0;
ret->error = 0;
}
return ret;
}
static int file_release_write_buffer(stasis_write_buffer_t * w) {
file_impl * impl = (file_impl*)(w->h->impl);
pthread_mutex_lock(&(impl->mut));
int error = 0;
if(impl->end_pos < w->off + w->len ||
impl->start_pos > w->off) {
error = EDOM;
}
if(!error) {
// Call write().
error = file_write_unlocked(w->h, w->off, w->buf, w->len);
}
pthread_mutex_unlock(&(impl->mut));
free(w->buf);
free(w);
return error;
}
static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
if(!ret) { return NULL; }
byte * buf = malloc(len);
int error = 0;
if(!buf) { error = ENOMEM; }
if(!error) {
error = file_read(h, off, buf, len);
}
if(error) {
ret->h = h;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
if(buf) { free(buf); }
} else {
ret->h = h;
ret->buf = buf;
ret->len = len;
ret->impl = 0;
ret->error = 0;
}
return ret;
}
static int file_release_read_buffer(stasis_read_buffer_t * r) {
free((void*)r->buf);
free(r);
return 0;
}
static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) {
file_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut);
int error = 0;
if(new_start > impl->end_pos) {
error = EDOM;
} else if(new_start > impl->start_pos) {
char * tmpfile = malloc(strlen(impl->filename)+2);
strcpy(tmpfile, impl->filename);
tmpfile[strlen(tmpfile)+1] = '\0';
tmpfile[strlen(tmpfile)] = '~';
int fd = open(tmpfile, impl->file_flags, impl->file_mode);
lseek(fd, 0, SEEK_SET);
lseek(impl->fd, new_start-impl->start_pos, SEEK_SET);
int count;
int buf_size = 1024 * 1024;
char * buf = malloc(buf_size);
while((count = read(impl->fd, buf, buf_size))) {
if(count == -1) {
error = errno;
perror("truncate failed to read");
}
ssize_t bytes_written = 0;
while(bytes_written < count) {
ssize_t write_ret = write(fd, buf+bytes_written, count-bytes_written);
if(write_ret == -1) {
error = errno;
perror("truncate failed to write");
break;
}
bytes_written += write_ret;
}
if(error) break;
}
free(buf);
if(!error) {
if(-1 == close(impl->fd)) {
error = errno;
}
impl->fd = fd;
impl->start_pos = new_start;
fsync(impl->fd);
int rename_ret = rename(tmpfile, impl->filename);
if(rename_ret) {
error = errno;
}
free(tmpfile);
} else {
close(fd);
}
}
pthread_mutex_unlock(&impl->mut);
return error;
}
struct stasis_handle_t file_func = {
.num_copies = file_num_copies,
.num_copies_buffer = file_num_copies_buffer,
.close = file_close,
.start_position = file_start_position,
.end_position = file_end_position,
.write = file_write,
.append = file_append,
.write_buffer = file_write_buffer,
.append_buffer = file_append_buffer,
.release_write_buffer = file_release_write_buffer,
.read = file_read,
.read_buffer = file_read_buffer,
.release_read_buffer = file_release_read_buffer,
.truncate_start = file_truncate_start,
.error = 0
};
stasis_handle_t * stasis_handle(open_file)(char * filename, int flags, int mode) {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; }
*ret = file_func;
file_impl * impl = malloc(sizeof(file_impl));
ret->impl = impl;
pthread_mutex_init(&(impl->mut), 0);
impl->start_pos = 0;
impl->end_pos = 0;
impl->fd = open(filename, flags, mode);
if(impl->fd == -1) {
ret->error = errno;
}
impl->filename = strdup(filename);
impl->file_flags = flags;
impl->file_mode = mode;
return ret;
}

View file

@ -244,6 +244,7 @@ struct stasis_handle_t mem_func = {
stasis_handle_t * stasis_handle(open_memory)() {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; }
*ret = mem_func;
mem_impl * impl = malloc(sizeof(mem_impl));

View file

@ -44,6 +44,10 @@ terms specified in this license.
#include <check.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "../check_includes.h"
#define LOG_NAME "check_io.log"
@ -72,7 +76,8 @@ void handle_smoketest(stasis_handle_t * h) {
assert(! h->write(h, 0, (byte*)&one, sizeof(int)));
int one_read = 0;
assert(! h->read(h, 0, (byte*)&one_read, sizeof(int)));
int ret = h->read(h, 0, (byte*)&one_read, sizeof(int));
assert(!ret);
assert(one_read == one);
stasis_write_buffer_t * w = h->write_buffer(h, sizeof(int), sizeof(int));
*((int*)(w->buf)) = two;
@ -211,7 +216,7 @@ void load_handle(thread_arg* t) {
switch(choice) {
case 0: { // read
int j;
int j = -1;
int ret = h->read(h, offsets[val], (byte*)&j, sizeof(int));
if(!ret) {
assert(j == t->values[val]);
@ -263,6 +268,7 @@ void load_handle(thread_arg* t) {
void handle_sequentialtest(stasis_handle_t * h) {
time_t seed = time(0);
//time_t seed = 0;
printf("\nSeed = %ld\n", seed);
srandom(seed);
@ -328,6 +334,29 @@ START_TEST(io_memoryTest) {
h->close(h);
} END_TEST
START_TEST(io_fileTest) {
stasis_handle_t * h;
h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// h = stasis_handle(open_debug)(h);
handle_smoketest(h);
h->close(h);
unlink("logfile.txt");
h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//h = stasis_handle(open_debug)(h);
handle_sequentialtest(h);
h->close(h);
unlink("logfile.txt");
h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
handle_concurrencytest(h);
h->close(h);
unlink("logfile.txt");
} END_TEST
/**
Add suite declarations here
@ -339,7 +368,9 @@ Suite * check_suite(void) {
tcase_set_timeout(tc, 600); // ten minute timeout
/* Sub tests are added, one per line, here */
tcase_add_test(tc, io_memoryTest);
// tcase_add_test(tc, io_memoryTest);
tcase_add_test(tc, io_fileTest);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);