diff --git a/benchmarks/sequentialThroughput.c b/benchmarks/sequentialThroughput.c new file mode 100644 index 0000000..5763121 --- /dev/null +++ b/benchmarks/sequentialThroughput.c @@ -0,0 +1,78 @@ +#include +#include +#include + +#include +#include + +/*static stasis_handle_t * memory_factory(lsn_t off, lsn_t len, void * ignored) { + stasis_handle_t * h = stasis_handle(open_memory)(off); + //h = stasis_handle(open_debug)(h); + stasis_write_buffer_t * w = h->append_buffer(h, len); + w->h->release_write_buffer(w); + return h; +} +typedef struct sf_args { + char * filename; + int openMode; + int filePerm; +} sf_args; +static stasis_handle_t * traditional_file_factory(void * argsP) { + sf_args * args = (sf_args*) argsP; + stasis_handle_t * h = stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); + //h = stasis_handle(open_debug)(h); + return h; +} +*/ + +static inline long mb_to_page(long mb) { + return (mb * 1024 * 1024) / PAGE_SIZE; +} + +int main(int argc, char ** argv) { + int direct = 0; + int legacyBM = 0; + int legacyFH = 0; + + long page_count = mb_to_page(100); + + for(int i = 1; i < argc; i++) { + if(!strcmp(argv[i], "--direct")) { + direct = 1; + bufferManagerO_DIRECT = 1; + } else if(!strcmp(argv[i], "--deprecatedBM")) { + bufferManagerType = BUFFER_MANAGER_DEPRECATED_HASH; + legacyBM = 1; + } else if(!strcmp(argv[i], "--deprecatedFH")) { + bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_DEPRECATED; + legacyFH = 1; + } else if(!strcmp(argv[i], "--pfile")) { + bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; + } else if(!strcmp(argv[i], "--file")) { + bufferManagerNonBlockingSlowHandleType = IO_HANDLE_FILE; + } else if(!strcmp(argv[i], "--mb")) { + i++; + page_count = mb_to_page(atoll(argv[i])); + } else { + printf("Unknown argument: %s\n", argv[i]); + return 1; + } + } + + if(legacyFH && direct) { + printf("--direct and --deprecatedFH are incompatible with each other\n"); + return 1; + } + + Tinit(); + + for(long i =0; i < page_count; i++) { + Page * p = loadPage(-1, i); + dirtyPages_add(p); + releasePage(p); + } + + + Tdeinit(); + return 0; +} diff --git a/libdfa/rw.h b/libdfa/rw.h index bc9b64f..4576341 100644 --- a/libdfa/rw.h +++ b/libdfa/rw.h @@ -22,6 +22,17 @@ #include #ifndef __LIBDFA_RW_H #define __LIBDFA_RW_H + +#ifdef __cplusplus +# define BEGIN_C_DECLS extern "C" { +# define END_C_DECLS } +#else /* !__cplusplus */ +# define BEGIN_C_DECLS +# define END_C_DECLS +#endif /* __cplusplus */ + +BEGIN_C_DECLS + typedef struct { pthread_mutex_t *mut; int writers; @@ -55,4 +66,7 @@ rwargs *newRWargs (rwl *l, int i, long d); void *reader (void *args); void *writer (void *args); */ + +END_C_DECLS + #endif /* rw.h */ diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index e7006b5..dfc7845 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -106,9 +106,9 @@ inline static Page * writeBackOnePage() { lru->remove(lru, victim); Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(int)); assert(old == victim); - + // printf("Write(%ld)\n", (long)victim->id); - pageWrite(victim); + pageWrite(victim); /// XXX pageCleanup and pageFlushed might be heavyweight. pageCleanup(victim); // Make sure that no one mistakenly thinks this is still a live copy. victim->id = -1; diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index d6397ae..72c36c6 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -62,7 +62,7 @@ //#define MAX_MERGE 4 /** @return a read buffer indicating an error has occured */ -static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h, +static inline stasis_read_buffer_t* alloc_read_buffer_error(stasis_handle_t *h, int error) { assert(error); stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t)); @@ -74,8 +74,8 @@ static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h, return r; } /** @return a read buffer indicating a write error has occured */ -static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *h, - int error) { +static inline stasis_write_buffer_t* alloc_write_buffer_error + (stasis_handle_t *h, int error) { assert(error); stasis_write_buffer_t * w = malloc(sizeof(stasis_write_buffer_t)); w->h = h; @@ -87,7 +87,7 @@ static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t * return w; } /** Wraps stasis_handle_t so that it can be stored in an rbtree. */ -typedef struct tree_node { +typedef struct tree_node { lsn_t start_pos; lsn_t end_pos; stasis_handle_t * h; @@ -97,43 +97,43 @@ typedef struct tree_node { be deallocated unless this is zero. */ int pin_count; /** set to 1 when the handle is written to, 0 when the handle is - written back to disk, INVALID_NODE when the handle is not in + written back to disk, INVALID_NODE when the handle is not in the tree. */ int dirty; } tree_node; /** Wrapper for write buffers */ -typedef struct write_buffer_impl { +typedef struct write_buffer_impl { /** The tree node that contains this buffer */ const tree_node * n; /** The underlying buffer. */ stasis_write_buffer_t * w; } write_buffer_impl; -typedef struct read_buffer_impl { - /** The tree node that contains this buffer, or NULL if the buffer +typedef struct read_buffer_impl { + /** The tree node that contains this buffer, or NULL if the buffer is from a slow handle. */ const tree_node * n; /** The underlying buffer. */ stasis_read_buffer_t * r; } read_buffer_impl; -/** +/** Compare two tree_node structs. Two tree nodes are equal if they are zero length, and start at the same point, or if they overlap. */ static int cmp_handle(const void * ap, const void * bp, const void * ignored) { tree_node * a = (tree_node*)ap; tree_node * b = (tree_node*)bp; - if(a->start_pos == b->start_pos && + if(a->start_pos == b->start_pos && a->start_pos == a->end_pos && - b->start_pos == b->end_pos ) { + b->start_pos == b->end_pos ) { return 0; } - if(a->end_pos <= b->start_pos) { + if(a->end_pos <= b->start_pos) { return -1; } else if(a->start_pos >= b->end_pos) { return 1; - } else { + } else { return 0; } } @@ -143,7 +143,7 @@ typedef struct nbw_impl { // Handle state lsn_t start_pos; - lsn_t end_pos; + lsn_t end_pos; // Fields to manage slow handles stasis_handle_t * (*slow_factory)(void * arg); @@ -178,7 +178,7 @@ static inline void freeFastHandle(nbw_impl * impl, const tree_node * n); /** Obtain a slow handle from the pool of existing ones, or obtain a new one by calling impl->slow_factory.. */ -static stasis_handle_t * getSlowHandle(nbw_impl * impl) { +static stasis_handle_t * getSlowHandle(nbw_impl * impl) { pthread_mutex_lock(&impl->mut); stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles); assert(slow); @@ -186,20 +186,20 @@ static stasis_handle_t * getSlowHandle(nbw_impl * impl) { impl->slow_handle_count++; pthread_mutex_unlock(&impl->mut); slow = impl->slow_factory(impl->slow_factory_arg); - } else { + } else { pthread_mutex_unlock(&impl->mut); } return slow; } /** Release a file handle back into the pool of slow handles. */ -static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) { +static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) { assert(slow); pthread_mutex_lock(&impl->mut); addVal(&impl->slow_handles, (long)slow); pthread_mutex_unlock(&impl->mut); } -static tree_node * allocTreeNode(lsn_t off, lsn_t len) { +static tree_node * allocTreeNode(lsn_t off, lsn_t len) { tree_node * ret = malloc(sizeof(tree_node)); ret->start_pos = off; ret->end_pos = off + len; @@ -235,7 +235,7 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, impl->fast_handle_count, impl->max_fast_handles); } if(impl->used_buffer_size + len > impl->max_buffer_size) { - printf("Blocking on write. %lld bytes (%lld max)\n", + printf("Blocking on write. %lld bytes (%lld max)\n", impl->used_buffer_size, impl->max_buffer_size); } @@ -267,15 +267,24 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, } #ifdef EAGER_MERGE - // check for a mergable range immediately after the point we're interested in. + // check for a mergable range immediately after the point we're + // interested in. (DEAD CODE) tree_node dummy; dummy.start_pos = n->end_pos; dummy.end_pos = n->end_pos+1; - while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) && np->dirty && !np->pin_count && np->write_count + n->write_count < MAX_MERGE) { - DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) outstanding = %d\n", n->start_pos/PAGE_SIZE, -1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE, -1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count); + while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) + && np->dirty + && !np->pin_count + && np->write_count + n->write_count < MAX_MERGE) { + DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) " + "outstanding = %d\n", n->start_pos/PAGE_SIZE, + -1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE, + -1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count); lsn_t appendLen = np->end_pos - np->start_pos; - stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos, appendLen); + stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos, + appendLen); + int ret = n->h->write(n->h,np->start_pos,r->buf, appendLen); assert(!ret); ret = r->h->release_read_buffer(r); @@ -293,7 +302,8 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, return n; } -static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { +static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, + lsn_t len) { tree_node * np = allocTreeNode(off, len); pthread_mutex_lock(&impl->mut); @@ -311,19 +321,19 @@ static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) { n->h->close(n->h); free((void*)n); } -static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n, +static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n, int setDirty) { - if(n->dirty == INVALID_NODE) { + if(n->dirty == INVALID_NODE) { // Not in tree; cast removes "const" releaseSlowHandle(impl, n->h); free((void*)n); return 0; - } else { + } else { assert(setDirty == 0 || setDirty == 1); assert(n->dirty == 0 || n->dirty == 1); pthread_mutex_lock(&impl->mut); ((tree_node*)n)->pin_count--; - if(n->dirty == 0) { + if(n->dirty == 0) { ((tree_node*)n)->dirty = setDirty; } pthread_mutex_unlock(&impl->mut); @@ -334,14 +344,14 @@ static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n, } } /** @todo nbw_num_copies is unimplemented. */ -static int nbw_num_copies(stasis_handle_t * h) { +static int nbw_num_copies(stasis_handle_t * h) { return 0; } /** @todo nbw_num_copies_buffer is unimplemented. */ -static int nbw_num_copies_buffer(stasis_handle_t * h) { +static int nbw_num_copies_buffer(stasis_handle_t * h) { return 0; } -static int nbw_close(stasis_handle_t * h) { +static int nbw_close(stasis_handle_t * h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); @@ -357,7 +367,8 @@ static int nbw_close(stasis_handle_t * h) { free(impl->workers); DEBUG("nbw had %d slow handles\n", impl->slow_handle_count); - DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count, impl->used_buffer_size); + DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count, + impl->used_buffer_size); if(impl->requested_bytes_written < impl->total_bytes_written) { printf("nbw: Problem with write coalescing detected.\n" "Client wrote %lld bytes, handle wrote %lld.\n", @@ -377,47 +388,47 @@ static int nbw_close(stasis_handle_t * h) { destroyList(&impl->slow_handles); assert(impl->slow_handle_count == 0); - + free(h->impl); free(h); return 0; } -static lsn_t nbw_start_position(stasis_handle_t *h) { +static lsn_t nbw_start_position(stasis_handle_t *h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); lsn_t ret = impl->start_pos; pthread_mutex_unlock(&impl->mut); return ret; } -static lsn_t nbw_end_position(stasis_handle_t *h) { +static lsn_t nbw_end_position(stasis_handle_t *h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); lsn_t ret = impl->end_pos; pthread_mutex_unlock(&impl->mut); return ret; } -static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, +static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, lsn_t off, lsn_t len) { nbw_impl * impl = h->impl; const tree_node * n = allocFastHandle(impl, off, len); stasis_write_buffer_t * w = n->h->write_buffer(n->h, off, len); write_buffer_impl * w_impl = malloc(sizeof(write_buffer_impl)); - w_impl->n = n; + w_impl->n = n; w_impl->w = w; stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - ret->h = h; - ret->off = w->off; - ret->len = w->len; - ret->buf = w->buf; + ret->h = h; + ret->off = w->off; + ret->len = w->len; + ret->buf = w->buf; ret->error = w->error; - ret->impl = w_impl; - - if(!ret->error) { + ret->impl = w_impl; + + if(!ret->error) { pthread_mutex_lock(&impl->mut); assert(impl->start_pos <= impl->end_pos); - if(off < impl->start_pos) { + if(off < impl->start_pos) { // Note: We're returning a valid write buffer to space before // the handle's truncation point. Spooky. ret->error = EDOM; @@ -430,8 +441,8 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, return ret; } -static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, - lsn_t len) { +static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, + lsn_t len) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); @@ -442,7 +453,7 @@ static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, return nbw_write_buffer(h, off, len); } -static int nbw_release_write_buffer(stasis_write_buffer_t * w) { +static int nbw_release_write_buffer(stasis_write_buffer_t * w) { nbw_impl * impl = w->h->impl; write_buffer_impl * w_impl = w->impl; const tree_node * n = w_impl->n; @@ -453,7 +464,7 @@ static int nbw_release_write_buffer(stasis_write_buffer_t * w) { return 0; } static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, - lsn_t off, lsn_t len) { + lsn_t off, lsn_t len) { nbw_impl * impl = h->impl; const tree_node * n = findFastHandle(impl, off, len); stasis_read_buffer_t * r; @@ -474,16 +485,17 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, return ret; } -static int nbw_release_read_buffer(stasis_read_buffer_t * r) { +static int nbw_release_read_buffer(stasis_read_buffer_t * r) { nbw_impl * impl = r->h->impl; read_buffer_impl * r_impl = r->impl; const tree_node * n = r_impl->n; stasis_handle_t * oldHandle = r_impl->r->h; r_impl->r->h->release_read_buffer(r_impl->r); - // XXX shouldn't need to check for this here; getFastHandle does something similar... - if(n) { + // XXX shouldn't need to check for this here; getFastHandle does + // something similar... + if(n) { releaseFastHandle(impl, n, 0); - } else { + } else { assert(oldHandle); releaseSlowHandle(impl, oldHandle); } @@ -491,16 +503,16 @@ static int nbw_release_read_buffer(stasis_read_buffer_t * r) { free(r); return 0; } -static int nbw_write(stasis_handle_t * h, lsn_t off, - const byte * dat, lsn_t len) { +static int nbw_write(stasis_handle_t * h, lsn_t off, + const byte * dat, lsn_t len) { nbw_impl * impl = h->impl; const tree_node * n = allocFastHandle(impl, off, len); int ret = n->h->write(n->h, off, dat, len); releaseFastHandle(impl, n, 1); - if(!ret) { + if(!ret) { pthread_mutex_lock(&impl->mut); assert(impl->start_pos <= impl->end_pos); - if(off < impl->start_pos) { + if(off < impl->start_pos) { ret = EDOM; } else if(off + len > impl->end_pos) { impl->end_pos = off+len; @@ -510,8 +522,8 @@ static int nbw_write(stasis_handle_t * h, lsn_t off, } return ret; } -static int nbw_append(stasis_handle_t * h, lsn_t * off, - const byte * dat, lsn_t len) { +static int nbw_append(stasis_handle_t * h, lsn_t * off, + const byte * dat, lsn_t len) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); @@ -523,33 +535,33 @@ static int nbw_append(stasis_handle_t * h, lsn_t * off, int ret = nbw_write(h, *off, dat, len); return ret; } -static int nbw_read(stasis_handle_t * h, - lsn_t off, byte * buf, lsn_t len) { +static int nbw_read(stasis_handle_t * h, + lsn_t off, byte * buf, lsn_t len) { nbw_impl * impl = h->impl; const tree_node * n = findFastHandle(impl, off, len); int ret; // XXX should be handled by releaseFastHandle. - if(n) { + if(n) { ret = n->h->read(n->h, off, buf, len); releaseFastHandle(impl, n, 0); - } else { + } else { stasis_handle_t * slow = getSlowHandle(impl); ret = slow->read(slow, off, buf, len); releaseSlowHandle(impl, slow); } return ret; } -static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) { +static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) { nbw_impl * impl = h->impl; int error = 0; pthread_mutex_lock(&impl->mut); - if(new_start <= impl->end_pos && new_start > impl->start_pos) { + if(new_start <= impl->end_pos && new_start > impl->start_pos) { impl->start_pos = new_start; - } else { + } else { error = EDOM; } pthread_mutex_unlock(&impl->mut); - if(!error) { + if(!error) { // XXX close all slow handles; truncate of them. (ie: implement truncate) } return error; @@ -586,12 +598,12 @@ struct stasis_handle_t nbw_func = { the mutex. */ -static void * nbw_worker(void * handle) { +static void * nbw_worker(void * handle) { stasis_handle_t * h = handle; nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); - while(1) { + while(1) { // cast strips const. tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles); int writes = 0; @@ -633,7 +645,8 @@ static void * nbw_worker(void * handle) { } stasis_handle_t * fast2 = np->h; - stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos, np_len); + stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos, + np_len); memcpy(buf + buf_off, r2->buf, np_len); buf_off += np_len; r2->h->release_read_buffer(r2); @@ -647,7 +660,8 @@ static void * nbw_worker(void * handle) { pthread_mutex_unlock(&impl->mut); if(len != PAGE_SIZE) { - DEBUG("merged %lld pages at %lld into single write\n", len/PAGE_SIZE, off/PAGE_SIZE); + DEBUG("merged %lld pages at %lld into single write\n", + len/PAGE_SIZE, off/PAGE_SIZE); } slow->write(slow, off, buf, len); @@ -660,15 +674,16 @@ static void * nbw_worker(void * handle) { pthread_mutex_lock(&impl->mut); node->pin_count--; } - tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles); - if(!node->dirty && !node->pin_count) { + tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, + impl->fast_handles); + if(!node->dirty && !node->pin_count) { impl->fast_handle_count -= node->write_count; impl->used_buffer_size -= (node->end_pos - node->start_pos); freeFastHandle(impl, node); } node = new_node; } - if(!impl->fast_handle_count || !writes) { + if(!impl->fast_handle_count || !writes) { if(impl->still_open) { pthread_cond_wait(&impl->pending_writes_cond, &impl->mut); } else { @@ -680,12 +695,11 @@ static void * nbw_worker(void * handle) { return 0; } -stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg), - void * slow_factory_arg, - stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *), - void * fast_factory_arg, - int worker_thread_count, - lsn_t buffer_size, int max_fast_handles) { +stasis_handle_t * stasis_handle(open_non_blocking) + (stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg, + stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *), + void * fast_factory_arg, int worker_thread_count, lsn_t buffer_size, + int max_fast_handles) { nbw_impl * impl = malloc(sizeof(nbw_impl)); pthread_mutex_init(&impl->mut, 0); @@ -715,16 +729,16 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact impl->worker_count = worker_thread_count; pthread_cond_init(&impl->pending_writes_cond, 0); - + impl->still_open = 1; stasis_handle_t *h = malloc(sizeof(stasis_handle_t)); *h = nbw_func; h->impl = impl; - for(int i = 0; i < impl->worker_count; i++) { + for(int i = 0; i < impl->worker_count; i++) { int err = pthread_create(&(impl->workers[i]), 0, nbw_worker, h); - if(err) { + if(err) { perror("Coudln't spawn worker thread for non_blocking io"); } } diff --git a/src/stasis/io/pfile.c b/src/stasis/io/pfile.c index fedab31..442fe94 100644 --- a/src/stasis/io/pfile.c +++ b/src/stasis/io/pfile.c @@ -5,8 +5,10 @@ #include #include #include +#include #include + #include #include #include @@ -22,62 +24,95 @@ @see handle.h */ +/** + Per-handle information for pfile +*/ typedef struct pfile_impl { + /** + This should be held whenever end_pos is accessed. + */ pthread_mutex_t mut; + /** + The logical offset of the file. Once the file is open, this will + never change, as pfile doesn't support truncation. + */ lsn_t start_pos; + /** + The logical end of the file. + */ lsn_t end_pos; + /** + File descriptor + */ int fd; + /** + Flags passed into open + */ int file_flags; + /** + File creation mode + */ int file_mode; - char * filename; + /** + The name of the underlying file. + */ + char *filename; } pfile_impl; +/** + We can pass the caller's buffer directly into pread()/pwrite() + without making any copies. +*/ static int pfile_num_copies(stasis_handle_t *h) { return 0; } +/** + We have to call malloc(), but not memcpy(). Maybe this should return 1. +*/ static int pfile_num_copies_buffer(stasis_handle_t *h) { return 0; } -static int pfile_close(stasis_handle_t *h) { - pfile_impl * impl = (pfile_impl*)h->impl; +static int pfile_close(stasis_handle_t *h) { + pfile_impl *impl = (pfile_impl*)h->impl; DEBUG("Closing pfile: end = %lld\n", impl->end_pos); int fd = impl->fd; free((void*)impl->filename); free(impl); free(h); int ret = close(fd); - if(!ret) return 0; + if (!ret) return 0; else return errno; } + static lsn_t pfile_start_position(stasis_handle_t *h) { - pfile_impl * impl = (pfile_impl*)h->impl; + pfile_impl *impl = (pfile_impl*)h->impl; return impl->start_pos; } + static lsn_t pfile_end_position(stasis_handle_t *h) { - pfile_impl * impl = (pfile_impl*)h->impl; + pfile_impl *impl = (pfile_impl*)h->impl; pthread_mutex_lock(&impl->mut); lsn_t ret = impl->end_pos; pthread_mutex_unlock(&impl->mut); return ret; } -inline static int pfile_write_unlocked(int fd, lsn_t off, const byte * dat, + +inline static int pfile_write_unlocked(int fd, lsn_t off, const byte *dat, lsn_t len) { - int error = 0; - ssize_t bytes_written = 0; - while(bytes_written < len) { + while (bytes_written < len) { ssize_t count = pwrite(fd, dat + bytes_written, len - bytes_written, off + bytes_written); - if(count == -1) { - if(errno == EAGAIN || errno == EINTR) { + if (count == -1) { + if (errno == EAGAIN || errno == EINTR) { // @see file.c for an explanation; basically; we ignore these, // and try again. count = 0; } else { - if(errno == EBADF) { + if (errno == EBADF) { error = EBADF; } else { error = errno; @@ -86,35 +121,49 @@ inline static int pfile_write_unlocked(int fd, lsn_t off, const byte * dat, } } bytes_written += count; - if(bytes_written != len) { + if (bytes_written != len) { DEBUG("pwrite spinning\n"); } } return error; } -static int pfile_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) { - pfile_impl * impl = (pfile_impl*)(h->impl); + +static int pfile_read(stasis_handle_t *h, lsn_t off, byte *buf, lsn_t len) { + pfile_impl *impl = (pfile_impl*)(h->impl); int error = 0; - if(off < impl->start_pos) { + // reading impl->end_pos is probably atomic on most hardware, but + // this is safer. + pthread_mutex_lock(&impl->mut); + lsn_t end_pos = impl->end_pos; + pthread_mutex_unlock(&impl->mut); + + // checking end_pos is adequate, (we assume this is the only handle + // touching the file) + + if (off < impl->start_pos) { error = EDOM; - } else if(off + len > impl->end_pos) { + } else if (off + len > end_pos) { error = EDOM; } else { ssize_t bytes_read = 0; - while(bytes_read < len) { + while (bytes_read < len) { ssize_t count = pread(impl->fd, buf + bytes_read, len - bytes_read, off + bytes_read - impl->start_pos); - if(count == -1) { - if(errno == EAGAIN || errno == EINTR) { + if (count == -1) { + if (errno == EAGAIN || errno == EINTR) { count = 0; } else { - if(errno == EBADF) { + if (errno == EBADF) { h->error = EBADF; } else { + int err = errno; + // XXX Why is sys_errlist[] is unavailable here? + perror("pfile_read encountered an unknown error code."); + fprintf(stderr, "pread() returned -1; errno is %d\n",err); abort(); // XXX other errors? } error = errno; @@ -122,24 +171,28 @@ static int pfile_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) { } } bytes_read += count; - if(bytes_read != len) { + if (bytes_read != len) { DEBUG("pread spinning\n"); } } assert(bytes_read == len); } - return error; } -static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len) { + +static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, + lsn_t len) { pfile_impl *impl = (pfile_impl*)(h->impl); int error = 0; lsn_t phys_off; - if(impl->start_pos > off) { + if (impl->start_pos > off) { error = EDOM; } else { pthread_mutex_lock(&impl->mut); - if(impl->end_pos < off+len) { + if (impl->end_pos < off+len) { + // update end_pos now; the caller is not allowed to look at this + // part of the file until we return, so if they notice that the + // file hasn't been extended yet, it's a bug on their end. impl->end_pos = off+len; } pthread_mutex_unlock(&impl->mut); @@ -148,47 +201,49 @@ static int pfile_write(stasis_handle_t *h, lsn_t off, const byte *dat, lsn_t len } return error; } -static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat, lsn_t len) { + +static int pfile_append(stasis_handle_t *h, lsn_t *off, const byte *dat, + lsn_t len) { pfile_impl *impl = (pfile_impl*)(h->impl); pthread_mutex_lock(&impl->mut); *off = impl->end_pos; impl->end_pos += len; pthread_mutex_unlock(&impl->mut); lsn_t phys_off = *off - impl->start_pos; - int error = pfile_write_unlocked(impl->fd, phys_off, dat,len); - return error; + return pfile_write_unlocked(impl->fd, phys_off, dat,len); } -static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t * h, + +static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t *h, lsn_t off, lsn_t len) { + stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t)); - stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - - if(!ret) { + if (!ret) { h->error = ENOMEM; return NULL; } - pfile_impl * impl = (pfile_impl*)h->impl; + pfile_impl *impl = (pfile_impl*)h->impl; int error = 0; - if(impl->start_pos > off) { + if (impl->start_pos > off) { error = EDOM; } pthread_mutex_lock(&impl->mut); - if(off + len > impl->end_pos) { + // @todo Come up with a reasonable way to avoid sparse files. + if (off + len > impl->end_pos) { impl->end_pos = off+len; } pthread_mutex_unlock(&impl->mut); - byte * buf; - if(!error) { + byte *buf; + if (!error) { buf = malloc(len); - if(!buf) { error = ENOMEM; } + if (!buf) { error = ENOMEM; } } - if(error) { + if (error) { ret->h = h; ret->off = 0; ret->buf = 0; @@ -205,14 +260,14 @@ static stasis_write_buffer_t * pfile_write_buffer(stasis_handle_t * h, } return ret; } -static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t * h, + +static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t *h, lsn_t len) { - // Allocate the handle - stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - if(!ret) { return NULL; } + stasis_write_buffer_t *ret = malloc(sizeof(stasis_write_buffer_t)); + if (!ret) { return NULL; } - pfile_impl * impl = (pfile_impl*)h->impl; + pfile_impl *impl = (pfile_impl*)h->impl; // Obtain an appropriate offset pthread_mutex_lock(&(impl->mut)); @@ -222,13 +277,13 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t * h, pthread_mutex_unlock(&(impl->mut)); // Allocate the buffer - byte * buf = malloc(len); + byte *buf = malloc(len); int error = 0; - if(!buf) { + if (!buf) { error = ENOMEM; } - if(error) { + if (error) { ret->h = h; ret->off = 0; ret->buf = 0; @@ -244,50 +299,52 @@ static stasis_write_buffer_t *pfile_append_buffer(stasis_handle_t * h, ret->error = 0; } return ret; - } -static int pfile_release_write_buffer(stasis_write_buffer_t * w) { - pfile_impl * impl = (pfile_impl*)(w->h->impl); +static int pfile_release_write_buffer(stasis_write_buffer_t *w) { + pfile_impl *impl = (pfile_impl*)(w->h->impl); pthread_mutex_lock(&(impl->mut)); int error = 0; - if(impl->end_pos < w->off + w->len || + if (impl->end_pos < w->off + w->len || impl->start_pos > w->off) { error = EDOM; } pthread_mutex_unlock(&(impl->mut)); - if(!error) { - error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, w->len); + if (!error) { + error = pfile_write_unlocked(impl->fd, w->off-impl->start_pos, w->buf, + w->len); + } + if (w->buf) { + free(w->buf); } - free(w->buf); free(w); return error; } -static stasis_read_buffer_t * pfile_read_buffer(stasis_handle_t * h, +static stasis_read_buffer_t *pfile_read_buffer(stasis_handle_t *h, lsn_t off, lsn_t len) { - stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); - if(!ret) { return NULL; } + stasis_read_buffer_t *ret = malloc(sizeof(stasis_read_buffer_t)); + if (!ret) { return NULL; } - byte * buf = malloc(len); + byte *buf = malloc(len); int error = 0; - if(!buf) { error = ENOMEM; } + if (!buf) { error = ENOMEM; } - if(!error) { + if (!error) { error = pfile_read(h, off, buf, len); } - if(error) { + if (error) { ret->h = h; ret->buf = 0; ret->off = 0; ret->len = 0; ret->impl = 0; ret->error = error; - if(buf) { free(buf); } + if (buf) { free(buf); } } else { ret->h = h; ret->buf = buf; @@ -298,19 +355,25 @@ static stasis_read_buffer_t * pfile_read_buffer(stasis_handle_t * h, } return ret; } -static int pfile_release_read_buffer(stasis_read_buffer_t * r) { - free((void*)r->buf); + +static int pfile_release_read_buffer(stasis_read_buffer_t *r) { + if (r->buf) { + free((void*)r->buf); + } free(r); return 0; } -static int pfile_truncate_start(stasis_handle_t * h, lsn_t new_start) { + +static int pfile_truncate_start(stasis_handle_t *h, lsn_t new_start) { static int truncate_warned = 0; - if(!truncate_warned) { - printf("\nWarning: pfile doesn't support truncation; ignoring truncation request\n"); + if (!truncate_warned) { + printf("\nWarning: pfile doesn't support truncation; " + "ignoring truncation request\n"); truncate_warned = 1; } return 0; } + struct stasis_handle_t pfile_func = { .num_copies = pfile_num_copies, .num_copies_buffer = pfile_num_copies_buffer, @@ -328,28 +391,34 @@ struct stasis_handle_t pfile_func = { .truncate_start = pfile_truncate_start, .error = 0 }; -stasis_handle_t * stasis_handle(open_pfile)(lsn_t start_offset, char * filename, int flags, int mode) { - stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); - if(!ret) { return NULL; } + +stasis_handle_t *stasis_handle(open_pfile)(lsn_t start_offset, + char *filename, + int flags, int mode) { + stasis_handle_t *ret = malloc(sizeof(stasis_handle_t)); + if (!ret) { return NULL; } *ret = pfile_func; - pfile_impl * impl = malloc(sizeof(pfile_impl)); + pfile_impl *impl = malloc(sizeof(pfile_impl)); + if (!impl) { free(ret); return NULL; } + ret->impl = impl; pthread_mutex_init(&(impl->mut), 0); impl->fd = open(filename, flags, mode); assert(sizeof(off_t) >= (64/8)); - if(impl->fd == -1) { + if (impl->fd == -1) { ret->error = errno; } impl->start_pos = start_offset; off_t file_len = lseek(impl->fd,0,SEEK_END); - if(file_len == (off_t)-1) { + if (file_len == (off_t)-1) { ret->error = errno; } impl->end_pos = file_len + start_offset; - DEBUG("file len = %lld, start_off = %lld, end = %lld\n", file_len, start_offset, impl->end_pos); + DEBUG("file len = %lld, start_off = %lld, end = %lld\n", + file_len, start_offset, impl->end_pos); impl->filename = strdup(filename); impl->file_flags = flags; diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index f2054ce..0dd44f2 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -450,8 +450,10 @@ recordid TlsmAppendPage(int xid, recordid tree, if(ret.size == INVALID_SLOT) { if(lastLeaf->id != p->id) { + assert(s->lastLeaf != tree.page); unlock(lastLeaf->rwlatch); releasePage(lastLeaf); // don't need that page anymore... + lastLeaf = 0; } // traverse down the root of the tree. @@ -537,6 +539,7 @@ recordid TlsmAppendPage(int xid, recordid tree, writeNodeRecord(xid, lastLeaf, ret.slot, key, keySize, val_page); if(lastLeaf->id != p->id) { + assert(s->lastLeaf != tree.page); unlock(lastLeaf->rwlatch); releasePage(lastLeaf); } @@ -638,6 +641,37 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) { } +pageid_t TlsmLastPage(int xid, recordid tree) { + Page * root = loadPage(xid, tree.page); + readlock(root->rwlatch,0); + lsmTreeState *state = root->impl; + int keySize = getKeySize(xid,root); + if(state->lastLeaf == -1) { + const lsmTreeNodeRecord *nr = readNodeRecord(xid,root,DEPTH, + keySize); + int depth = nr->ptr; + state->lastLeaf = findLastLeaf(xid,root,depth); + } + pageid_t ret = state->lastLeaf; + unlock(root->rwlatch); + + // ret points to the last internal node at this point. + releasePage(root); + + Page * p = loadPage(xid, ret); + readlock(p->rwlatch,0); + if(*recordcount_ptr(p) == 2) { + ret = -1; + } else { + const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*recordcount_ptr(p))-1,keySize); + ret = nr->ptr; + } + unlock(p->rwlatch); + releasePage(p); + + return ret; +} + /** The buffer manager calls this when the lsmTree's root page is loaded. This function allocates some storage for cached values @@ -683,6 +717,9 @@ lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) { releasePage(p); p = loadPage(xid,leafid); readlock(p->rwlatch,0); + assert(depth != 0); + } else { + assert(depth == 0); } lsmIteratorImpl *impl = malloc(sizeof(lsmIteratorImpl)); impl->p = p; @@ -708,6 +745,24 @@ lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) { } */ return it; } +lladdIterator_t *lsmTreeIterator_copy(int xid, lladdIterator_t* i) { + lsmIteratorImpl *it = i->impl; + lsmIteratorImpl *mine = malloc(sizeof(lsmIteratorImpl)); + + if(it->p) { + mine->p = loadPage(xid, it->p->id); + readlock(mine->p->rwlatch,0); + } else { + mine->p = 0; + } + memcpy(&mine->current, &it->current,sizeof(recordid)); + mine->t = it->t; + mine->justOnePage = it->justOnePage; + lladdIterator_t * ret = malloc(sizeof(lladdIterator_t)); + ret->type = -1; // XXX LSM_TREE_ITERATOR + ret->impl = mine; + return ret; +} void lsmTreeIterator_close(int xid, lladdIterator_t *it) { lsmIteratorImpl *impl = it->impl; if(impl->p) { @@ -738,7 +793,7 @@ int lsmTreeIterator_next(int xid, lladdIterator_t *it) { impl->current.size = keySize; } else { impl->p = 0; - impl->current.size = -1; + impl->current.size = INVALID_SLOT; } } else { assert(impl->current.size == keySize + sizeof(lsmTreeNodeRecord)); diff --git a/stasis/constants.h b/stasis/constants.h index a31064c..5598d0e 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -82,10 +82,10 @@ terms specified in this license. #ifndef MAX_BUFFER_SIZE //#define MAX_BUFFER_SIZE 100003 -/*#define MAX_BUFFER_SIZE 20029 */ +#define MAX_BUFFER_SIZE 20029 //#define MAX_BUFFER_SIZE 10007 //#define MAX_BUFFER_SIZE 5003 -#define MAX_BUFFER_SIZE 2003 +//#define MAX_BUFFER_SIZE 2003 //#define MAX_BUFFER_SIZE 4006 /* #define MAX_BUFFER_SIZE 71 */ /*#define MAX_BUFFER_SIZE 7 */ diff --git a/stasis/operations/lsmTree.h b/stasis/operations/lsmTree.h index e3319f7..fb17018 100644 --- a/stasis/operations/lsmTree.h +++ b/stasis/operations/lsmTree.h @@ -63,7 +63,10 @@ void TlsmSetPageAllocator(pageid_t (*allocer)(int xid, void * ignored), */ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key); - +/** + @todo TlsmFirstPage for symmetry? + */ +pageid_t TlsmLastPage(int xid, recordid tree); /// --------------- Iterator implementation typedef struct lsmTreeNodeRecord { @@ -93,7 +96,7 @@ lladdIterator_t * lsmTreeIterator_open(int xid, recordid tree); */ void lsmTreeIterator_close(int xid, lladdIterator_t * it); int lsmTreeIterator_next (int xid, lladdIterator_t * it); - +lladdIterator_t *lsmTreeIterator_copy(int xid, lladdIterator_t* i); static inline int lsmTreeIterator_key (int xid, lladdIterator_t *it, byte **key) { lsmIteratorImpl * impl = (lsmIteratorImpl*)it->impl;