diff --git a/lladd/io/handle.h b/lladd/io/handle.h new file mode 100644 index 0000000..e31bf0c --- /dev/null +++ b/lladd/io/handle.h @@ -0,0 +1,85 @@ +#include +#define stasis_handle(x) stasis_handle_##x + +/** + Error handling: + + read, write, append, open, release_read_buffer and + release_write_buffer return 0 on success, and an error code + otherwise. read_buffer() and write_buffer() return error codes via + the error field of the handles they produce. + + Errors in num_copies, num_copies_buffer, start_position, and end_position + are always unrecoverable, and return -1. + + Here are the meanings of the various error codes: + + EDOM off is less than the beginning of the file (possibly due to truncation). + EBADF an unrecoverable error occurred; the handle is no longer vaild. The error + that caused this one is stored in the handle's error field. + + Handle implementations may return return other errors as appropriate. + */ +typedef struct stasis_handle_t { + /** @return the number of in-memory copies made when the caller + provides the buffer */ + int (*num_copies)(); + /** @return the number of in-memory copies made when the handle + provides the buffer */ + int (*num_copies_buffer)(); + + int (*close)(struct stasis_handle_t *); + + /** The offset of the handle's first byte */ + lsn_t (*start_position)(struct stasis_handle_t * h); + /** The offset of the byte after the end of the handle's data. */ + lsn_t (*end_position)(struct stasis_handle_t * h); + + int (*write)(struct stasis_handle_t * h, lsn_t off, + const byte * dat, lsn_t len); + int (*append)(struct stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len); + + struct stasis_write_buffer_t * (*write_buffer)(struct stasis_handle_t * h, + lsn_t off, lsn_t len); + struct stasis_write_buffer_t * (*append_buffer)(struct stasis_handle_t * h, + lsn_t len); + int (*release_write_buffer)(struct stasis_write_buffer_t * w); + + int (*read)(struct stasis_handle_t * h, + lsn_t offset, byte * buf, lsn_t length); + + struct stasis_read_buffer_t * (*read_buffer)(struct stasis_handle_t * h, + lsn_t ofset, lsn_t length); + int (*release_read_buffer)(struct stasis_read_buffer_t * r); + + int (*truncate_start)(struct stasis_handle_t * h, lsn_t new_start); + + int error; + + void * impl; + +} stasis_handle_t; + +typedef struct stasis_write_buffer_t { + stasis_handle_t * h; + lsn_t off; + byte * buf; + lsn_t len; + void * impl; + int error; +} stasis_write_buffer_t; + +typedef struct stasis_read_buffer_t { + stasis_handle_t * h; + const byte * buf; + lsn_t len; + void * impl; + int error; +} stasis_read_buffer_t; + +stasis_handle_t * stasis_handle(open_memory)(); +stasis_handle_t * stasis_handle(open_file)(char * path, int mode); +stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * h, + int worker_thread_count); +stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h); + diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index 5cf12fa..31e7d53 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -17,7 +17,8 @@ liblladd_a_SOURCES=crc32.c redblack.c lhtable.c common.c stats.c io.c bufferMana operations/naiveLinearHash.c operations/nestedTopActions.c \ operations/linearHashNTA.c operations/linkedListNTA.c \ operations/pageOrientedListNTA.c operations/bTree.c \ - operations/regions.c + operations/regions.c \ + io/memory.c # page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas #operations/lladdhash.c #AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/src/lladd/io/memory.c b/src/lladd/io/memory.c new file mode 100644 index 0000000..7e19667 --- /dev/null +++ b/src/lladd/io/memory.c @@ -0,0 +1,273 @@ +#include +#include +#include +#include +#include +#include +#include + + +typedef struct mem_impl { + pthread_mutex_t mut; + rwl * trunc_latch; + lsn_t start_pos; + lsn_t end_pos; + byte * buf; +} mem_impl; + +static int mem_num_copies() { return 1; } +static int mem_num_copies_buffer() { return 0; } + +static int mem_close(stasis_handle_t * h) { + free(((mem_impl*)h->impl)->buf); + pthread_mutex_destroy(&(((mem_impl*)h->impl)->mut)); + deletelock(((mem_impl*)h->impl)->trunc_latch); + free(h->impl); + free(h); + return 0; +} +static lsn_t mem_start_position(stasis_handle_t *h) { + lsn_t ret; + mem_impl* impl = (mem_impl*)(h->impl); + + pthread_mutex_lock(&impl->mut); + ret = impl->start_pos; + pthread_mutex_unlock(&impl->mut); + + return ret; +} +static lsn_t mem_end_position(stasis_handle_t *h) { + lsn_t ret; + mem_impl* impl = (mem_impl*)(h->impl); + + pthread_mutex_lock(&impl->mut); + ret = impl->end_pos; + pthread_mutex_unlock(&impl->mut); + + return ret; +} +static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h, + lsn_t off, lsn_t len) { + mem_impl* impl = (mem_impl*)(h->impl); + + stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + if(!ret) { return NULL; } + + writelock(impl->trunc_latch,0); + pthread_mutex_lock(&(impl->mut)); + + int error = 0; + + if(impl->start_pos > off) { + error = EDOM; + } else if(impl->end_pos > off+len) { + // Just need to return buffer; h's state is unchanged. + } else { + byte * newbuf = realloc(impl->buf, off+len - impl->start_pos); + + if(newbuf) { + impl->buf = newbuf; + impl->end_pos = off+len; + } else { + error = ENOMEM; + } + } + + if(error) { + ret->h = h; + ret->off = 0; + ret->buf = 0; + ret->len = 0; + ret->impl = 0; + ret->error = error; + } else { + ret->h = h; + ret->off = off; + ret->buf = &(impl->buf[off-impl->start_pos]); + ret->len = len; + ret->impl = 0; + ret->error = 0; + } + + pthread_mutex_unlock(&(impl->mut)); + + return ret; +} + +static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h, + lsn_t len) { + mem_impl * impl = (mem_impl*)(h->impl); + + stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + if(!ret) { return 0; } + + writelock(impl->trunc_latch,0); + pthread_mutex_lock(&(impl->mut)); + + lsn_t off = impl->end_pos; + impl->end_pos += len; + byte * newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos); + if(newbuf) { + impl->buf = newbuf; + + ret->h = h; + ret->off = off; + ret->buf = &(impl->buf[off-impl->start_pos]); + ret->len = len; + ret->impl = 0; + ret->error = 0; + } else { + ret->h = h; + ret->off = 0; + ret->buf = 0; + ret->len = 0; + ret->impl = 0; + ret->error = ENOMEM; + } + pthread_mutex_unlock(&(impl->mut)); + + return ret; +} +static int mem_release_write_buffer(stasis_write_buffer_t * w) { + unlock(((mem_impl*)w->h->impl)->trunc_latch); + free(w); + return 0; +} + +static int mem_write(stasis_handle_t * h, lsn_t off, + const byte * dat, lsn_t len) { + // Overlapping writes aren't atomic; no latch needed. + stasis_write_buffer_t * w = mem_write_buffer(h, off, len); + int ret; + if(w->error) { + ret = w->error; + } else { + memcpy(w->buf, dat, len); + ret = 0; + } + mem_release_write_buffer(w); + return ret; +} + +static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) { + stasis_write_buffer_t * w = mem_append_buffer(h, len); + int ret; + if(w->error) { + ret = w->error; + } else { + memcpy(w->buf, dat, len); + ret = 0; + } + *off = w->off; + mem_release_write_buffer(w); + return ret; +} + +static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h, + lsn_t off, lsn_t len) { + mem_impl * impl = (mem_impl*)(h->impl); + readlock(impl->trunc_latch,0); + pthread_mutex_lock(&(impl->mut)); + + stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); + if(!ret) { return NULL; } + + if(off < impl->start_pos || off + len > impl->end_pos) { + ret->h = h; + ret->buf = 0; + ret->len = 0; + ret->impl = 0; + ret->error = EDOM; + } else { + ret->h = h; + ret->buf = &(impl->buf[off-impl->start_pos]); + ret->len = len; + ret->impl = 0; + ret->error = 0; + } + pthread_mutex_unlock(&(impl->mut)); + + return ret; +} +static int mem_release_read_buffer(stasis_read_buffer_t * r) { + unlock(((mem_impl*)r->h->impl)->trunc_latch); + free(r); + return 0; +} + +static int mem_read(stasis_handle_t * h, + lsn_t off, byte * buf, lsn_t len) { + stasis_read_buffer_t * r = mem_read_buffer(h, off, len); + int ret; + if(r->error) { + ret = r->error; + } else { + memcpy(buf, r->buf, len); + ret = 0; + } + mem_release_read_buffer(r); + return ret; +} + + +static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) { + mem_impl* impl = (mem_impl*) h->impl; + writelock(impl->trunc_latch,0); + pthread_mutex_lock(&(impl->mut)); + if(new_start < impl->start_pos) { + pthread_mutex_unlock(&impl->mut); + unlock(impl->trunc_latch); + return 0; + } + if(new_start > impl->end_pos) { + pthread_mutex_unlock(&impl->mut); + unlock(impl->trunc_latch); + return EDOM; + } + + byte * new_buf = malloc(impl->end_pos -new_start); + + memcpy(new_buf, &(impl->buf[new_start - impl->start_pos]), impl->end_pos - new_start); + + free(impl->buf); + + impl->buf = new_buf; + impl->start_pos = new_start; + + pthread_mutex_unlock(&(impl->mut)); + unlock(impl->trunc_latch); + return 0; +} + +struct stasis_handle_t mem_func = { + .num_copies = mem_num_copies, + .num_copies_buffer = mem_num_copies_buffer, + .close = mem_close, + .start_position = mem_start_position, + .end_position = mem_end_position, + .write = mem_write, + .append = mem_append, + .write_buffer = mem_write_buffer, + .append_buffer = mem_append_buffer, + .release_write_buffer = mem_release_write_buffer, + .read = mem_read, + .read_buffer = mem_read_buffer, + .release_read_buffer = mem_release_read_buffer, + .truncate_start = mem_truncate_start, + .error = 0 +}; + +stasis_handle_t * stasis_handle(open_memory)() { + stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); + *ret = mem_func; + + mem_impl * impl = malloc(sizeof(mem_impl)); + ret->impl = impl; + pthread_mutex_init(&(impl->mut), 0); + impl->trunc_latch = initlock(); + impl->start_pos = 0; + impl->end_pos = 0; + impl->buf = malloc(0); + + return ret; +} diff --git a/test/lladd/check_io.c b/test/lladd/check_io.c new file mode 100644 index 0000000..9b61b57 --- /dev/null +++ b/test/lladd/check_io.c @@ -0,0 +1,348 @@ +/*--- This software is copyrighted by the Regents of the University of +California, and other parties. The following terms apply to all files +associated with the software unless explicitly disclaimed in +individual files. + +The authors hereby grant permission to use, copy, modify, distribute, +and license this software and its documentation for any purpose, +provided that existing copyright notices are retained in all copies +and that this notice is included verbatim in any distributions. No +written agreement, license, or royalty fee is required for any of the +authorized uses. Modifications to this software may be copyrighted by +their authors and need not follow the licensing terms described here, +provided that the new terms are clearly indicated on the first page of +each file where they apply. + +IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY +FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES +ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY +DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND +NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND +THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE +MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + +GOVERNMENT USE: If you are acquiring this software on behalf of the +U.S. government, the Government shall have only "Restricted Rights" in +the software and related documentation as defined in the Federal +Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are +acquiring the software on behalf of the Department of Defense, the +software shall be classified as "Commercial Computer Software" and the +Government shall have only "Restricted Rights" as defined in Clause +252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the +authors grant the U.S. Government and others acting in its behalf +permission to use and distribute the software in accordance with the +terms specified in this license. +---*/ +#include +#include + +#include +#include + +#include "../check_includes.h" + +#define LOG_NAME "check_io.log" + +long myrandom(long x) { + double xx = x; + double r = random(); + double max = ((long)RAND_MAX)+1; + max /= xx; + return (long)((r/max)); +} + + +void handle_smoketest(stasis_handle_t * h) { + + const int one = 0x11111111; + const int two = 0x22222222; + const int three = 0x33333333; + const int four = 0x44444444; + + assert((!h->num_copies()) || (!h->num_copies_buffer())); + + assert(0 == h->start_position(h) || + 0 == h->end_position(h)); + + assert(! h->write(h, 0, (byte*)&one, sizeof(int))); + + int one_read = 0; + assert(! h->read(h, 0, (byte*)&one_read, sizeof(int))); + assert(one_read == one); + stasis_write_buffer_t * w = h->write_buffer(h, sizeof(int), sizeof(int)); + *((int*)(w->buf)) = two; + w->h->release_write_buffer(w); + one_read = 0; + stasis_read_buffer_t * r = h->read_buffer(h, 0, sizeof(int)); + one_read = *((int*)(r->buf)); + r->h->release_read_buffer(r); + assert(one == one_read); + + int two_read = 0; + assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int))); + assert(two == two_read); + + lsn_t off; + assert(! h->append(h, &off, (byte*)&three, sizeof(int))); + + w = h->append_buffer(h, sizeof(int)); + memcpy(w->buf, &four, sizeof(int)); + w->h->release_write_buffer(w); + + h->truncate_start(h, 2 * sizeof(int)); + + int three_read = 0; + int four_read = 0; + + assert(! h->read(h, 2*sizeof(int), (byte*)&three_read, sizeof(int))); + + r = h->read_buffer(h, 3*sizeof(int), sizeof(int)); + memcpy(&four_read, r->buf, sizeof(int)); + r->h->release_read_buffer(r); + + assert(three == three_read); + assert(four == four_read); + +} + + +typedef struct { + int * values; + int count; + stasis_handle_t * h; +} thread_arg; + +#define VALUE_COUNT 100000 +#define THREAD_COUNT 100 +#define OPS_PER_THREAD 500000 + +lsn_t trunc_val; +pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER; + +void load_handle(thread_arg* t) { + lsn_t * offsets = malloc(t->count * sizeof(lsn_t)); + + stasis_handle_t * h = t->h; + + for(int i = 0; i < t->count; i++) { + offsets[i] = -1; + } + + for(int i = 0; i < OPS_PER_THREAD; i++) { + int val = myrandom(t->count); + + if(offsets[val] == -1) { + // Need to write it somewhere. + + long choice = myrandom(4); + + switch(choice) { + case 0: { // overwrite old entry with write() + long val2 = myrandom(t->count); + offsets[val] = offsets[val2]; + offsets[val2] = -1; + if(offsets[val] != -1) { + int ret = h->write(h, offsets[val], (const byte*)&(t->values[val]), sizeof(int)); + if(ret) { + assert(ret == EDOM); + offsets[val] = -1; + i--; + } + } else { + i--; + } + } break; + case 1: { // overwrite old entry with write_buffer() + long val2 = myrandom(t->count); + offsets[val] = offsets[val2]; + offsets[val2] = -1; + if(offsets[val] != -1) { + stasis_write_buffer_t * w = h->write_buffer(h, offsets[val], sizeof(int)); + if(!w->error) { + *((int*)w->buf) = t->values[val]; + assert(w->len == sizeof(int)); + assert(w->off == offsets[val]); + } else { + assert(w->error == EDOM); + offsets[val] = -1; + i--; + } + w->h->release_write_buffer(w); + } else { + i--; + } + } break; + case 2: { // append + int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int)); + assert(!ret); + } break; + case 3: { // append_buffer + stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int)); + if(!w->error) { + *((int*)w->buf) = t->values[val]; + assert(w->len == sizeof(int)); + offsets[val] = w->off; + } else { + abort(); + } + w->h->release_write_buffer(w); + } break; + default: { + abort(); + } + } + + int check; + int ret = h->read(h, offsets[val], (byte*)&check, sizeof(int)); + if(!ret) { + assert(check == t->values[val]); + } + + + } else { + // Read the value. + + long choice = myrandom(2); + + switch(choice) { + case 0: { // read + int j; + int ret = h->read(h, offsets[val], (byte*)&j, sizeof(int)); + if(!ret) { + assert(j == t->values[val]); + } else { + assert(ret == EDOM); + assert(h->start_position(h) > offsets[val]); + } + } break; + case 1: { // read_buffer + stasis_read_buffer_t * r = h->read_buffer(h, offsets[val], sizeof(int)); + if(!r->error) { + assert(*(int*)(r->buf) == t->values[val]); + assert(r->len == sizeof(int)); + } else { + assert(r->error == EDOM); + assert(h->start_position(h) > offsets[val]); + } + r->h->release_read_buffer(r); + + } break; + default: + abort(); + }; + + } + + // Truncate 1% of the time. + if(!myrandom(100)) { + lsn_t pre_start = h->start_position(h); + + pthread_mutex_lock(&trunc_mut); + lsn_t start = trunc_val; + lsn_t stop = start - 100 + myrandom(200); + if(stop > trunc_val) { + trunc_val = stop; + } + pthread_mutex_unlock(&trunc_mut); + + + assert(pre_start <= start); + int ret = h->truncate_start(h, stop); + if(!ret) { + lsn_t post_stop = h->start_position(h); + assert(stop <= post_stop); + } + + } + } + free(offsets); +} + +void handle_sequentialtest(stasis_handle_t * h) { + time_t seed = time(0); + printf("\nSeed = %ld\n", seed); + srandom(seed); + + int * values = malloc(VALUE_COUNT * sizeof(int)); + + for(int i = 0; i < VALUE_COUNT; i++) { + values[i] = i; + } + trunc_val = 0; + thread_arg arg = { values, VALUE_COUNT, h}; + load_handle(&arg); + + free(values); +} + +void handle_concurrencytest(stasis_handle_t * h) { + int vc = myrandom(VALUE_COUNT) + 10; + + printf("Running concurrency test with %d values", vc); + + int * values = malloc(vc * sizeof(int)); + + for(int i = 0; i < vc; i++) { + values[i] = i; + } + + thread_arg * args = malloc(THREAD_COUNT * sizeof(thread_arg)); + pthread_t * threads = malloc(THREAD_COUNT * sizeof(pthread_t)); + + int val_per_thread = vc / THREAD_COUNT; + trunc_val = 0; + for(int i = 0; i < THREAD_COUNT; i++) { + args[i].values = &(values[i * val_per_thread]); + args[i].count = val_per_thread; + args[i].h = h; + pthread_create(&threads[i], 0, (void*(*)(void*))load_handle, &args[i]); + } + + for(int i = 0; i < THREAD_COUNT; i++) { + pthread_join(threads[i], 0); + } + free(values); + free(args); + free(threads); +} +/** + @test + Check the memory I/O handle. +*/ +START_TEST(io_memoryTest) { + stasis_handle_t * h = stasis_handle(open_memory)(0); + handle_smoketest(h); + h->close(h); + h = stasis_handle(open_memory)(0); + handle_sequentialtest(h); + h->close(h); + h = stasis_handle(open_memory)(0); + handle_concurrencytest(h); + h->close(h); +} END_TEST + + +/** + Add suite declarations here +*/ +Suite * check_suite(void) { + Suite *s = suite_create("io"); + /* Begin a new test */ + TCase *tc = tcase_create("io_test"); + tcase_set_timeout(tc, 600); // ten minute timeout + + /* Sub tests are added, one per line, here */ + tcase_add_test(tc, io_memoryTest); + /* --------------------------------------------- */ + tcase_add_checked_fixture(tc, setup, teardown); + suite_add_tcase(s, tc); + + return s; +} + +#include "../check_setup.h"