From 7b4cf402212ac252f96d3b485a6157eeb60536fe Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 7 May 2009 08:52:06 +0000 Subject: [PATCH] refactored/created stasis_buffer_manager_open and stasis_handle_open --- src/stasis/CMakeLists.txt | 2 +- src/stasis/Makefile.am | 2 +- src/stasis/bufferManager.c | 12 +- src/stasis/bufferManager/bufferHash.c | 6 +- .../legacy/legacyBufferManager.c | 6 +- src/stasis/bufferManager/pageArray.c | 6 +- src/stasis/io/file.c | 178 +++++++++--------- src/stasis/io/handle.c | 115 +++++++++++ src/stasis/io/non_blocking.c | 12 +- src/stasis/io/pfile.c | 4 +- src/stasis/page/header.c | 14 +- src/stasis/transactional2.c | 127 ++----------- stasis/bufferManager.h | 10 +- stasis/bufferManager/bufferHash.h | 2 +- .../legacy/legacyBufferManager.h | 2 +- stasis/bufferManager/pageArray.h | 2 +- stasis/io/handle.h | 14 +- test/stasis/check_io.c | 6 +- 18 files changed, 272 insertions(+), 248 deletions(-) create mode 100644 src/stasis/io/handle.c diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index 7d363bc..4ac17b3 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -29,7 +29,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c operations/pageOrientedListNTA.c operations/regions.c operations/lsmTree.c io/rangeTracker.c io/memory.c io/file.c io/pfile.c - io/non_blocking.c io/debug.c + io/non_blocking.c io/debug.c io/handle.c bufferManager/pageArray.c bufferManager/bufferHash.c replacementPolicy/lru.c replacementPolicy/lruFast.c) diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index 3076a4b..a0e8eee 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -22,7 +22,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common operations/regions.c operations/lsmTree.c \ operations/lsnFreeSet.c \ io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \ - io/debug.c \ + io/debug.c io/handle.c \ bufferManager.c \ bufferManager/pageArray.c \ bufferManager/bufferHash.c \ diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index bf6de6f..9c0eabd 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -169,8 +169,8 @@ void (*releasePageImpl)(Page * p) = 0; void (*writeBackPage)(Page * p) = 0; void (*forcePages)() = 0; void (*forcePageRange)(pageid_t start, pageid_t stop) = 0; -void (*bufDeinit)() = 0; -void (*simulateBufferManagerCrash)() = 0; +void (*stasis_buffer_manager_close)() = 0; +void (*stasis_buffer_manager_simulate_crash)() = 0; Page * loadPage(int xid, pageid_t pageid) { try_ret(NULL) { @@ -195,7 +195,7 @@ void releasePage(Page * p) { releasePageImpl(p); } -int bufInit(int type) { +int stasis_buffer_manager_open(int type) { bufferManagerType = type; static int lastType = 0; if(type == BUFFER_MANAGER_REOPEN) { @@ -203,13 +203,13 @@ int bufInit(int type) { } lastType = type; if(type == BUFFER_MANAGER_DEPRECATED_HASH) { - bufManBufInit(); + stasis_buffer_manager_deprecated_open(); return 0; } else if (type == BUFFER_MANAGER_MEM_ARRAY) { - paBufInit(); + stasis_buffer_manager_mem_array_open(); return 0; } else if (type == BUFFER_MANAGER_HASH) { - bhBufInit(); + stasis_buffer_manager_hash_open(); return 0; } else { // XXX error handling diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index 4545e6d..f06c9fb 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -376,7 +376,7 @@ static void bhSimulateBufferManagerCrash() { stasis_buffer_pool_deinit(stasis_buffer_pool); } -void bhBufInit() { +void stasis_buffer_manager_hash_open() { assert(!running); @@ -390,8 +390,8 @@ void bhBufInit() { writeBackPage = bhWriteBackPage; forcePages = bhForcePages; forcePageRange = bhForcePageRange; - bufDeinit = bhBufDeinit; - simulateBufferManagerCrash = bhSimulateBufferManagerCrash; + stasis_buffer_manager_close = bhBufDeinit; + stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash; stasis_buffer_pool = stasis_buffer_pool_init(); diff --git a/src/stasis/bufferManager/legacy/legacyBufferManager.c b/src/stasis/bufferManager/legacy/legacyBufferManager.c index 82a74e9..6a320d5 100644 --- a/src/stasis/bufferManager/legacy/legacyBufferManager.c +++ b/src/stasis/bufferManager/legacy/legacyBufferManager.c @@ -27,7 +27,7 @@ static void bufManSimulateBufferManagerCrash(); static stasis_buffer_pool_t * stasis_buffer_pool; -int bufManBufInit() { +int stasis_buffer_manager_deprecated_open() { releasePageImpl = bufManReleasePage; loadPageImpl = bufManLoadPage; @@ -35,8 +35,8 @@ int bufManBufInit() { writeBackPage = pageWrite; forcePages = forcePageFile; forcePageRange = forceRangePageFile; - bufDeinit = bufManBufDeinit; - simulateBufferManagerCrash = bufManSimulateBufferManagerCrash; + stasis_buffer_manager_close = bufManBufDeinit; + stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash; stasis_buffer_pool = stasis_buffer_pool_init(); diff --git a/src/stasis/bufferManager/pageArray.c b/src/stasis/bufferManager/pageArray.c index c7e156c..0ce38c3 100644 --- a/src/stasis/bufferManager/pageArray.c +++ b/src/stasis/bufferManager/pageArray.c @@ -57,14 +57,14 @@ static void paBufDeinit() { } } -void paBufInit () { +void stasis_buffer_manager_mem_array_open () { releasePageImpl = paReleasePage; loadPageImpl = paLoadPage; writeBackPage = paWriteBackPage; forcePages = paForcePages; - bufDeinit = paBufDeinit; - simulateBufferManagerCrash = paBufDeinit; + stasis_buffer_manager_close = paBufDeinit; + stasis_buffer_manager_simulate_crash = paBufDeinit; pageCount = 0; pageMap = 0; diff --git a/src/stasis/io/file.c b/src/stasis/io/file.c index c396fb8..2e78f0e 100644 --- a/src/stasis/io/file.c +++ b/src/stasis/io/file.c @@ -18,7 +18,7 @@ /** @file */ -typedef struct file_impl { +typedef struct file_impl { pthread_mutex_t mut; lsn_t start_pos; lsn_t end_pos; @@ -31,7 +31,7 @@ typedef struct file_impl { static int updateEOF(stasis_handle_t * h) { file_impl * impl = h->impl; off_t pos = lseek(impl->fd, 0, SEEK_END); - if(pos == (off_t)-1) { + if(pos == (off_t)-1) { return errno; } else { impl->end_pos = impl->start_pos + pos; @@ -64,77 +64,77 @@ static lsn_t file_start_position(stasis_handle_t *h) { return ret; } -static lsn_t file_end_position(stasis_handle_t *h) { +static lsn_t file_end_position(stasis_handle_t *h) { file_impl * impl = (file_impl*)h->impl; pthread_mutex_lock(&(impl->mut)); int error = updateEOF(h); int ret; - if(error) { + if(error) { h->error = error; ret = -1; - } else { + } else { ret = impl->end_pos; } pthread_mutex_unlock(&(impl->mut)); return ret; } -inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off, - const byte * dat, lsn_t len) { +inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off, + const byte * dat, lsn_t len) { file_impl * impl = (file_impl*)h->impl; int error = 0; // These should have been checked by the caller. - assert(impl->start_pos <= off); + assert(impl->start_pos <= off); assert(impl->end_pos >= off+len); // @todo need a test harness that gets read(), write() and lseek() to misbehave. off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET); - if(lseek_offset == (off_t)-1) { + if(lseek_offset == (off_t)-1) { error = errno; if(error == EBADF || error == ESPIPE) { h->error = error; error = EBADF; } - } else { + } else { ssize_t bytes_written = 0; - + // seek succeeded, so attempt to write. - while(bytes_written < len) { + while(bytes_written < len) { ssize_t ret = write(impl->fd, - dat+bytes_written, + dat+bytes_written, len-bytes_written); if(ret == -1) { if(errno == EAGAIN || errno == EINTR) { // EAGAIN could be returned if the file handle was opened in non-blocking mode. // we should probably warn instead of spinning. - + // EINTR is returned if the write is interrupted by a signal. // On Linux, it must have been interrupted before any bytes were written. // On SVr4 it may have been interrupted at any point. - + // Try again. lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET); - if(lseek_offset == (off_t)-1) { + if(lseek_offset == (off_t)-1) { error = errno; - if(error == EBADF || error == ESPIPE) { + if(error == EBADF || error == ESPIPE) { h->error = error; error = EBADF; } break; - } + } ret = 0; - - } else { + + } else { // Need to set h->error if an unrecoverable error occured. // The only unrecoverable error is EBADF. (EINVAL could be // the caller's fault if O_DIRECT is being used. Otherwise, // it is unrecoverable. - if(errno == EBADF) { + if(errno == EBADF) { h->error = EBADF; } error = errno; @@ -148,13 +148,13 @@ inline static int file_write_unlocked(stasis_handle_t * h, lsn_t off, return error; } -inline static void print_eof_error(char * file, int line) { +inline static void print_eof_error(char * file, int line) { fprintf(stderr, "%s:%d Internal error: attempt to access negative offset, or beyond EOF.\n", file, line); fflush(stderr); } -static int file_read(stasis_handle_t * h, - lsn_t off, byte * buf, lsn_t len) { +static int file_read(stasis_handle_t * h, + lsn_t off, byte * buf, lsn_t len) { file_impl * impl = (file_impl*)(h->impl); pthread_mutex_lock(&(impl->mut)); int error = 0; @@ -167,61 +167,61 @@ static int file_read(stasis_handle_t * h, } } - if(!error) { + if(!error) { off_t lseek_offset = lseek(impl->fd, off - impl->start_pos, SEEK_SET); - - if(lseek_offset == (off_t)-1) { + + if(lseek_offset == (off_t)-1) { error = errno; if(error == EBADF || error == ESPIPE) { h->error = error; error = EBADF; - } else if(error == EINVAL) { + } else if(error == EINVAL) { print_eof_error(__FILE__, __LINE__); } - } else { + } else { ssize_t bytes_written = 0; // seek succeeded, so attempt to read. - while(bytes_written < len) { + while(bytes_written < len) { ssize_t ret = read(impl->fd, - buf+bytes_written, + buf+bytes_written, len-bytes_written); if(ret == -1) { if(errno == EAGAIN || errno == EINTR) { // EAGAIN could be returned if the file handle was opened in non-blocking mode. // we should probably warn instead of spinning. - + // EINTR is returned if the write is interrupted by a signal. // On Linux, it must have been interrupted before any bytes were written. // On SVr4 it may have been interrupted at any point. - + // Try again. lseek_offset = lseek(impl->fd, off + bytes_written - impl->start_pos, SEEK_SET); - if(lseek_offset == (off_t)-1) { + if(lseek_offset == (off_t)-1) { error = errno; - if(error == EBADF || error == ESPIPE) { + if(error == EBADF || error == ESPIPE) { h->error = error; error = EBADF; - } else if(error == EINVAL) { + } else if(error == EINVAL) { print_eof_error(__FILE__, __LINE__); } break; - } - + } + ret = 0; - - } else { + + } else { // Need to set h->error if an unrecoverable error occured. // The only unrecoverable error is EBADF. (EINVAL could be // the caller's fault if O_DIRECT is being used. Otherwise, // it is unrecoverable. - if(errno == EBADF) { + if(errno == EBADF) { h->error = EBADF; } error = errno; break; } - } else if (ret == 0) { + } else if (ret == 0) { // EOF (!) print_eof_error(__FILE__, __LINE__); error = EINVAL; @@ -240,12 +240,12 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len file_impl * impl = (file_impl*)(h->impl); pthread_mutex_lock(&(impl->mut)); int error = 0; - if(impl->start_pos > off) { - error = EDOM; + if(impl->start_pos > off) { + error = EDOM; } - if(!error) { - if(impl->end_pos < off+len){ + if(!error) { + if(impl->end_pos < off+len){ impl->end_pos = off+len; } error = file_write_unlocked(h, off, dat, len); @@ -254,7 +254,7 @@ static int file_write(stasis_handle_t *h, lsn_t off, const byte * dat, lsn_t len return error; } -static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) { +static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) { file_impl * impl = (file_impl*)(h->impl); pthread_mutex_lock(&impl->mut); updateEOF(h); @@ -267,35 +267,35 @@ static int file_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t -static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, - lsn_t off, lsn_t len) { +static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, + lsn_t off, lsn_t len) { // Allocate the handle stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); if(!ret) { return NULL; } - + file_impl * impl = (file_impl*)h->impl; int error = 0; pthread_mutex_lock(&(impl->mut)); - if(impl->start_pos > off) { + if(impl->start_pos > off) { error = EDOM; } - if(off + len > impl->end_pos) { + if(off + len > impl->end_pos) { impl->end_pos = off+len; } pthread_mutex_unlock(&(impl->mut)); - + byte * buf; - if(!error) { + if(!error) { // Allocate the buffer buf = malloc(len); - if(!buf) { + if(!buf) { error = ENOMEM; } } - if(error) { + if(error) { ret->h = h; ret->off = 0; ret->buf = 0; @@ -314,7 +314,7 @@ static stasis_write_buffer_t * file_write_buffer(stasis_handle_t * h, return ret; } -static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h, +static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h, lsn_t len) { // Allocate the handle stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); @@ -328,15 +328,15 @@ static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h, off_t off = impl->end_pos; impl->end_pos += len; pthread_mutex_unlock(&(impl->mut)); - + // Allocate the buffer byte * buf = malloc(len); int error = 0; - if(!buf) { + if(!buf) { error = ENOMEM; } - if(error) { + if(error) { ret->h = h; ret->off = 0; ret->buf = 0; @@ -354,18 +354,18 @@ static stasis_write_buffer_t * file_append_buffer(stasis_handle_t * h, return ret; } -static int file_release_write_buffer(stasis_write_buffer_t * w) { +static int file_release_write_buffer(stasis_write_buffer_t * w) { file_impl * impl = (file_impl*)(w->h->impl); pthread_mutex_lock(&(impl->mut)); int error = 0; - if(impl->end_pos < w->off + w->len || - impl->start_pos > w->off) { + if(impl->end_pos < w->off + w->len || + impl->start_pos > w->off) { error = EDOM; } - - if(!error) { + + if(!error) { // Call write(). error = file_write_unlocked(w->h, w->off, w->buf, w->len); } @@ -376,20 +376,20 @@ static int file_release_write_buffer(stasis_write_buffer_t * w) { } static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h, - lsn_t off, lsn_t len) { + lsn_t off, lsn_t len) { stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); if(!ret) { return NULL; } - + byte * buf = malloc(len); int error = 0; if(!buf) { error = ENOMEM; } - if(!error) { + if(!error) { error = file_read(h, off, buf, len); } - if(error) { + if(error) { ret->h = h; ret->buf = 0; ret->off = 0; @@ -397,7 +397,7 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h, ret->impl = 0; ret->error = error; if(buf) { free(buf); } - } else { + } else { ret->h = h; ret->buf = buf; ret->off = off; @@ -405,9 +405,9 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h, ret->impl = 0; ret->error = 0; } - return ret; + return ret; } -static int file_release_read_buffer(stasis_read_buffer_t * r) { +static int file_release_read_buffer(stasis_read_buffer_t * r) { free((void*)r->buf); free(r); return 0; @@ -450,7 +450,7 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { pthread_mutex_lock(&impl->mut); int fd = impl->fd; lsn_t off = impl->start_pos; - (void)off; + (void)off; pthread_mutex_unlock(&impl->mut); { static int warned = 0; @@ -489,40 +489,40 @@ static int file_force_range(stasis_handle_t *h, lsn_t start, lsn_t stop) { } return ret; } -static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) { +static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) { file_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); int error = 0; - if(new_start > impl->end_pos) { + if(new_start > impl->end_pos) { updateEOF(h); - if(!error && new_start > impl->end_pos) { + if(!error && new_start > impl->end_pos) { error = EDOM; } } - if(!error && new_start > impl->start_pos) { + if(!error && new_start > impl->start_pos) { char * tmpfile = malloc(strlen(impl->filename)+2); strcpy(tmpfile, impl->filename); tmpfile[strlen(tmpfile)+1] = '\0'; tmpfile[strlen(tmpfile)] = '~'; int fd = open(tmpfile, impl->file_flags, impl->file_mode); - + lseek(fd, 0, SEEK_SET); lseek(impl->fd, new_start-impl->start_pos, SEEK_SET); int count; int buf_size = 1024 * 1024; char * buf = malloc(buf_size); - while((count = read(impl->fd, buf, buf_size))) { - if(count == -1) { + while((count = read(impl->fd, buf, buf_size))) { + if(count == -1) { error = errno; perror("truncate failed to read"); } ssize_t bytes_written = 0; - while(bytes_written < count) { + while(bytes_written < count) { ssize_t write_ret = write(fd, buf+bytes_written, count-bytes_written); - if(write_ret == -1) { + if(write_ret == -1) { error = errno; perror("truncate failed to write"); break; @@ -532,23 +532,23 @@ static int file_truncate_start(stasis_handle_t * h, lsn_t new_start) { if(error) break; } free(buf); - if(!error) { - if(-1 == close(impl->fd)) { + if(!error) { + if(-1 == close(impl->fd)) { error = errno; } impl->fd = fd; impl->start_pos = new_start; fsync(impl->fd); int rename_ret = rename(tmpfile, impl->filename); - if(rename_ret) { + if(rename_ret) { error = errno; } free(tmpfile); - } else { + } else { close(fd); } } - + pthread_mutex_unlock(&impl->mut); return error; } @@ -573,7 +573,7 @@ struct stasis_handle_t file_func = { .error = 0 }; -stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * filename, int flags, int mode) { +stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, const char * filename, int flags, int mode) { stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); if(!ret) { return NULL; } *ret = file_func; @@ -585,7 +585,7 @@ stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * filename, impl->end_pos = start_offset; assert(sizeof(off_t) >= (64/8)); impl->fd = open(filename, flags, mode); - if(impl->fd == -1) { + if(impl->fd == -1) { ret->error = errno; } impl->filename = strdup(filename); diff --git a/src/stasis/io/handle.c b/src/stasis/io/handle.c new file mode 100644 index 0000000..20117dc --- /dev/null +++ b/src/stasis/io/handle.c @@ -0,0 +1,115 @@ +/* + * handle.c + * + * Created on: May 7, 2009 + * Author: sears + */ +#include +#include +#include +#include + +#include + + +// @todo this factory stuff doesn't really belong here... +static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { + stasis_handle_t * h = stasis_handle(open_memory)(off); + //h = stasis_handle(open_debug)(h); + stasis_write_buffer_t * w = h->append_buffer(h, len); + w->h->release_write_buffer(w); + return h; +} +typedef struct sf_args { + const char * filename; + int openMode; + int filePerm; +} sf_args; +static stasis_handle_t * slow_file_factory(void * argsP) { + sf_args * args = (sf_args*) argsP; + stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); + //h = stasis_handle(open_debug)(h); + return h; +} +static void slow_file_factory_close(void * argsP) { + // nop +} +static stasis_handle_t * slow_pfile_factory(void * argsP) { + stasis_handle_t * h = argsP; + return h; +} +static void slow_pfile_factory_close(void * argsP) { + stasis_handle_t * h = argsP; + h->close(h); +} +static int (*slow_close)(stasis_handle_t * h) = 0; +static stasis_handle_t * slow_pfile = 0; +static int nop_close(stasis_handle_t*h) { return 0; } + +stasis_handle_t * stasis_handle(open)(const char * path) { +#ifndef HAVE_O_DIRECT + if(bufferManagerO_DIRECT) { + printf("O_DIRECT not supported by this build; switching to conventional buffered I/O.\n"); + bufferManagerO_DIRECT = 0; + } +#endif + int openMode; + if(bufferManagerO_DIRECT) { +#ifdef HAVE_O_DIRECT + openMode = O_CREAT | O_RDWR | O_DIRECT; +#else + printf("Can't happen\n"); + abort(); +#endif + } else { + openMode = O_CREAT | O_RDWR; + } + stasis_handle_t * ret; + /// @todo remove hardcoding of buffer manager implementations in transactional2.c + + switch(bufferManagerFileHandleType) { + case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: { + struct sf_args * slow_arg = malloc(sizeof(sf_args)); + slow_arg->filename = path; + + slow_arg->openMode = openMode; + + slow_arg->filePerm = FILE_PERM; + // Allow 4MB of outstanding writes. + // @todo Where / how should we open storefile? + int worker_thread_count = 4; + if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) { + // printf("\nusing pread()/pwrite()\n"); + slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm); + slow_close = slow_pfile->close; + slow_pfile->close = nop_close; + ret = + stasis_handle(open_non_blocking)(slow_pfile_factory, slow_pfile_factory_close, slow_pfile, 1, fast_factory, + NULL, worker_thread_count, PAGE_SIZE * 1024 , 1024); + + } else if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_FILE) { + ret = + stasis_handle(open_non_blocking)(slow_file_factory, slow_file_factory_close, slow_arg, 0, fast_factory, + NULL, worker_thread_count, PAGE_SIZE * 1024, 1024); + } else { + printf("Unknown value for config option bufferManagerNonBlockingSlowHandleType\n"); + abort(); + } + } break; + case BUFFER_MANAGER_FILE_HANDLE_FILE: { + ret = stasis_handle_open_file(0, path, openMode, FILE_PERM); + } break; + case BUFFER_MANAGER_FILE_HANDLE_PFILE: { + ret = stasis_handle_open_pfile(0, path, openMode, FILE_PERM); + } break; + case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: { + assert(bufferManagerFileHandleType != BUFFER_MANAGER_FILE_HANDLE_DEPRECATED); + } break; + default: { + printf("\nUnknown buffer manager filehandle type: %d\n", + bufferManagerFileHandleType); + abort(); + } + } + return ret; +} diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index 1d8690c..b8fef9b 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -145,6 +145,7 @@ typedef struct nbw_impl { // Fields to manage slow handles stasis_handle_t * (*slow_factory)(void * arg); + void (*slow_factory_close)(void * arg); void * slow_factory_arg; int slow_force_once; @@ -245,7 +246,7 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, DEBUG("Blocking on write. %lld bytes (%lld max)\n", impl->used_buffer_size, impl->max_buffer_size); } - + pthread_mutex_unlock(&impl->mut); np->dirty = INVALID_NODE; @@ -362,6 +363,10 @@ static int nbw_close(stasis_handle_t * h) { free(impl->all_slow_handles); assert(impl->available_slow_handle_count == 0); + if(impl->slow_factory_close) { + impl->slow_factory_close(impl->slow_factory_arg); + } + free(h->impl); free(h); return 0; @@ -777,7 +782,9 @@ static void * nbw_worker(void * handle) { } stasis_handle_t * stasis_handle(open_non_blocking) - (stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg, + (stasis_handle_t * (*slow_factory)(void * arg), + void (*slow_factory_close)(void * arg), + void * slow_factory_arg, int slow_force_once, stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *), void * fast_factory_arg, int worker_thread_count, lsn_t buffer_size, @@ -789,6 +796,7 @@ stasis_handle_t * stasis_handle(open_non_blocking) impl->end_pos = 0; impl->slow_factory = slow_factory; + impl->slow_factory_close = slow_factory_close; impl->slow_factory_arg = slow_factory_arg; impl->slow_force_once = slow_force_once; diff --git a/src/stasis/io/pfile.c b/src/stasis/io/pfile.c index 47c8e1e..ba0e643 100644 --- a/src/stasis/io/pfile.c +++ b/src/stasis/io/pfile.c @@ -167,7 +167,7 @@ static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { int err = errno; // XXX Why is sys_errlist[] is unavailable here? perror("pfile_read encountered an unknown error code."); - fprintf(stderr, "pread() returned -1; errno is %d\n",err); + fprintf(stderr, "pread() returned -1; errno is %d\n",err); abort(); // XXX other errors? } error = errno; @@ -442,7 +442,7 @@ struct stasis_handle_t pfile_func = { }; stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, - char *filename, + const char *filename, int flags, int mode) { stasis_handle_t *ret = malloc(sizeof(stasis_handle_t)); if (!ret) { return NULL; } diff --git a/src/stasis/page/header.c b/src/stasis/page/header.c index f08c0e2..79023e6 100644 --- a/src/stasis/page/header.c +++ b/src/stasis/page/header.c @@ -1,26 +1,26 @@ -#include "../page.h" -#include "header.h" +#include +#include #include /** - @file header.c is dead code(?) + @file header.c is dead code(?) @todo Delete header.c */ int headerPageInitialize() { Page * p; - try_ret(0) { + try_ret(0) { p = loadPage(-1, 0); assert(!compensation_error()); } end_ret(0); int freePage; - if(*page_type_ptr(p) != LLADD_HEADER_PAGE) { + if(*page_type_ptr(p) != LLADD_HEADER_PAGE) { assert(*page_type_ptr(p) == 0) ; memset(p->memAddr, 0, PAGE_SIZE); stasis_page_cleanup(p); *page_type_ptr(p) = LLADD_HEADER_PAGE; *headerFreepage_ptr(p) = 1; - *headerFreepagelist_ptr(p) = 0; + *headerFreepagelist_ptr(p) = 0; } - + freePage = *headerFreepage_ptr(p); releasePage(p); assert(freePage); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 04c654e..b9d7b67 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -1,7 +1,4 @@ #include -#include -#include -#include #include #include @@ -24,7 +21,7 @@ #include #include #include // XXX remove this, move Tread() to set.c -#include +//#include #include #include @@ -53,33 +50,6 @@ void stasis_transaction_table_init() { } } -// @todo this factory stuff doesn't really belong here... -static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { - stasis_handle_t * h = stasis_handle(open_memory)(off); - //h = stasis_handle(open_debug)(h); - stasis_write_buffer_t * w = h->append_buffer(h, len); - w->h->release_write_buffer(w); - return h; -} -typedef struct sf_args { - char * filename; - int openMode; - int filePerm; -} sf_args; -static stasis_handle_t * slow_file_factory(void * argsP) { - sf_args * args = (sf_args*) argsP; - stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); - //h = stasis_handle(open_debug)(h); - return h; -} -static stasis_handle_t * slow_pfile_factory(void * argsP) { - stasis_handle_t * h = argsP; - return h; -} -static int (*slow_close)(stasis_handle_t * h) = 0; -static stasis_handle_t * slow_pfile = 0; -static int nop_close(stasis_handle_t*h) { return 0; } - int Tinit() { pthread_mutex_init(&stasis_transaction_table_mutex, NULL); stasis_initted = 1; @@ -101,81 +71,15 @@ int Tinit() { } stasis_page_init(); -#ifndef HAVE_O_DIRECT - if(bufferManagerO_DIRECT) { - printf("O_DIRECT not supported by this build; switching to conventional buffered I/O.\n"); - bufferManagerO_DIRECT = 0; - } -#endif - int openMode; - if(bufferManagerO_DIRECT) { -#ifdef HAVE_O_DIRECT - openMode = O_CREAT | O_RDWR | O_DIRECT; -#else - printf("Can't happen\n"); - abort(); -#endif + if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) { + printf("\nWarning: Using old I/O routines (with known bugs).\n"); + openPageFile(); } else { - openMode = O_CREAT | O_RDWR; + stasis_handle_t * h = stasis_handle_open(stasis_store_file_name); + // XXX should not be global. + pageHandleOpen(h); } - - /// @todo remove hardcoding of buffer manager implementations in transactional2.c - - switch(bufferManagerFileHandleType) { - case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: { - struct sf_args * slow_arg = malloc(sizeof(sf_args)); - slow_arg->filename = stasis_store_file_name; - - slow_arg->openMode = openMode; - - slow_arg->filePerm = FILE_PERM; - // Allow 4MB of outstanding writes. - // @todo Where / how should we open storefile? - stasis_handle_t * pageFile; - int worker_thread_count = 4; - if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) { - // printf("\nusing pread()/pwrite()\n"); - slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm); - slow_close = slow_pfile->close; - slow_pfile->close = nop_close; - pageFile = - stasis_handle(open_non_blocking)(slow_pfile_factory, slow_pfile, 1, fast_factory, - NULL, worker_thread_count, PAGE_SIZE * 1024 , 1024); - - } else if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_FILE) { - pageFile = - stasis_handle(open_non_blocking)(slow_file_factory, slow_arg, 0, fast_factory, - NULL, worker_thread_count, PAGE_SIZE * 1024, 1024); - } else { - printf("Unknown value for config option bufferManagerNonBlockingSlowHandleType\n"); - abort(); - } - //pageFile = stasis_handle(open_debug)(pageFile); - pageHandleOpen(pageFile); - } break; - case BUFFER_MANAGER_FILE_HANDLE_FILE: { - stasis_handle_t * pageFile = - stasis_handle_open_file(0, stasis_store_file_name, - openMode, FILE_PERM); - pageHandleOpen(pageFile); - } break; - case BUFFER_MANAGER_FILE_HANDLE_PFILE: { - stasis_handle_t * pageFile = - stasis_handle_open_pfile(0, stasis_store_file_name, - openMode, FILE_PERM); - pageHandleOpen(pageFile); - } break; - case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: { - printf("\nWarning: Using old I/O routines (with known bugs).\n"); - openPageFile(); - } break; - default: { - printf("\nUnknown buffer manager filehandle type: %d\n", - bufferManagerFileHandleType); - abort(); - } - } - bufInit(bufferManagerType); + stasis_buffer_manager_open(bufferManagerType); DEBUG("Buffer manager type = %d\n", bufferManagerType); pageOperationsInit(); TallocInit(); @@ -441,14 +345,9 @@ int Tdeinit() { stasis_truncation_deinit(); TnaiveHashDeinit(); TallocDeinit(); - bufDeinit(); + stasis_buffer_manager_close(); DEBUG("Closing page file tdeinit\n"); closePageFile(); - if(slow_pfile) { - slow_close(slow_pfile); - slow_pfile = 0; - slow_close = 0; - } stasis_page_deinit(); stasis_log_file->close(stasis_log_file); dirtyPagesDeinit(); @@ -464,12 +363,8 @@ int TuncleanShutdown() { stasis_suppress_unclean_shutdown_warnings = 1; stasis_truncation_deinit(); TnaiveHashDeinit(); - simulateBufferManagerCrash(); - if(slow_pfile) { - slow_close(slow_pfile); - slow_pfile = 0; - slow_close = 0; - } + stasis_buffer_manager_simulate_crash(); + // XXX: closePageFile? stasis_page_deinit(); stasis_log_file->close(stasis_log_file); stasis_transaction_table_num_active = 0; diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index 396f073..8aa15fc 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -99,7 +99,7 @@ Page * loadPage(int xid, pageid_t pageid); Page * loadUninitializedPage(int xid, pageid_t pageid); /** - This is the function pointer that bufInit sets in order to + This is the function pointer that stasis_buffer_manager_open sets in order to override loadPage. */ extern Page * (*loadPageImpl)(int xid, pageid_t pageid); @@ -111,7 +111,7 @@ extern Page * (*loadUninitPageImpl)(int xid, pageid_t pageid); void releasePage(Page *p); /** - This is the function pointer that bufInit sets in order to + This is the function pointer that stasis_buffer_manager_open sets in order to override releasePage. */ extern void (*releasePageImpl)(Page * p); @@ -146,14 +146,14 @@ extern void (*forcePages)(); This does not force page that have not been written to with pageWrite(). */ extern void (*forcePageRange)(pageid_t start, pageid_t stop); -extern void (*simulateBufferManagerCrash)(); +extern void (*stasis_buffer_manager_simulate_crash)(); -int bufInit(int type); +int stasis_buffer_manager_open(int type); /** * will write out any dirty pages, assumes that there are no running * transactions */ -extern void (*bufDeinit)(); +extern void (*stasis_buffer_manager_close)(); #ifdef PROFILE_LATCHES_WRITE_ONLY #define loadPage(x,y) __profile_loadPage((x), (y), __FILE__, __LINE__) diff --git a/stasis/bufferManager/bufferHash.h b/stasis/bufferManager/bufferHash.h index efb3bb8..f161221 100644 --- a/stasis/bufferManager/bufferHash.h +++ b/stasis/bufferManager/bufferHash.h @@ -1 +1 @@ -void bhBufInit(); +void stasis_buffer_manager_hash_open(); diff --git a/stasis/bufferManager/legacy/legacyBufferManager.h b/stasis/bufferManager/legacy/legacyBufferManager.h index 73426c2..0e8afd0 100644 --- a/stasis/bufferManager/legacy/legacyBufferManager.h +++ b/stasis/bufferManager/legacy/legacyBufferManager.h @@ -1,4 +1,4 @@ #ifndef __STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H #define __STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H -int bufManBufInit(); +int stasis_buffer_manager_deprecated_open(); #endif//__STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H diff --git a/stasis/bufferManager/pageArray.h b/stasis/bufferManager/pageArray.h index 95f7a2d..601c05c 100644 --- a/stasis/bufferManager/pageArray.h +++ b/stasis/bufferManager/pageArray.h @@ -1 +1 @@ -void paBufInit(); +void stasis_buffer_manager_mem_array_open(); diff --git a/stasis/io/handle.h b/stasis/io/handle.h index 7ae03de..aa487f9 100644 --- a/stasis/io/handle.h +++ b/stasis/io/handle.h @@ -15,7 +15,7 @@ /** - @file + @file Interface for I/O handle implementations. @@ -263,7 +263,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset); @param perm The file permissions to be passed to open() */ stasis_handle_t * stasis_handle(open_file) - (lsn_t start_offset, char * path, int flags, int perm); + (lsn_t start_offset, const char * path, int flags, int perm); /** Open a handle that is backed by a file. This handle uses pread() and pwrite(). It never holds a mutex while perfoming I/O. @@ -278,7 +278,7 @@ stasis_handle_t * stasis_handle(open_file) @param perm The file permissions to be passed to open() */ stasis_handle_t * stasis_handle(open_pfile) - (lsn_t start_offset, char * path, int flags, int perm); + (lsn_t start_offset, const char * path, int flags, int perm); /** Given a factory for creating "fast" and "slow" handles, provide a handle that never makes callers wait for write requests to @@ -326,7 +326,9 @@ stasis_handle_t * stasis_handle(open_pfile) before blocking. */ stasis_handle_t * stasis_handle(open_non_blocking) - (stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg, + (stasis_handle_t * (*slow_factory)(void * arg), + void (*slow_factory_close)(void * arg), + void * slow_factory_arg, int slow_force_once, stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg), void * fast_factory_arg, int worker_thread_count, lsn_t buffer_size, @@ -341,5 +343,9 @@ stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h); @param h All handle operations will be forwarded to h. */ stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h); +/** + * Open a Stasis file handle using default arguments. + */ +stasis_handle_t * stasis_handle(open)(const char * path); #endif diff --git a/test/stasis/check_io.c b/test/stasis/check_io.c index f1103f9..a6bdc7c 100644 --- a/test/stasis/check_io.c +++ b/test/stasis/check_io.c @@ -426,7 +426,7 @@ START_TEST(io_nonBlockingTest) { FILE_PERM }; - h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, 0, + h = stasis_handle(open_non_blocking)(slow_factory, 0, &slow_args, 0, fast_factory, 0, 5, 1024*1024, 100); // h = stasis_handle(open_debug)(h); @@ -435,7 +435,7 @@ START_TEST(io_nonBlockingTest) { unlink("logfile.txt"); - h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, 0, + h = stasis_handle(open_non_blocking)(slow_factory, 0, &slow_args, 0, fast_factory, 0, 5, 1024*1024, 100); //h = stasis_handle(open_debug)(h); @@ -444,7 +444,7 @@ START_TEST(io_nonBlockingTest) { unlink("logfile.txt"); - h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, 0, + h = stasis_handle(open_non_blocking)(slow_factory, 0, &slow_args, 0, fast_factory, 0, 5, 1024 * 1024, 100); handle_concurrencytest(h);