diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index ba6276d..eb0e9f3 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -1,6 +1,6 @@ nobase_pkginclude_HEADERS=$(wildcard *.h) $(wildcard */*.h) lib_LTLIBRARIES=libstasis.la -libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \ +libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c flags.c stats.c io.c bufferManager.c linkedlist.c operations.c \ pageHandle.c pageFile.c pageCache.c page.c bufferPool.c blobManager.c recovery2.c truncation.c \ transactional2.c allocationPolicy.c \ lockManager.c iterator.c consumer.c arrayCollection.c ringbuffer.c fifo.c multiplexer.c graph.c\ @@ -15,7 +15,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c st operations/linearHashNTA.c operations/linkedListNTA.c \ operations/pageOrientedListNTA.c operations/bTree.c \ operations/regions.c operations/lsmTree.c \ - io/rangeTracker.c io/memory.c io/file.c io/non_blocking.c io/debug.c \ + io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c io/debug.c \ bufferManager/pageArray.c bufferManager/bufferHash.c \ replacementPolicy/lru.c replacementPolicy/lruFast.c AM_CFLAGS=${GLOBAL_CFLAGS} diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index 5a4dffd..92744d9 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -102,15 +102,7 @@ pthread_mutex_t pinCount_mutex = PTHREAD_MUTEX_INITIALIZER; int pinCount = 0; #endif -#ifdef USE_BUFFER_MANAGER -int bufferManagerType = USE_BUFFER_MANAGER; -#else -int bufferManagerType = BUFFER_MANAGER_HASH; -#endif - static struct LH_ENTRY(table) *activePages; /* page lookup */ -/*static Page * activePagePtrs[MAX_BUFFER_SIZE];*/ - static pthread_mutex_t loadPagePtr_mutex; diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index c1f0ec3..d6397ae 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -5,11 +5,26 @@ #include #include #include +#include #include /** @file + File handle that avoids blocking on writes. It attempts to ensure + that the code calling it never waits for a "slow handle" to perform + a write. Instead, when a write request is recieved, it is + temporarly stored in a "fast handle". The caller provides factory + methods that instantiate fast and slow handles. + + For effeciency, this file handle imposes a special restriction upon + its callers. It implicitly partitions the underlying file into + blocks based upon the read and write requests it receives. Future + reads and writes must access complete blocks, and may not span + multiple blocks. This works well for page files (where each page is + a block), and log files, where each log entry is a block, as is the + header that stasis appends to the log entry. + Design: data structures: A rb tree holds a set of "fast" handles that manage @@ -21,7 +36,7 @@ is possible for a read or write to block on a memcpy(), but writes may not block on disk i/o.) - Latching protocol: + Latching protocol: Each static function that manipulates the tree or lists grabs a latch. Functions that call such functions should not hold a latch @@ -35,8 +50,20 @@ */ #define INVALID_NODE 2 +/* If defined, merge writes immediately (not recommended, as doing so + decreases the granularity of the "dirty" bit, causing clean data to + be written back.). Whether or not this is defined, writes will be + merged by nbw_worker at flush. +*/ +//#define EAGER_MERGE +/* If EAGER_MERGE is defined, this limits the number of pages it will + coaleasce into a single write. +*/ +//#define MAX_MERGE 4 -static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h, int error) { +/** @return a read buffer indicating an error has occured */ +static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h, + int error) { assert(error); stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t)); r->h = h; @@ -46,8 +73,9 @@ static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h r->error = error; return r; } - -static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t * h, int error) { +/** @return a read buffer indicating a write error has occured */ +static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *h, + int error) { assert(error); stasis_write_buffer_t * w = malloc(sizeof(stasis_write_buffer_t)); w->h = h; @@ -63,13 +91,15 @@ typedef struct tree_node { lsn_t start_pos; lsn_t end_pos; stasis_handle_t * h; + /** The number of I/O requests this node corresponds to. */ + int write_count; /** The number of threads accessing this handle. The handle cannot be deallocated unless this is zero. */ int pin_count; /** set to 1 when the handle is written to, 0 when the handle is written back to disk, INVALID_NODE when the handle is not in the tree. */ - int dirty; + int dirty; } tree_node; /** Wrapper for write buffers */ @@ -121,7 +151,11 @@ typedef struct nbw_impl { LinkedList * slow_handles; int slow_handle_count; - + + // These two track statistics on write coalescing. + lsn_t requested_bytes_written; + lsn_t total_bytes_written; + // Fields to manage fast handles stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg); void * fast_factory_arg; @@ -129,6 +163,7 @@ typedef struct nbw_impl { struct RB_ENTRY(tree) * fast_handles; int fast_handle_count; int max_fast_handles; + int min_fast_handles; lsn_t used_buffer_size; lsn_t max_buffer_size; @@ -136,9 +171,13 @@ typedef struct nbw_impl { pthread_t * workers; int worker_count; pthread_cond_t pending_writes_cond; - int still_open; + int still_open; } nbw_impl; +static inline void freeFastHandle(nbw_impl * impl, const tree_node * n); + +/** Obtain a slow handle from the pool of existing ones, or obtain a new one + by calling impl->slow_factory.. */ static stasis_handle_t * getSlowHandle(nbw_impl * impl) { pthread_mutex_lock(&impl->mut); stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles); @@ -152,6 +191,7 @@ static stasis_handle_t * getSlowHandle(nbw_impl * impl) { } return slow; } +/** Release a file handle back into the pool of slow handles. */ static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) { assert(slow); pthread_mutex_lock(&impl->mut); @@ -163,39 +203,97 @@ static tree_node * allocTreeNode(lsn_t off, lsn_t len) { tree_node * ret = malloc(sizeof(tree_node)); ret->start_pos = off; ret->end_pos = off + len; + ret->write_count = 1; ret->dirty = 0; ret->pin_count = 1; return ret; } -static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { +static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, + lsn_t len) { tree_node * np = allocTreeNode(off, len); pthread_mutex_lock(&impl->mut); - const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles); - if(n == np) { // not found - if(impl->fast_handle_count > impl->max_fast_handles || - impl->used_buffer_size > impl->max_buffer_size) { - RB_ENTRY(delete)(np, impl->fast_handles); - np->dirty = INVALID_NODE; - pthread_mutex_unlock(&impl->mut); + DEBUG("allocFastHandle(%lld)\n", off/PAGE_SIZE); + + const tree_node * n = RB_ENTRY(lookup)(RB_LULTEQ, np, impl->fast_handles); + // this code only works when writes / reads are aligned to immutable + // boundaries, and never cross boundaries. + if((!n) || + !(n->start_pos <= off && + n->end_pos >= off + len)) { + + // no completely overlapping range found; allocate space in np. + + if(0 && (impl->fast_handle_count >= impl->max_fast_handles || + impl->used_buffer_size + len > impl->max_buffer_size)) { + + assert(n->end_pos <= off); + + if(impl->fast_handle_count >= impl->max_fast_handles) { + printf("Blocking on write. %d handles (%d max)\n", + impl->fast_handle_count, impl->max_fast_handles); + } + if(impl->used_buffer_size + len > impl->max_buffer_size) { + printf("Blocking on write. %lld bytes (%lld max)\n", + impl->used_buffer_size, impl->max_buffer_size); + } + + np->dirty = INVALID_NODE; np->h = getSlowHandle(impl); - } else { + + } else { impl->fast_handle_count++; impl->used_buffer_size += len; - pthread_mutex_unlock(&impl->mut); - np->h = impl->fast_factory(off,len,impl->fast_factory_arg); - } +#ifdef EAGER_MERGE + if(n && n->end_pos == off && n->write_count + 1 < MAX_MERGE) { + DEBUG("Did merge.\n"); + ((tree_node*)n)->pin_count++; + ((tree_node*)n)->write_count++; + ((tree_node*)n)->end_pos += len; + } else { +#endif + RB_ENTRY(search)(np, impl->fast_handles); + np->h = impl->fast_factory(off,len,impl->fast_factory_arg); + n = np; +#ifdef EAGER_MERGE + } +#endif + } } else { ((tree_node*)n)->pin_count++; - pthread_mutex_unlock(&impl->mut); - free(np); } + +#ifdef EAGER_MERGE + // check for a mergable range immediately after the point we're interested in. + tree_node dummy; + dummy.start_pos = n->end_pos; + dummy.end_pos = n->end_pos+1; + + while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) && np->dirty && !np->pin_count && np->write_count + n->write_count < MAX_MERGE) { + DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) outstanding = %d\n", n->start_pos/PAGE_SIZE, -1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE, -1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count); + lsn_t appendLen = np->end_pos - np->start_pos; + stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos, appendLen); + int ret = n->h->write(n->h,np->start_pos,r->buf, appendLen); + assert(!ret); + ret = r->h->release_read_buffer(r); + assert(!ret); + np->dirty = 0; + ((tree_node*)n)->write_count += np->write_count; + freeFastHandle(impl,np); + RB_ENTRY(delete)(n,impl->fast_handles); + ((tree_node*)n)->end_pos += appendLen; + RB_ENTRY(search)(n,impl->fast_handles); + } +#endif + + pthread_mutex_unlock(&impl->mut); + return n; } -static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { +static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { tree_node * np = allocTreeNode(off, len); pthread_mutex_lock(&impl->mut); @@ -206,12 +304,10 @@ static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t free(np); return n; } -/** Unlke all of the other fastHandle functions, the caller +/** Unlke all of the other fastHandle functions, the caller should hold the mutex when calling freeFastHandle. */ -static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) { +static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) { RB_ENTRY(delete)(n, impl->fast_handles); - impl->fast_handle_count--; - impl->used_buffer_size -= (n->end_pos - n->start_pos); n->h->close(n->h); free((void*)n); } @@ -231,7 +327,9 @@ static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n, ((tree_node*)n)->dirty = setDirty; } pthread_mutex_unlock(&impl->mut); - pthread_cond_signal(&impl->pending_writes_cond); + if(impl->fast_handle_count > impl->min_fast_handles) { + pthread_cond_signal(&impl->pending_writes_cond); + } return 0; } } @@ -245,22 +343,26 @@ static int nbw_num_copies_buffer(stasis_handle_t * h) { } static int nbw_close(stasis_handle_t * h) { nbw_impl * impl = h->impl; - + pthread_mutex_lock(&impl->mut); impl->still_open = 0; pthread_mutex_unlock(&impl->mut); pthread_cond_broadcast(&impl->pending_writes_cond); - for(int i = 0; i < impl->worker_count; i++) { + for(int i = 0; i < impl->worker_count; i++) { pthread_join(impl->workers[i], 0); } - - // No longer need latch; this is the only thread allowed to touch the handle. + // No longer need latch; this is the only thread allowed to touch the handle. free(impl->workers); - // printf("nbw had %d slow handles\n", impl->slow_handle_count); - // fflush(stdout); + DEBUG("nbw had %d slow handles\n", impl->slow_handle_count); + DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count, impl->used_buffer_size); + if(impl->requested_bytes_written < impl->total_bytes_written) { + printf("nbw: Problem with write coalescing detected.\n" + "Client wrote %lld bytes, handle wrote %lld.\n", + impl->requested_bytes_written, impl->total_bytes_written); + } assert(impl->fast_handle_count == 0); assert(impl->used_buffer_size == 0); @@ -322,6 +424,7 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, } else if(off + len > impl->end_pos) { impl->end_pos = off+len; } + impl->requested_bytes_written += len; pthread_mutex_unlock(&impl->mut); } @@ -334,6 +437,7 @@ static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, pthread_mutex_lock(&impl->mut); lsn_t off = impl->end_pos; impl->end_pos += len; + impl->requested_bytes_written += len; pthread_mutex_unlock(&impl->mut); return nbw_write_buffer(h, off, len); @@ -344,7 +448,6 @@ static int nbw_release_write_buffer(stasis_write_buffer_t * w) { const tree_node * n = w_impl->n; w_impl->w->h->release_write_buffer(w_impl->w); releaseFastHandle(impl, n, 1); - // pthread_cond_signal(&impl->pending_writes_cond); free(w_impl); free(w); return 0; @@ -360,8 +463,8 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, read_buffer_impl * r_impl = malloc(sizeof(read_buffer_impl)); r_impl->n = n; r_impl->r = r; - - stasis_read_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + + stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); ret->h = h; ret->off = r->off; ret->len = r->len; @@ -402,6 +505,7 @@ static int nbw_write(stasis_handle_t * h, lsn_t off, } else if(off + len > impl->end_pos) { impl->end_pos = off+len; } + impl->requested_bytes_written += len; pthread_mutex_unlock(&impl->mut); } return ret; @@ -413,6 +517,7 @@ static int nbw_append(stasis_handle_t * h, lsn_t * off, pthread_mutex_lock(&impl->mut); *off = impl->end_pos; impl->end_pos+= len; + impl->requested_bytes_written += len; pthread_mutex_unlock(&impl->mut); int ret = nbw_write(h, *off, dat, len); @@ -468,6 +573,19 @@ struct stasis_handle_t nbw_func = { .error = 0 }; +/** + This worker thread simulates asynchrnous I/O by handling writeback + on behalf of the application. Multiple workers may be spawned for + a non-blocking handle. + + This function walks the list of fast handles, writing back dirty + ones, and freeing clean ones. It (almost) never performs a write + while holding the mutex. + + @todo Non-blocking handle should not memcpy() buffers while holding + the mutex. + +*/ static void * nbw_worker(void * handle) { stasis_handle_t * h = handle; nbw_impl * impl = h->impl; @@ -477,8 +595,8 @@ static void * nbw_worker(void * handle) { // cast strips const. tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles); int writes = 0; - while(node) { - if(node->dirty && !node->pin_count) { + while(node) { + if(node->dirty && !node->pin_count) { node->dirty = 0; node->pin_count++; pthread_mutex_unlock(&impl->mut); @@ -488,15 +606,65 @@ static void * nbw_worker(void * handle) { lsn_t off = fast->start_position(fast); lsn_t len = fast->end_position(fast) - off; stasis_read_buffer_t * r = fast->read_buffer(fast, off, len); - slow->write(slow, off, r->buf, len); + + // cast strips const + byte *buf = (byte*)r->buf; + + pthread_mutex_lock(&impl->mut); + + int first = 1; + off_t buf_off = 0; + tree_node dummy; + dummy.start_pos = node->end_pos; + dummy.end_pos = node->end_pos+1; + tree_node * np; + while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) + && np->dirty && !np->pin_count) { + lsn_t np_len = np->end_pos - np->start_pos; + len += np_len; + + if(first) { + buf = malloc(r->len + len); + memcpy(buf, r->buf, r->len); + buf_off += r->len; + first = 0; + } else { + buf = realloc(buf, len); + } + + stasis_handle_t * fast2 = np->h; + stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos, np_len); + memcpy(buf + buf_off, r2->buf, np_len); + buf_off += np_len; + r2->h->release_read_buffer(r2); + np->dirty = 0; + dummy.start_pos = np->end_pos; + dummy.end_pos = np->end_pos+1; + } + + impl->total_bytes_written += len; + + pthread_mutex_unlock(&impl->mut); + + if(len != PAGE_SIZE) { + DEBUG("merged %lld pages at %lld into single write\n", len/PAGE_SIZE, off/PAGE_SIZE); + } + slow->write(slow, off, buf, len); + + if(!first) { + free(buf); + } + r->h->release_read_buffer(r); releaseSlowHandle(impl, slow); pthread_mutex_lock(&impl->mut); node->pin_count--; - } - tree_node * new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles); + } + tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles); if(!node->dirty && !node->pin_count) { - freeFastHandle(impl, node); + impl->fast_handle_count -= node->write_count; + impl->used_buffer_size -= (node->end_pos - node->start_pos); + freeFastHandle(impl, node); } node = new_node; } @@ -530,13 +698,16 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact impl->slow_handles = 0; impl->slow_handle_count = 0; + impl->requested_bytes_written = 0; + impl->total_bytes_written = 0; + impl->fast_factory = fast_factory; impl->fast_factory_arg = fast_factory_arg; impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0); impl->fast_handle_count = 0; impl->max_fast_handles = max_fast_handles; - + impl->min_fast_handles = max_fast_handles / 2; impl->max_buffer_size = buffer_size; impl->used_buffer_size = 0; @@ -557,5 +728,9 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact perror("Coudln't spawn worker thread for non_blocking io"); } } + + DEBUG("Opened non blocking I/O handle; buffer size = %lldmb max outstanding writes = %d\n", + impl->max_buffer_size / (1024 * 1024), impl->max_fast_handles); + return h; } diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 4714b44..42e9e7d 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -11,9 +11,7 @@ #include #include #include -#ifdef USE_PAGEFILE #include "pageFile.h" -#endif #include #include "page.h" #include @@ -113,12 +111,20 @@ typedef struct sf_args { int openMode; int filePerm; } sf_args; -static stasis_handle_t * slow_factory(void * argsP) { +static stasis_handle_t * slow_file_factory(void * argsP) { sf_args * args = (sf_args*) argsP; stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); //h = stasis_handle(open_debug)(h); return h; } +static stasis_handle_t * slow_pfile_factory(void * argsP) { + stasis_handle_t * h = argsP; + return h; +} +static int (*slow_close)(stasis_handle_t * h) = 0; +static stasis_handle_t * slow_pfile = 0; +static int nop_close(stasis_handle_t*h) { return 0; } + int Tinit() { pthread_mutex_init(&transactional_2_mutex, NULL); numActiveXactions = 0; @@ -129,25 +135,66 @@ int Tinit() { dirtyPagesInit(); LogInit(loggerType); pageInit(); -#ifndef USE_PAGEFILE - struct sf_args * slow_arg = malloc(sizeof(sf_args)); - slow_arg->filename = STORE_FILE; -#ifdef PAGE_FILE_O_DIRECT - slow_arg->openMode = O_CREAT | O_RDWR | O_DIRECT; -#else - slow_arg->openMode = O_CREAT | O_RDWR; + + switch(bufferManagerFileHandleType) { + case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: { + struct sf_args * slow_arg = malloc(sizeof(sf_args)); + slow_arg->filename = STORE_FILE; + +#ifndef HAVE_O_DIRECT + if(bufferManagerO_DIRECT) { + printf("O_DIRECT not supported by this build; switching to conventional buffered I/O.\n"); + bufferManagerO_DIRECT = 0; + } #endif - slow_arg->filePerm = FILE_PERM; - // Allow 4MB of outstanding writes. - // @todo Where / how should we open storefile? - stasis_handle_t * pageFile = - stasis_handle(open_non_blocking)(slow_factory, slow_arg, fast_factory, - NULL, 20, PAGE_SIZE * 1024, 1024); - pageHandleOpen(pageFile); + if(bufferManagerO_DIRECT) { +#ifdef HAVE_O_DIRECT + slow_arg->openMode = O_CREAT | O_RDWR | O_DIRECT; #else - printf("\nWarning: Using old I/O routines.\n"); - openPageFile(); -#endif // USE_PAGEFILE + printf("Can't happen\n"); + abort(); +#endif + } else { + slow_arg->openMode = O_CREAT | O_RDWR; + } + + slow_arg->filePerm = FILE_PERM; + // Allow 4MB of outstanding writes. + // @todo Where / how should we open storefile? + stasis_handle_t * pageFile; + int worker_thread_count = 4; + if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) { + // printf("\nusing pread()/pwrite()\n"); + slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm); + slow_close = slow_pfile->close; + slow_pfile->close = nop_close; + pageFile = + stasis_handle(open_non_blocking)(slow_pfile_factory, slow_pfile, fast_factory, + NULL, worker_thread_count, PAGE_SIZE * 1024 , 1024); + + } else if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_FILE) { + pageFile = + stasis_handle(open_non_blocking)(slow_file_factory, slow_arg, fast_factory, + NULL, worker_thread_count, PAGE_SIZE * 1024, 1024); + } else { + printf("Unknown value for config option bufferManagerNonBlockingSlowHandleType\n"); + abort(); + } + //pageFile = stasis_handle(open_debug)(pageFile); + pageHandleOpen(pageFile); + } break; + case BUFFER_MANAGER_FILE_HANDLE_DEPRECATED: { + printf("\nWarning: Using old I/O routines (with known bugs).\n"); + openPageFile(); + } break; + default: { + printf("\nUnknown buffer manager filehandle type: %d\n", + bufferManagerFileHandleType); + abort(); + } + } + //#else + //#endif // USE_PAGEFILE bufInit(bufferManagerType); DEBUG("Buffer manager type = %d\n", bufferManagerType); pageOperationsInit(); @@ -364,6 +411,11 @@ int Tdeinit() { bufDeinit(); DEBUG("Closing page file tdeinit\n"); closePageFile(); + if(slow_pfile) { + slow_close(slow_pfile); + slow_pfile = 0; + slow_close = 0; + } pageDeinit(); LogDeinit(); dirtyPagesDeinit(); @@ -374,6 +426,11 @@ int TuncleanShutdown() { truncationDeinit(); ThashDeinit(); simulateBufferManagerCrash(); + if(slow_pfile) { + slow_close(slow_pfile); + slow_pfile = 0; + slow_close = 0; + } pageDeinit(); LogDeinit(); numActiveXactions = 0; diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index 6b0b6ed..b4e67a2 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -96,20 +96,6 @@ BEGIN_C_DECLS typedef struct Page_s Page_s; typedef struct Page_s Page; -/** - This is the type of buffer manager that is being used. - - Before Stasis is intialized it will be set to a default value. - It may be changed before Tinit() is called, or overridden at - compile time by defining USE_BUFFER_MANAGER. - - (eg: gcc ... -DUSE_BUFFER_MANAGER=BUFFER_MANAGER_FOO) - - @see constants.h for a list of recognized buffer manager implementations. (The constants are named BUFFER_MANAGER_*) - - */ -extern int bufferManagerType; - /** * @param xid The transaction that is pinning the page (used by page-level locking implementations.) * @param pageid ID of the page you want to load diff --git a/stasis/constants.h b/stasis/constants.h index 40c6a3b..1f3e43e 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -96,6 +96,12 @@ terms specified in this license. #define BUFFER_MANAGER_MEM_ARRAY 2 #define BUFFER_MANAGER_DEPRECATED_HASH 3 +#define BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING 0 +#define BUFFER_MANAGER_FILE_HANDLE_DEPRECATED 1 + +#define IO_HANDLE_FILE 1 +#define IO_HANDLE_PFILE 2 + #define LOG_TO_FILE 0 #define LOG_TO_MEMORY 1 diff --git a/stasis/io/handle.h b/stasis/io/handle.h index 5636a5a..2e2f374 100644 --- a/stasis/io/handle.h +++ b/stasis/io/handle.h @@ -91,8 +91,8 @@ typedef struct stasis_write_buffer_t { typedef struct stasis_read_buffer_t { stasis_handle_t * h; - const byte * buf; lsn_t off; + const byte * buf; lsn_t len; void * impl; int error; @@ -100,6 +100,7 @@ typedef struct stasis_read_buffer_t { stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset); stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * path, int flags, int mode); +stasis_handle_t * stasis_handle(open_pfile)(lsn_t start_offset, char * path, int flags, int mode); stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg, stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg), diff --git a/stasis/transactional.h b/stasis/transactional.h index b577655..950b49f 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -489,6 +489,7 @@ terms specified in this license. #define __TRANSACTIONAL_H__ #include "common.h" +#include "flags.h" BEGIN_C_DECLS diff --git a/test/stasis/check_io.c b/test/stasis/check_io.c index aa28f51..0ac69fd 100644 --- a/test/stasis/check_io.c +++ b/test/stasis/check_io.c @@ -132,6 +132,8 @@ typedef struct { lsn_t trunc_val; pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER; +int load_handle_truncate_is_supported = 1; + void load_handle(thread_arg* t) { lsn_t * offsets = malloc(t->count * sizeof(lsn_t)); @@ -249,7 +251,7 @@ void load_handle(thread_arg* t) { } // Truncate 1% of the time. - if(!myrandom(100)) { + if(!myrandom(100) && load_handle_truncate_is_supported) { lsn_t pre_start = h->start_position(h); pthread_mutex_lock(&trunc_mut); @@ -355,7 +357,7 @@ START_TEST(io_fileTest) { unlink("logfile.txt"); h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); - // handle_concurrencytest(h); + //handle_concurrencytest(h); h->close(h); unlink("logfile.txt"); @@ -372,6 +374,33 @@ static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { } +START_TEST(io_pfileTest) { + load_handle_truncate_is_supported = 0; + + stasis_handle_t * h; + h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + // h = stasis_handle(open_debug)(h); + handle_smoketest(h); + h->close(h); + + unlink("logfile.txt"); + + h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + //h = stasis_handle(open_debug)(h); + handle_sequentialtest(h); + h->close(h); + + unlink("logfile.txt"); + + h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + //handle_concurrencytest(h); + h->close(h); + + unlink("logfile.txt"); + + load_handle_truncate_is_supported = 1; +} END_TEST + typedef struct sf_args { char * filename; int openMode; @@ -431,6 +460,7 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ tcase_add_test(tc, io_memoryTest); tcase_add_test(tc, io_fileTest); + tcase_add_test(tc, io_pfileTest); tcase_add_test(tc, io_nonBlockingTest); /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown);