diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index 9c0eabd..afbb299 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of California, and other parties. The following terms apply to all files associated with the software unless explicitly disclaimed in individual files. - + The authors hereby grant permission to use, copy, modify, distribute, and license this software and its documentation for any purpose, provided that existing copyright notices are retained in all copies @@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by their authors and need not follow the licensing terms described here, provided that the new terms are clearly indicated on the first page of each file where they apply. - + IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - + THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - + GOVERNMENT USE: If you are acquiring this software on behalf of the U.S. government, the Government shall have only "Restricted Rights" in the software and related documentation as defined in the Federal @@ -51,7 +51,7 @@ terms specified in this license. -#define _GNU_SOURCE +#define _GNU_SOURCE #include // Need _GNU_SOURCE for asprintf #include @@ -77,7 +77,7 @@ terms specified in this license. #undef releasePage #undef Page -#ifdef LONG_TEST +#ifdef LONG_TEST #define PIN_COUNT #endif @@ -101,7 +101,7 @@ int pinCount = 0; #ifdef PROFILE_LATCHES_WRITE_ONLY -compensated_function Page * __profile_loadPage(int xid, pageid_t pageid, char * file, int line) { +compensated_function Page * __profile_loadPage(int xid, pageid_t pageid, char * file, int line) { Page * ret = loadPage(xid, pageid); @@ -111,19 +111,19 @@ compensated_function Page * __profile_loadPage(int xid, pageid_t pageid, char * char * holder = LH_ENTRY(find)(profile_load_hash, &ret, sizeof(void*)); int * pins = LH_ENTRY(find)(profile_load_pins_hash, &ret, sizeof(void*)); - if(!pins) { + if(!pins) { pins = malloc(sizeof(int)); *pins = 0; LH_ENTRY(insert)(profile_load_pins_hash, &ret, sizeof(void*), pins); } - if(*pins) { + if(*pins) { assert(holder); char * newHolder; asprintf(&newHolder, "%s\n%s:%d", holder, file, line); free(holder); holder = newHolder; - } else { + } else { assert(!holder); asprintf(&holder, "%s:%d", file, line); } @@ -136,7 +136,7 @@ compensated_function Page * __profile_loadPage(int xid, pageid_t pageid, char * } -compensated_function void __profile_releasePage(Page * p) { +compensated_function void __profile_releasePage(Page * p) { pthread_mutex_lock(&profile_load_mutex); // int pageid = p->id; @@ -145,10 +145,10 @@ compensated_function void __profile_releasePage(Page * p) { assert(pins); if(*pins == 1) { - + char * holder = LH_ENTRY(remove)(profile_load_hash, &p, sizeof(void*)); assert(holder); - free(holder); + free(holder); } @@ -172,7 +172,7 @@ void (*forcePageRange)(pageid_t start, pageid_t stop) = 0; void (*stasis_buffer_manager_close)() = 0; void (*stasis_buffer_manager_simulate_crash)() = 0; -Page * loadPage(int xid, pageid_t pageid) { +Page * loadPage(int xid, pageid_t pageid) { try_ret(NULL) { // This lock is released at Tcommit() if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } @@ -181,7 +181,7 @@ Page * loadPage(int xid, pageid_t pageid) { return loadPageImpl(xid, pageid); } -Page * loadUninitializedPage(int xid, pageid_t pageid) { +Page * loadUninitializedPage(int xid, pageid_t pageid) { try_ret(NULL) { // This lock is released at Tcommit() if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); } @@ -195,24 +195,25 @@ void releasePage(Page * p) { releasePageImpl(p); } -int stasis_buffer_manager_open(int type) { +int stasis_buffer_manager_open(int type, stasis_page_handle_t * ph) { bufferManagerType = type; static int lastType = 0; - if(type == BUFFER_MANAGER_REOPEN) { + if(type == BUFFER_MANAGER_REOPEN) { type = lastType; - } + } lastType = type; - if(type == BUFFER_MANAGER_DEPRECATED_HASH) { - stasis_buffer_manager_deprecated_open(); + if(type == BUFFER_MANAGER_DEPRECATED_HASH) { + stasis_buffer_manager_deprecated_open(ph); return 0; - } else if (type == BUFFER_MANAGER_MEM_ARRAY) { + } else if (type == BUFFER_MANAGER_MEM_ARRAY) { stasis_buffer_manager_mem_array_open(); + ph->close(ph); // XXX should never have been opened in the first place! return 0; - } else if (type == BUFFER_MANAGER_HASH) { - stasis_buffer_manager_hash_open(); + } else if (type == BUFFER_MANAGER_HASH) { + stasis_buffer_manager_hash_open(ph); return 0; - } else { + } else { // XXX error handling abort(); - } + } } diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index f06c9fb..1774818 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -34,6 +34,8 @@ static replacementPolicy * lru; static stasis_buffer_pool_t * stasis_buffer_pool; +static stasis_page_handle_t * page_handle; + static int running; typedef struct LL_ENTRY(node_t) node_t; @@ -110,7 +112,7 @@ inline static Page * writeBackOnePage() { assert(old == victim); // printf("Write(%ld)\n", (long)victim->id); - pageWrite(victim); /// XXX pageCleanup and pageFlushed might be heavyweight. + page_handle->write(page_handle,victim); /// XXX pageCleanup and pageFlushed might be heavyweight. stasis_page_cleanup(victim); // Make sure that no one mistakenly thinks this is still a live copy. victim->id = -1; @@ -269,7 +271,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia // try to read this page from disk. pthread_mutex_unlock(&mut); - pageRead(ret); + page_handle->read(page_handle, ret); pthread_mutex_lock(&mut); @@ -322,13 +324,13 @@ static void bhReleasePage(Page * p) { pthread_mutex_unlock(&mut); } static void bhWriteBackPage(Page * p) { - pageWrite(p); + page_handle->write(page_handle, p); } static void bhForcePages() { - forcePageFile(); + page_handle->force_file(page_handle); } static void bhForcePageRange(pageid_t start, pageid_t stop) { - forceRangePageFile(start, stop); + page_handle->force_range(page_handle, start, stop); } static void bhBufDeinit() { running = 0; @@ -340,7 +342,7 @@ static void bhBufDeinit() { const struct LH_ENTRY(pair_t) * next; LH_ENTRY(openlist)(cachedPages, &iter); while((next = LH_ENTRY(readlist)(&iter))) { - pageWrite((next->value)); + page_handle->write(page_handle, (next->value)); stasis_page_cleanup((next->value)); // normally called by writeBackOnePage() } LH_ENTRY(closelist)(&iter); @@ -350,6 +352,7 @@ static void bhBufDeinit() { lru->deinit(lru); stasis_buffer_pool_deinit(stasis_buffer_pool); + page_handle->close(page_handle); } static void bhSimulateBufferManagerCrash() { running = 0; @@ -374,10 +377,11 @@ static void bhSimulateBufferManagerCrash() { lru->deinit(lru); stasis_buffer_pool_deinit(stasis_buffer_pool); + page_handle->close(page_handle); } -void stasis_buffer_manager_hash_open() { - +void stasis_buffer_manager_hash_open(stasis_page_handle_t * h) { + page_handle = h; assert(!running); #ifdef LONG_RUN diff --git a/src/stasis/bufferManager/legacy/legacyBufferManager.c b/src/stasis/bufferManager/legacy/legacyBufferManager.c index 6a320d5..91dc795 100644 --- a/src/stasis/bufferManager/legacy/legacyBufferManager.c +++ b/src/stasis/bufferManager/legacy/legacyBufferManager.c @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -25,16 +26,28 @@ static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid) static void bufManReleasePage (Page * p); static void bufManSimulateBufferManagerCrash(); +static stasis_page_handle_t * page_handle; + static stasis_buffer_pool_t * stasis_buffer_pool; -int stasis_buffer_manager_deprecated_open() { +static void pageWrite_legacyWrapper(Page * p) { + page_handle->write(page_handle,p); +} +static void forcePageFile_legacyWrapper() { + page_handle->force_file(page_handle); +} +static void forceRangePageFile_legacyWrapper(lsn_t start, lsn_t stop) { + page_handle->force_range(page_handle, start, stop); +} +int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) { + page_handle = ph; releasePageImpl = bufManReleasePage; loadPageImpl = bufManLoadPage; loadUninitPageImpl = bufManLoadUninitPage; - writeBackPage = pageWrite; - forcePages = forcePageFile; - forcePageRange = forceRangePageFile; + writeBackPage = pageWrite_legacyWrapper; + forcePages = forcePageFile_legacyWrapper; + forcePageRange = forceRangePageFile_legacyWrapper; stasis_buffer_manager_close = bufManBufDeinit; stasis_buffer_manager_simulate_crash = bufManSimulateBufferManagerCrash; @@ -50,7 +63,7 @@ int stasis_buffer_manager_deprecated_open() { first = stasis_buffer_pool_malloc_page(stasis_buffer_pool); stasis_buffer_pool_free_page(stasis_buffer_pool, first, 0); LH_ENTRY(insert)(activePages, &first->id, sizeof(first->id), first); - pageRead(first); + page_handle->read(page_handle, first); pageCacheInit(first); int err = pthread_key_create(&lastPage, 0); @@ -73,7 +86,7 @@ static void bufManBufDeinit() { LH_ENTRY(openlist(activePages, &iter)); while((next = LH_ENTRY(readlist)(&iter))) { - pageWrite((Page*)next->value); + page_handle->write(page_handle, (Page*)next->value); DEBUG("+"); } @@ -83,10 +96,10 @@ static void bufManBufDeinit() { pageCacheDeinit(); - //closePageFile(); - stasis_buffer_pool_deinit(stasis_buffer_pool); + page_handle->close(page_handle); + #ifdef PIN_COUNT if(pinCount != 0) { printf("WARNING: At exit, %d pages were still pinned!\n", pinCount); @@ -98,10 +111,10 @@ static void bufManBufDeinit() { Just close file descriptors, don't do any other clean up. (For testing.) - @todo buffer manager should never call closePageFile(); it not longer manages pageFile handles + @todo buffer manager should never call close_(); it not longer manages pageFile handles */ -void bufManSimulateBufferManagerCrash() { - closePageFile(); +static void bufManSimulateBufferManagerCrash() { + page_handle->close(page_handle); #ifdef PIN_COUNT pinCount = 0; #endif @@ -259,12 +272,12 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) { assert(ret != dummy_page); if(ret->id != -1) { - pageWrite(ret); + page_handle->write(page_handle, ret); } stasis_buffer_pool_free_page(stasis_buffer_pool, ret, pageid); if(!uninitialized) { - pageRead(ret); + page_handle->read(page_handle, ret); } else { memset(ret->memAddr, 0, PAGE_SIZE); ret->dirty = 0; @@ -283,7 +296,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized) { /* @todo Put off putting this back into cache until we're done with it. -- This could cause the cache to empty out if the ratio of threads to buffer slots is above ~ 1/3, but it decreases the - liklihood of thrashing. */ + likelihood of thrashing. */ cacheInsertPage(ret); pthread_mutex_unlock(&loadPagePtr_mutex); diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index a2acde3..66cb44d 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -1,22 +1,22 @@ /** - @file + @file This file handles all of the file I/O for pages. */ #include "config.h" -#include +#include #include #include +#include #include +#include #include -#include -#include #include #include - +#include /** For O_DIRECT. It's unclear that this is the correct thing to \#define, but it works under linux. */ #define __USE_GNU @@ -28,16 +28,16 @@ static int stable = -1; static pthread_mutex_t stable_mutex; -static void pfForcePageFile(); -static void pfClosePageFile(); -static void pfForceRangePageFile(lsn_t start, lsn_t stop) ; +static void pfForcePageFile(stasis_page_handle_t* h); +static void pfClosePageFile(stasis_page_handle_t* h); +static void pfForceRangePageFile(stasis_page_handle_t* h, lsn_t start, lsn_t stop) ; inline static pageid_t myLseekNoLock(int f, pageid_t offset, int whence); static int oldOffset = -1; int pageFile_isDurable = 1; -static void pfPageRead(Page *ret) { +static void pfPageRead(stasis_page_handle_t * h, Page *ret) { pageid_t pageoffset; pageid_t offset; @@ -46,7 +46,7 @@ static void pfPageRead(Page *ret) { pthread_mutex_lock(&stable_mutex); - if(oldOffset != pageoffset) { + if(oldOffset != pageoffset) { offset = myLseekNoLock(stable, pageoffset, SEEK_SET); assert(offset == pageoffset); } else { @@ -60,7 +60,7 @@ static void pfPageRead(Page *ret) { if(read_size != PAGE_SIZE) { if (!read_size) { /* Past EOF... */ memset(ret->memAddr, 0, PAGE_SIZE); // The file will be extended when we write to the new page. - } else if(read_size == -1) { + } else if(read_size == -1) { perror("pageFile.c couldn't read"); fflush(NULL); abort(); @@ -78,13 +78,13 @@ static void pfPageRead(Page *ret) { } /** @todo need to sync the page file to disk occasionally, so that the dirty page table can be kept up to date. */ -static void pfPageWrite(Page * ret) { +static void pfPageWrite(stasis_page_handle_t * h, Page * ret) { /** If the page is clean, there's no reason to write it out. */ assert(ret->dirty == dirtyPages_isDirty(ret)); if(!ret->dirty) { - // if(!dirtyPages_isDirty(ret)) { - DEBUG(" =^)~ "); - return; + // if(!dirtyPages_isDirty(ret)) { + DEBUG(" =^)~ "); + return; } pageid_t pageoffset = ret->id * PAGE_SIZE; pageid_t offset ; @@ -133,12 +133,13 @@ static void pfPageWrite(Page * ret) { //#define PAGE_FILE_O_DIRECT /** @todo O_DIRECT is broken in older linuxes (eg 2.4). The build script should disable it on such platforms. */ -void openPageFile() { - pageRead = pfPageRead; - pageWrite = pfPageWrite; - forcePageFile = pfForcePageFile; - forceRangePageFile = pfForceRangePageFile; - closePageFile = pfClosePageFile; +stasis_page_handle_t* openPageFile() { + stasis_page_handle_t * ret = malloc(sizeof(*ret)); + ret->read = pfPageRead; + ret->write = pfPageWrite; + ret->force_file = pfForcePageFile; + ret->force_range = pfForceRangePageFile; + ret->close = pfClosePageFile; DEBUG("Opening storefile.\n"); @@ -149,7 +150,7 @@ void openPageFile() { stable = open (stasis_store_file_name, O_CREAT | O_RDWR, FILE_PERM); #endif - if(!pageFile_isDurable) { + if(!pageFile_isDurable) { fprintf(stderr, "\n**********\n"); fprintf (stderr, "pageFile.c: pageFile_isDurable==0; the page file will not force writes to disk.\n"); fprintf (stderr, " Transactions will not be durable if the system crashes.\n**********\n"); @@ -159,13 +160,13 @@ void openPageFile() { fflush(NULL); abort(); } - - pthread_mutex_init(&stable_mutex, NULL); + pthread_mutex_init(&stable_mutex, NULL); + return ret; } -static void pfForcePageFile() { - if(pageFile_isDurable) { +static void pfForcePageFile(stasis_page_handle_t * h) { + if(pageFile_isDurable) { #ifndef PAGE_FILE_O_DIRECT #ifdef HAVE_FDATASYNC fdatasync(stable); @@ -176,7 +177,7 @@ static void pfForcePageFile() { } } -static void pfForceRangePageFile(lsn_t start, lsn_t stop) { +static void pfForceRangePageFile(stasis_page_handle_t * h, lsn_t start, lsn_t stop) { if(pageFile_isDurable) { #ifdef HAVE_SYNC_FILE_RANGE int ret = sync_file_range(stable, start, stop, @@ -193,15 +194,15 @@ static void pfForceRangePageFile(lsn_t start, lsn_t stop) { #endif } } -static void pfClosePageFile() { +static void pfClosePageFile(stasis_page_handle_t * h) { assert(stable != -1); - forcePageFile(); + h->force_file(h); DEBUG("Closing storefile\n"); int ret = close(stable); - if(-1 == ret) { + if(-1 == ret) { perror("Couldn't close storefile."); fflush(NULL); abort(); diff --git a/src/stasis/io/handle.c b/src/stasis/io/handle.c index 0ffc2d0..58bbabd 100644 --- a/src/stasis/io/handle.c +++ b/src/stasis/io/handle.c @@ -5,12 +5,18 @@ * Author: sears */ #include + +#include +#include +#include +#include + #include #include #include -#include - +#include +#include // @todo this factory stuff doesn't really belong here... static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index b1b8e7e..5820335 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -1,4 +1,8 @@ +#include "config.h" +#include +#include #include +#include #include #include #include @@ -6,7 +10,7 @@ #include #include #include -#include + /** @file diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c index d8a8f5e..aca87a4 100644 --- a/src/stasis/operations/pageOperations.c +++ b/src/stasis/operations/pageOperations.c @@ -1,9 +1,11 @@ #define _XOPEN_SOURCE 600 #include -#include "config.h" +#include #include +#include #include +#include #include #include @@ -74,7 +76,7 @@ int TpageSetRange(int xid, pageid_t page, int offset, const void * memAddr, int /** @todo region sizes should be dynamic. */ #define TALLOC_PAGE_REGION_SIZE 128 // 512K -/** +/** This calls loadPage and releasePage directly, and bypasses the logger. */ @@ -86,7 +88,7 @@ compensated_function void pageOperationsInit() { recordid rid = {0, 0, sizeof(boundary_tag)}; // Need to find a region with some free pages in it. Tread(-1, rid, &t); - + pthread_mutex_init(&pageAllocMutex, NULL); } @@ -116,7 +118,7 @@ compensated_function pageid_t TpageAllocMany(int xid, int count) { return TregionAlloc(xid, count, STORAGE_MANAGER_NAIVE_PAGE_ALLOC); } -int TpageGetType(int xid, pageid_t page) { +int TpageGetType(int xid, pageid_t page) { Page * p = loadPage(xid, page); int ret = *stasis_page_type_ptr(p); releasePage(p); @@ -132,11 +134,11 @@ int TpageGetType(int xid, pageid_t page) { load page to be used, and update in-memory data. (obtains lock on loaded page) T update() the page, zeroing it, and saving the old successor in the log. relase the page (avoid deadlock in next step) - T update() LLADD's header page (the first in the store file) with a new copy of + T update() LLADD's header page (the first in the store file) with a new copy of the in-memory data, saving old version in the log. release mutex - Free: + Free: obtain mutex determine the current head of the freelist using in-memory data @@ -151,8 +153,8 @@ int TpageGetType(int xid, pageid_t page) { and setting the successor pointer. This operation physically logs a whole page, which makes it expensive. Doing so is necessary in general, but it is possible that application specific logic could - avoid the physical logging here. - + avoid the physical logging here. + Instead, we should just record the fact that the page was freed somewhere. That way, we don't need to read the page in, or write out information about it. If we lock the page against @@ -215,8 +217,8 @@ static int op_initialize_page(const LogEntry* e, Page* p) { return 0; } -stasis_operation_impl stasis_op_impl_page_initialize() { - stasis_operation_impl o = { +stasis_operation_impl stasis_op_impl_page_initialize() { + stasis_operation_impl o = { OPERATION_INITIALIZE_PAGE, OPERATION_INITIALIZE_PAGE, OPERATION_NOOP, diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index e528fbe..a3df4fc 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -9,21 +9,12 @@ #include -void (*pageWrite)(Page * dat); -void (*pageRead)(Page * ret); -void (*forcePageFile)(); -void (*forceRangePageFile)(); -void (*closePageFile)(); - -int printedForceWarning = 0; - -static stasis_handle_t * h; /** @todo Make sure this doesn't need to be atomic. (It isn't!) Can we get in trouble by setting the page clean after it's written out, or forcing the log too early? */ -static void phWrite(Page * ret) { +static void phWrite(stasis_page_handle_t * ph, Page * ret) { if(!ret->dirty) { return; } // This lock is only held to make the page implementation happy. We should // implicitly have exclusive access to the page before this function is called, @@ -31,7 +22,7 @@ static void phWrite(Page * ret) { writelock(ret->rwlatch,0); stasis_page_flushed(ret); LogForce(stasis_log_file, ret->LSN, LOG_FORCE_WAL); - int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); + int err = ((stasis_handle_t*)ph->impl)->write(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); if(err) { printf("Couldn't write to page file: %s\n", strerror(err)); fflush(stdout); @@ -40,9 +31,9 @@ static void phWrite(Page * ret) { dirtyPages_remove(ret); unlock(ret->rwlatch); } -static void phRead(Page * ret) { +static void phRead(stasis_page_handle_t * ph, Page * ret) { writelock(ret->rwlatch,0); - int err = h->read(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); + int err = ((stasis_handle_t*)ph->impl)->read(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); if(err) { if(err == EDOM) { // tried to read off end of file... @@ -57,30 +48,32 @@ static void phRead(Page * ret) { stasis_page_loaded(ret); unlock(ret->rwlatch); } -static void phForce() { - int err = h->force(h); +static void phForce(stasis_page_handle_t * ph) { + int err = ((stasis_handle_t*)ph->impl)->force(ph->impl); assert(!err); } -static void phForceRange(lsn_t start, lsn_t stop) { - int err = h->force_range(h,start,stop); +static void phForceRange(stasis_page_handle_t * ph, lsn_t start, lsn_t stop) { + int err = ((stasis_handle_t*)ph->impl)->force_range(ph->impl,start,stop); assert(!err); } -static void phClose() { - int err = h->close(h); +static void phClose(stasis_page_handle_t * ph) { + int err = ((stasis_handle_t*)ph->impl)->close(ph->impl); DEBUG("Closing pageHandle\n"); if(err) { printf("Couldn't close page file: %s\n", strerror(err)); fflush(stdout); abort(); - } + } + free(ph); } - -void pageHandleOpen(stasis_handle_t * handle) { +stasis_page_handle_t * stasis_page_handle_open(stasis_handle_t * handle) { DEBUG("Using pageHandle implementation\n"); - pageWrite = phWrite; - pageRead = phRead; - forcePageFile = phForce; - forceRangePageFile = phForceRange; - closePageFile = phClose; - h = handle; + stasis_page_handle_t * ret = malloc(sizeof(*ret)); + ret->write = phWrite; + ret->read = phRead; + ret->force_file = phForce; + ret->force_range = phForceRange; + ret->close = phClose; + ret->impl = handle; + return ret; } diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index b9d7b67..17ebdb9 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -21,7 +21,7 @@ #include #include #include // XXX remove this, move Tread() to set.c -//#include + #include #include @@ -51,8 +51,8 @@ void stasis_transaction_table_init() { } int Tinit() { - pthread_mutex_init(&stasis_transaction_table_mutex, NULL); - stasis_initted = 1; + pthread_mutex_init(&stasis_transaction_table_mutex, NULL); + stasis_initted = 1; stasis_transaction_table_num_active = 0; compensations_init(); @@ -60,26 +60,28 @@ int Tinit() { stasis_transaction_table_init(); stasis_operation_table_init(); dirtyPagesInit(); - if(LOG_TO_FILE == stasis_log_type) { - stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, - stasis_log_file_mode, - stasis_log_file_permissions); - } else if(LOG_TO_MEMORY == stasis_log_type) { - stasis_log_file = stasis_log_impl_in_memory_open(); - } else { - assert(stasis_log_file != NULL); - } - stasis_page_init(); + if(LOG_TO_FILE == stasis_log_type) { + stasis_log_file = stasis_log_safe_writes_open(stasis_log_file_name, + stasis_log_file_mode, + stasis_log_file_permissions); + } else if(LOG_TO_MEMORY == stasis_log_type) { + stasis_log_file = stasis_log_impl_in_memory_open(); + } else { + assert(stasis_log_file != NULL); + } + + stasis_page_init(); + stasis_page_handle_t * page_handle; if(bufferManagerFileHandleType == BUFFER_MANAGER_FILE_HANDLE_DEPRECATED) { printf("\nWarning: Using old I/O routines (with known bugs).\n"); - openPageFile(); + page_handle = openPageFile(); } else { stasis_handle_t * h = stasis_handle_open(stasis_store_file_name); // XXX should not be global. - pageHandleOpen(h); + page_handle = stasis_page_handle_open(h); } - stasis_buffer_manager_open(bufferManagerType); + stasis_buffer_manager_open(bufferManagerType, page_handle); DEBUG("Buffer manager type = %d\n", bufferManagerType); pageOperationsInit(); TallocInit(); @@ -347,7 +349,6 @@ int Tdeinit() { TallocDeinit(); stasis_buffer_manager_close(); DEBUG("Closing page file tdeinit\n"); - closePageFile(); stasis_page_deinit(); stasis_log_file->close(stasis_log_file); dirtyPagesDeinit(); @@ -364,7 +365,7 @@ int TuncleanShutdown() { stasis_truncation_deinit(); TnaiveHashDeinit(); stasis_buffer_manager_simulate_crash(); - // XXX: closePageFile? + // XXX: close_file? stasis_page_deinit(); stasis_log_file->close(stasis_log_file); stasis_transaction_table_num_active = 0; diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index 8aa15fc..c3fe97a 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -76,14 +76,11 @@ terms specified in this license. @ingroup BUFFER_MANAGER * $Id$ */ - -#include - #ifndef __BUFFERMANAGER_H__ #define __BUFFERMANAGER_H__ BEGIN_C_DECLS - +#include /** * Obtain a pointer to a page from the buffer manager. The page will * be pinned, and the pointer valid until releasePage is called. @@ -148,7 +145,7 @@ extern void (*forcePages)(); extern void (*forcePageRange)(pageid_t start, pageid_t stop); extern void (*stasis_buffer_manager_simulate_crash)(); -int stasis_buffer_manager_open(int type); +int stasis_buffer_manager_open(int type, stasis_page_handle_t* ph); /** * will write out any dirty pages, assumes that there are no running * transactions diff --git a/stasis/bufferManager/bufferHash.h b/stasis/bufferManager/bufferHash.h index f161221..6249b38 100644 --- a/stasis/bufferManager/bufferHash.h +++ b/stasis/bufferManager/bufferHash.h @@ -1 +1,5 @@ -void stasis_buffer_manager_hash_open(); +#ifndef STASIS_BUFFERMANAGER_BUFFERHASH_H +#define STASIS_BUFFERMANAGER_BUFFERHASH_H +#include +void stasis_buffer_manager_hash_open(stasis_page_handle_t* ph); +#endif //STASIS_BUFFERMANAGER_BUFFERHASH_H diff --git a/stasis/bufferManager/legacy/legacyBufferManager.h b/stasis/bufferManager/legacy/legacyBufferManager.h index 0e8afd0..407631b 100644 --- a/stasis/bufferManager/legacy/legacyBufferManager.h +++ b/stasis/bufferManager/legacy/legacyBufferManager.h @@ -1,4 +1,5 @@ -#ifndef __STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H -#define __STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H -int stasis_buffer_manager_deprecated_open(); -#endif//__STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H +#ifndef STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H +#define STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H +#include +int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph); +#endif//STASIS_BUFFERMANAGER_LEGACY_LEGACYBUFFERMANAGER_H diff --git a/stasis/bufferManager/legacy/pageCache.h b/stasis/bufferManager/legacy/pageCache.h index 16c11e7..87927aa 100644 --- a/stasis/bufferManager/legacy/pageCache.h +++ b/stasis/bufferManager/legacy/pageCache.h @@ -8,7 +8,7 @@ Implements lladd's caching policy. Looks up pageid in the cache. If pageid doesn't exist, then allocate a new slot for it. If there are no new slots, then callback into bufferManager's - pageRead() function. Eventually, this could be extended to + read_page() function. Eventually, this could be extended to support application specific caching schemes. If you would like to implement your own caching policy, implement @@ -18,7 +18,7 @@ The implementation of this module does not need to be threadsafe. @param first The caller should manually read this page by calling - pageRead() before calling pageCacheInit. + read_page() before calling pageCacheInit. @todo pageCacheInit should not take a page as a parameter. diff --git a/stasis/bufferManager/legacy/pageFile.h b/stasis/bufferManager/legacy/pageFile.h index 079b16d..c9bcb31 100644 --- a/stasis/bufferManager/legacy/pageFile.h +++ b/stasis/bufferManager/legacy/pageFile.h @@ -2,6 +2,8 @@ #ifndef __PAGE_FILE_H #define __PAGE_FILE_H -void openPageFile(); +#include + +stasis_page_handle_t* openPageFile(); #endif /* __PAGE_FILE_H */ diff --git a/stasis/io/handle.h b/stasis/io/handle.h index 429c7ac..f11b155 100644 --- a/stasis/io/handle.h +++ b/stasis/io/handle.h @@ -1,8 +1,6 @@ -#include - #ifndef IO_HANDLE_H #define IO_HANDLE_H - +#include /** stasis_handle() is a macro that prepends a unique prefix to the its diff --git a/stasis/operations/arrayList.h b/stasis/operations/arrayList.h index 50095a7..ace406c 100644 --- a/stasis/operations/arrayList.h +++ b/stasis/operations/arrayList.h @@ -75,7 +75,7 @@ terms specified in this license. /** @{ */ #ifndef __ARRAY_LIST_H #define __ARRAY_LIST_H - +#include /** Allocate a new array list. @param xid The transaction allocating the new arrayList. @@ -108,7 +108,7 @@ compensated_function int TarrayListExtend(int xid, recordid rid, int slots); @param rid the recordid pointing to the ArrayList. @return The number of items stored in the ArrayList. */ -compensated_function int TarrayListLength(int xid, recordid rid); +compensated_function int TarrayListLength(int xid, recordid rid); /** Used by Tread() and Tset() to map from arrayList index to recordid. */ recordid dereferenceArrayListRid(int xid, Page * p, int offset); diff --git a/stasis/operations/lsmIterators.h b/stasis/operations/lsmIterators.h index 5f0fb6e..ce52991 100644 --- a/stasis/operations/lsmIterators.h +++ b/stasis/operations/lsmIterators.h @@ -5,7 +5,7 @@ #include "stasis/bufferManager.h" #include "stasis/page/compression/compression.h" #include "stasis/page/compression/tuple.h" - +#include "stasis/operations.h" /** @file @@ -68,7 +68,7 @@ class gcIterator { } } else { at_end_=true; - } + } } explicit gcIterator() : i_(0), @@ -682,7 +682,7 @@ class versioningIterator { toByteArray(stlSetIterator * const t); }; -template +template inline const byte * toByteArray(stlSetIterator * const t) { return (*(t->it_)).toByteArray(); } diff --git a/stasis/operations/pageOperations.h b/stasis/operations/pageOperations.h index eec2a13..737b246 100644 --- a/stasis/operations/pageOperations.h +++ b/stasis/operations/pageOperations.h @@ -41,12 +41,12 @@ terms specified in this license. ---*/ /** - * @file + * @file * * Provides raw access to entire pages. * - * LLADD's pages are PAGE_SIZE bytes long. Currently, two integers are - * reserved for the LSN and the page type. providing PAGE_SIZE-8 bytes + * LLADD's pages are PAGE_SIZE bytes long. Currently, two integers are + * reserved for the LSN and the page type. providing PAGE_SIZE-8 bytes * of usable space. * * @ingroup OPERATIONS @@ -54,11 +54,10 @@ terms specified in this license. * @see page.h * * $Id$ - */ - - + */ #ifndef __PAGE_OPERATIONS_H__ #define __PAGE_OPERATIONS_H__ +#include compensated_function pageid_t TpageAlloc(int xid); compensated_function pageid_t TfixedPageAlloc(int xid, int size); diff --git a/stasis/operations/regions.h b/stasis/operations/regions.h index af2da30..76b02e9 100644 --- a/stasis/operations/regions.h +++ b/stasis/operations/regions.h @@ -1,3 +1,6 @@ +#ifndef STASIS_OPERATIONS_REGIONS_H +#define STASIS_OPERATIONS_REGIONS_H +#include /** Allocates and deallocates regions of pages. The page before each region is of type BOUNDARY_TAG. All regions except the last one in @@ -9,7 +12,7 @@ a newly allocated region are undefined. */ -typedef struct boundary_tag { +typedef struct boundary_tag { pageid_t size; pageid_t prev_size; int status; @@ -51,3 +54,4 @@ stasis_operation_impl stasis_op_impl_region_dealloc_inverse(); void fsckRegions(int xid); // XXX need callbacks to handle transaction commit/abort. +#endif diff --git a/stasis/page.h b/stasis/page.h index 6751281..cd1516f 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -88,8 +88,8 @@ terms specified in this license. Page implementations are free to define their own access methods and APIs. However, Stasis's record oriented page interface - provides a default set of methods for page access. - + provides a default set of methods for page access. + @see PAGE_RECORD_INTERFACE @todo Page deallocators should call stasis_page_cleanup() @@ -102,8 +102,10 @@ terms specified in this license. #define __PAGE_H__ #include +#include #include -#include + +#include BEGIN_C_DECLS @@ -182,7 +184,7 @@ struct Page_s { Read from a record Read lock - @see rwlatch, getPage(), pageRalloc(), pageRead() + @see rwlatch, getPage(), pageRalloc(), read_page() */ rwl * loadlatch; @@ -337,8 +339,8 @@ lsn_t stasis_page_lsn_read(const Page * page); stasis_page_type_ptr(), for example: @code - int * stasis_page_type_ptr(Page *p) { - return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1; + int * stasis_page_type_ptr(Page *p) { + return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1; } @endcode @@ -487,7 +489,7 @@ static const size_t USABLE_SIZE_OF_PAGE = (PAGE_SIZE - sizeof(lsn_t) - sizeof(in @return The length of the record in bytes. */ -static inline size_t +static inline size_t stasis_record_type_to_size(ssize_t type) { if(type >= 0) { return type; diff --git a/stasis/pageHandle.h b/stasis/pageHandle.h index 5079d91..3c67855 100644 --- a/stasis/pageHandle.h +++ b/stasis/pageHandle.h @@ -1,57 +1,66 @@ -#include +#ifndef STASIS_PAGEHANDLE_H +#define STASIS_PAGEHANDLE_H +#include #include -/** - * Write page to disk, including correct LSN. Doing so may require a - * call to logSync(). There is not much that can be done to avoid - * this call right now. In the future, it might make sense to check - * to see if some other page can be kicked, in order to avoid the log - * flush. - * - * This funciton is automatically called immediately before a page is - * evicted from cache. Operation implementors, and normal users - * should never have to call this routine. - * - * @see bufferManager.c for the implementation of pageWrite - * - * @param dat The page to be flushed to disk. No concurrent calls - * may have the same value of dat. - * - */ -extern void (*pageWrite)(Page * dat); -extern int pageFile_isDurable; +typedef struct stasis_page_handle_t { + /** + * Write page to disk, including correct LSN. Doing so may require a + * call to logSync(). There is not much that can be done to avoid + * this call right now. In the future, it might make sense to check + * to see if some other page can be kicked, in order to avoid the log + * flush. + * + * This function is automatically called immediately before a page is + * evicted from cache. Operation implementors, and normal users + * should never have to call this routine. + * + * @see bufferManager.c for the implementation of pageWrite + * + * @param dat The page to be flushed to disk. No concurrent calls + * may have the same value of dat. + * + */ + void (*write)(struct stasis_page_handle_t* ph, Page * dat); -/** - Read a page from disk. This bypassess the cache, and should only be - called by bufferManager and blobManager. To retrieve a page under - normal circumstances, use loadPage() instead. + /** + Read a page from disk. This bypasses the cache, and should only be + called by bufferManager and blobManager. To retrieve a page under + normal circumstances, use loadPage() instead. - Operation implementors and normal users should never need to call - this routine. + Operation implementors and normal users should never need to call + this routine. - @param ret A page struct, with id set correctly. The rest of this - struct will be overwritten by pageMap. This method assumes that no - concurrent calls will be passed the same value of ret. - - @see bufferManager.c for the implementation of pageRead. + @param ret A page struct, with id set correctly. The rest of this + struct will be overwritten by pageMap. This method assumes that no + concurrent calls will be passed the same value of ret. - @todo pageRead and pageWrite should be stored in a struct returned by - an initailizer, not in global function pointers. -*/ -extern void (*pageRead)(Page * ret); -/** - Force the page file to disk. Pages that have had pageWrite() - called on them are guaranteed to be on disk after this returns. + @see bufferManager.c for the implementation of read_page. - (Note that bufferManager implementations typically call pageWrite() - automatically, so in general, other pages could be written back - as well...) -*/ -extern void (*forcePageFile)(); -extern void (*forceRangePageFile)(); -/** - Force the page file to disk, then close it. -*/ -extern void (*closePageFile)(); + @todo read_page and pageWrite should be stored in a struct returned by + an initializer, not in global function pointers. + */ + void (*read)(struct stasis_page_handle_t* ph, Page * ret); + /** + Force the page file to disk. Pages that have had pageWrite() + called on them are guaranteed to be on disk after this returns. -void pageHandleOpen(stasis_handle_t * handle); + (Note that bufferManager implementations typically call pageWrite() + automatically, so in general, other pages could be written back + as well...) + */ + void (*force_file)(struct stasis_page_handle_t* ph); + void (*force_range)(struct stasis_page_handle_t* ph, lsn_t start, lsn_t stop); + /** + Force the page file to disk, then close it. + */ + void (*close)(struct stasis_page_handle_t* ph); + /** + * Pointer to implementation-specific state. + */ + void * impl; +} stasis_page_handle_t; + +stasis_page_handle_t * stasis_page_handle_open(struct stasis_handle_t * handle); + +#endif //STASIS_PAGEHANDLE_H diff --git a/test/stasis/check_indirect.c b/test/stasis/check_indirect.c index 8a9ea48..d29133b 100644 --- a/test/stasis/check_indirect.c +++ b/test/stasis/check_indirect.c @@ -42,7 +42,7 @@ terms specified in this license. ---*/ #include "../check_includes.h" - +#include #include #include #include diff --git a/test/stasis/check_io.c b/test/stasis/check_io.c index a6bdc7c..3ec7109 100644 --- a/test/stasis/check_io.c +++ b/test/stasis/check_io.c @@ -42,7 +42,8 @@ terms specified in this license. #include "../check_includes.h" #include - +#include +#include #include #include #include