diff --git a/src/stasis/io/debug.c b/src/stasis/io/debug.c index 164ccc3..0b7879c 100644 --- a/src/stasis/io/debug.c +++ b/src/stasis/io/debug.c @@ -35,6 +35,15 @@ static int debug_close(stasis_handle_t * h) { free(h); return ret; } +static stasis_handle_t * debug_dup(stasis_handle_t * h) { + stasis_handle_t * hh = ((debug_impl*)h->impl)->h; + printf("tid=%9ld call dup(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); + stasis_handle_t * ret = hh->dup(hh); + printf("tid=%9ld retn dup(%lx) = %lx\n", (long)(intptr_t)pthread_self(), (unsigned long)hh, (unsigned long)ret); fflush(stdout); + free(h->impl); + free(h); + return ret; +} static lsn_t debug_start_position(stasis_handle_t *h) { stasis_handle_t * hh = ((debug_impl*)h->impl)->h; printf("tid=%9ld call start_position(%lx)\n", (long)(intptr_t)pthread_self(), (unsigned long)hh); fflush(stdout); @@ -170,6 +179,7 @@ struct stasis_handle_t debug_func = { .num_copies = debug_num_copies, .num_copies_buffer = debug_num_copies_buffer, .close = debug_close, + .dup = debug_dup, .start_position = debug_start_position, .end_position = debug_end_position, .write = debug_write, diff --git a/src/stasis/io/file.c b/src/stasis/io/file.c index 88e05b9..9a90b58 100644 --- a/src/stasis/io/file.c +++ b/src/stasis/io/file.c @@ -46,6 +46,11 @@ static int file_close(stasis_handle_t * h) { if(!ret) return 0; else return errno; } +static stasis_handle_t* file_dup(stasis_handle_t * h) { + file_impl * impl = h->impl; + return stasis_handle_open_file(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode); +} + static lsn_t file_start_position(stasis_handle_t *h) { file_impl * impl = (file_impl*)h->impl; pthread_mutex_lock(&(impl->mut)); @@ -548,6 +553,7 @@ struct stasis_handle_t file_func = { .num_copies = file_num_copies, .num_copies_buffer = file_num_copies_buffer, .close = file_close, + .dup = file_dup, .start_position = file_start_position, .end_position = file_end_position, .write = file_write, diff --git a/src/stasis/io/memory.c b/src/stasis/io/memory.c index 3e6bea7..352ae76 100644 --- a/src/stasis/io/memory.c +++ b/src/stasis/io/memory.c @@ -6,18 +6,28 @@ typedef struct mem_impl { lsn_t start_pos; lsn_t end_pos; byte * buf; + int refcount; } mem_impl; static int mem_num_copies(stasis_handle_t * h) { return 1; } static int mem_num_copies_buffer(stasis_handle_t * h) { return 0; } static int mem_close(stasis_handle_t * h) { - free(((mem_impl*)h->impl)->buf); - pthread_mutex_destroy(&(((mem_impl*)h->impl)->mut)); + mem_impl *impl = h->impl; + (impl->refcount)--; + if(impl->refcount) { return 0; } + + free(impl->buf); + pthread_mutex_destroy(&(impl->mut)); free(h->impl); free(h); return 0; } +static stasis_handle_t * mem_dup(stasis_handle_t *h) { + mem_impl *impl = h->impl; + (impl->refcount)++; + return h; +} static lsn_t mem_start_position(stasis_handle_t *h) { lsn_t ret; mem_impl* impl = (mem_impl*)(h->impl); @@ -243,6 +253,7 @@ struct stasis_handle_t mem_func = { .num_copies = mem_num_copies, .num_copies_buffer = mem_num_copies_buffer, .close = mem_close, + .dup = mem_dup, .start_position = mem_start_position, .end_position = mem_end_position, .write = mem_write, @@ -270,6 +281,7 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) { impl->start_pos = start_offset; impl->end_pos = start_offset; impl->buf = malloc(0); + impl->refcount = 1; return ret; } diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index 7fe2e4f..6e0664a 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -179,6 +179,7 @@ typedef struct nbw_impl { pthread_cond_t force_completed_cond; pthread_cond_t pending_writes_cond; int still_open; + int refcount; } nbw_impl; static inline void freeFastHandle(nbw_impl * impl, const tree_node * n); @@ -332,6 +333,12 @@ static int nbw_close(stasis_handle_t * h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); + + (impl->refcount)--; + if(impl->refcount) { + pthread_mutex_unlock(&impl->mut); + return 0; + } impl->still_open = 0; pthread_mutex_unlock(&impl->mut); pthread_cond_broadcast(&impl->pending_writes_cond); @@ -374,6 +381,11 @@ static int nbw_close(stasis_handle_t * h) { free(h); return ret; } +static stasis_handle_t * nbw_dup(stasis_handle_t *h) { + nbw_impl * impl = h->impl; + (impl->refcount)++; + return h; +} static lsn_t nbw_start_position(stasis_handle_t *h) { nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); @@ -614,6 +626,7 @@ struct stasis_handle_t nbw_func = { .num_copies = nbw_num_copies, .num_copies_buffer = nbw_num_copies_buffer, .close = nbw_close, + .dup = nbw_dup, .start_position = nbw_start_position, .end_position = nbw_end_position, .write = nbw_write, @@ -830,6 +843,7 @@ stasis_handle_t * stasis_handle(open_non_blocking) pthread_cond_init(&impl->force_completed_cond, 0); impl->still_open = 1; + impl->refcount++; stasis_handle_t *h = malloc(sizeof(stasis_handle_t)); *h = nbw_func; diff --git a/src/stasis/io/pfile.c b/src/stasis/io/pfile.c index dd9fdd4..7a10520 100644 --- a/src/stasis/io/pfile.c +++ b/src/stasis/io/pfile.c @@ -74,6 +74,11 @@ static int pfile_close(stasis_handle_t *h) { else return errno; } +static stasis_handle_t * pfile_dup(stasis_handle_t *h) { + pfile_impl *impl = h->impl; + return stasis_handle_open_pfile(impl->start_pos, impl->filename, impl->file_flags, impl->file_mode); +} + static lsn_t pfile_start_position(stasis_handle_t *h) { pfile_impl *impl = (pfile_impl*)h->impl; return impl->start_pos; @@ -415,6 +420,7 @@ struct stasis_handle_t pfile_func = { .num_copies = pfile_num_copies, .num_copies_buffer = pfile_num_copies_buffer, .close = pfile_close, + .dup = pfile_dup, .start_position = pfile_start_position, .end_position = pfile_end_position, .write = pfile_write, diff --git a/stasis/io/handle.h b/stasis/io/handle.h index 1ad3a63..b00debf 100644 --- a/stasis/io/handle.h +++ b/stasis/io/handle.h @@ -104,6 +104,15 @@ typedef struct stasis_handle_t { /** Close this handle, and release any associated resources. */ int (*close)(struct stasis_handle_t *); + /** Duplicate this handle. This is useful for performance hinting; + * sending sequential disk operations to different handles than random + * operations allows the kernel's prefetch algorithm to kick in. + * + * XXX calling dup on a handle, then calling truncate or append on the resulting handles has undefined semantics. + * + * @return a stasis_handle_t that should eventually have close() called on it. + */ + struct stasis_handle_t * (*dup)(struct stasis_handle_t *h); /** The offset of the handle's first byte */ lsn_t (*start_position)(struct stasis_handle_t * h); diff --git a/test/stasis/check_io.c b/test/stasis/check_io.c index c84b625..db83c22 100644 --- a/test/stasis/check_io.c +++ b/test/stasis/check_io.c @@ -129,10 +129,15 @@ void load_handle(thread_arg* t) { stasis_handle_t * h = t->h; - for(int i = 0; i < t->count; i++) { - offsets[i] = -1; + if(handle_truncate_is_supported) { + for(int i = 0; i < t->count; i++) { + offsets[i] = -1; + } + } else { + for(int i = 0; i < t->count; i++) { + offsets[i] = i * sizeof(int); + } } - for(int i = 0; i < OPS_PER_THREAD; i++) { int val = myrandom(t->count); @@ -178,21 +183,25 @@ void load_handle(thread_arg* t) { } } break; case 2: { // append - lsn_t oldend = h->end_position(h); - int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int)); - assert(!ret || oldend < h->start_position(h)); + if(handle_truncate_is_supported) { + lsn_t oldend = h->end_position(h); + int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int)); + assert(!ret || oldend < h->start_position(h)); + } } break; case 3: { // append_buffer - lsn_t oldend = h->end_position(h); - stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int)); - if(!w->error) { - *((int*)w->buf) = t->values[val]; - assert(w->len == sizeof(int)); - offsets[val] = w->off; - } else { - assert(oldend < h->start_position(h)); - } - w->h->release_write_buffer(w); + if(handle_truncate_is_supported) { + lsn_t oldend = h->end_position(h); + stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int)); + if(!w->error) { + *((int*)w->buf) = t->values[val]; + assert(w->len == sizeof(int)); + offsets[val] = w->off; + } else { + assert(oldend < h->start_position(h)); + } + w->h->release_write_buffer(w); + } } break; default: { abort(); @@ -202,7 +211,7 @@ void load_handle(thread_arg* t) { int check; int ret = h->read(h, offsets[val], (byte*)&check, sizeof(int)); if(!ret) { - assert(check == t->values[val]); + assert(check == t->values[val]); } @@ -219,7 +228,8 @@ void load_handle(thread_arg* t) { assert(j == t->values[val]); } else { assert(ret == EDOM); - assert(h->start_position(h) > offsets[val]); + if(handle_truncate_is_supported) + assert(h->start_position(h) > offsets[val]); } } break; case 1: { // read_buffer @@ -231,7 +241,8 @@ void load_handle(thread_arg* t) { } else { assert(r->error == EDOM); r->h->release_read_buffer(r); - assert(h->start_position(h) > offsets[val]); + if(handle_truncate_is_supported) + assert(h->start_position(h) > offsets[val]); } } break; default: @@ -287,7 +298,7 @@ void handle_sequentialtest(stasis_handle_t * h) { void handle_concurrencytest(stasis_handle_t * h) { int vc = myrandom(VALUE_COUNT) + 10; - printf("Running concurrency test with %d values", vc); fflush(stdout); + printf("Running concurrency test with %d values\n", vc); fflush(stdout); int * values = malloc(vc * sizeof(int)); @@ -297,12 +308,17 @@ void handle_concurrencytest(stasis_handle_t * h) { thread_arg * args = malloc(THREAD_COUNT * sizeof(thread_arg)); pthread_t * threads = malloc(THREAD_COUNT * sizeof(pthread_t)); + stasis_handle_t ** handles = malloc(THREAD_COUNT / 2 * sizeof(*handles)); int val_per_thread = vc / THREAD_COUNT; trunc_val = 0; for(int i = 0; i < THREAD_COUNT; i++) { args[i].values = &(values[i * val_per_thread]); args[i].count = val_per_thread; + if(!(i % 2)) { + handles[i/2] = h->dup(h); + h = handles[i/2]; + } args[i].h = h; pthread_create(&threads[i], 0, (void*(*)(void*))load_handle, &args[i]); } @@ -310,9 +326,13 @@ void handle_concurrencytest(stasis_handle_t * h) { for(int i = 0; i < THREAD_COUNT; i++) { pthread_join(threads[i], 0); } + for(int i = 0; i < THREAD_COUNT / 2; i++) { + handles[i]->close(handles[i]); + } free(values); free(args); free(threads); + free(handles); } /** @test @@ -353,7 +373,7 @@ START_TEST(io_fileTest) { unlink("logfile.txt"); h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); - //handle_concurrencytest(h); +// handle_concurrencytest(h); // fails by design h->close(h); unlink("logfile.txt"); @@ -390,7 +410,7 @@ START_TEST(io_pfileTest) { unlink("logfile.txt"); h = stasis_handle(open_pfile)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); - //handle_concurrencytest(h); + handle_concurrencytest(h); h->close(h); unlink("logfile.txt");