diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt
index b477ebd..e098875 100644
--- a/src/stasis/CMakeLists.txt
+++ b/src/stasis/CMakeLists.txt
@@ -24,6 +24,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c tsearchcompat.c lhtable.c concurrentHash.c
             page/raw.c page/lsnFree.c page/segment.c
+            page/latchFree/lfSlotted.c
             compensations.c
             operations/pageOperations.c
             operations/decrement.c operations/increment.c
diff --git a/src/stasis/page.c b/src/stasis/page.c
index da741d3..5572fb0 100644
--- a/src/stasis/page.c
+++ b/src/stasis/page.c
@@ -77,6 +77,7 @@ terms specified in this license.
 #include
 #include
 #include
+#include <stasis/page/latchFree/lfSlotted.h>
 #include
 #include
 #include
@@ -120,6 +121,7 @@ void stasis_page_init(stasis_dirty_page_table_t * dpt) {
   stasis_page_impl_register(lsmRootImpl());
   stasis_page_impl_register(slottedLsnFreeImpl());
   stasis_page_impl_register(segmentImpl());
+  stasis_page_impl_register(stasis_page_slotted_latch_free_impl());
 }

 void stasis_page_deinit() {
@@ -147,7 +149,7 @@ void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) {
   assert(rid.size <= BLOB_THRESHOLD_SIZE);

   byte * buf = stasis_record_write_begin(xid, p, rid);
-  memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
+  memcpy(buf, dat, stasis_record_type_to_size(rid.size));
   stasis_record_write_done(xid,p,rid,buf);
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
 }
@@ -157,6 +159,7 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {

   const byte * dat = stasis_record_read_begin(xid,p,rid);
   memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
+  stasis_record_read_done(xid,p,rid,dat);

   return 0;
@@ -182,7 +185,9 @@ const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
 byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
   int page_type = p->pageType;
   assert(page_type);
-  assert(stasis_record_length_read(xid, p, rid) == stasis_record_type_to_size(rid.size));
+  if(p->pageType != SLOTTED_LATCH_FREE_PAGE) {
+    assert(stasis_record_length_read(xid, p, rid) == stasis_record_type_to_size(rid.size));
+  }
   return page_impls[page_type].recordWrite(xid, p, rid);
 }
 void stasis_record_read_done(int xid, Page *p, recordid rid, const byte *b) {
diff --git a/src/stasis/page/latchFree/lfSlotted.c b/src/stasis/page/latchFree/lfSlotted.c
new file mode 100644
index 0000000..1b9c518
--- /dev/null
+++ b/src/stasis/page/latchFree/lfSlotted.c
@@ -0,0 +1,113 @@
+/*
+ * lfSlotted.c
+ *
+ *  Created on: Aug 19, 2010
+ *      Author: sears
+ */
+
+#include <stasis/page.h>
+#include <stasis/page/slotted.h>
+#include <stasis/page/latchFree/lfSlotted.h>
+
+#define CAS(_a,_o,_n) __sync_bool_compare_and_swap(_a,_o,_n)
+#define BARRIER() __sync_synchronize()
+#define ATOMIC_ADD(_a, _i) __sync_add_and_fetch(_a, _i)
+
+static int notSupported(int xid, Page * p) { return 0; }
+
+static const byte* lfSlottedRead (int xid, Page *p, recordid rid) {
+  return stasis_page_slotted_record_cptr(p, rid.slot);
+}
+static byte* lfSlottedWrite(int xid, Page *p, recordid rid) {
+  // records may only be written once, and only by the page's single writer
+  assert(*stasis_page_slotted_numslots_cptr(p) == rid.slot);
+  return stasis_page_slotted_record_ptr(p, rid.slot);
+}
+static void lfSlottedWriteDone(int xid, Page *p, recordid rid, byte *buf) {
+  BARRIER();
+  int succ = CAS(stasis_page_slotted_numslots_ptr(p), rid.slot, rid.slot+1);
+  DEBUG("write done %d\n", rid.slot+1);
+  assert(succ);
+}
+static int lfSlottedRecordGetType(int xid, Page *p, recordid rid) {
+  if(*stasis_page_slotted_numslots_cptr(p) <= rid.slot) { return INVALID_SLOT; }
+  int ret = *stasis_page_slotted_slot_length_cptr(p, rid.slot);
+  return ret >= 0 ? NORMAL_SLOT : ret;
+}
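+// Note on the encoding used above (an inference from slotted.c, not new to
+// this format): a slot's length field holds either a non-negative byte count
+// (reported to callers as NORMAL_SLOT) or a negative type tag such as
+// INVALID_SLOT; stasis_record_type_to_size() maps negative tags back to
+// physical sizes via SLOT_TYPE_LENGTHS.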
+static int lfSlottedRecordGetLength(int xid, Page *p, recordid rid) {
+  if(*stasis_page_slotted_numslots_cptr(p) <= rid.slot) { return INVALID_SLOT; }
+  int ret = *stasis_page_slotted_slot_length_cptr(p, rid.slot);
+  return stasis_record_type_to_size(ret);
+}
+static recordid lfSlottedRecordFirst(int xid, Page *p) {
+  recordid ret;
+  ret.page = p->id;
+  ret.slot = 0;
+  ret.size = lfSlottedRecordGetType(xid, p, ret);
+  return ret.size == INVALID_SLOT ? NULLRID : ret;
+}
+static recordid lfSlottedRecordNext(int xid, Page *p, recordid ret) {
+  ret.slot++;
+  ret.size = lfSlottedRecordGetType(xid, p, ret);
+  DEBUG("next %d %d %d\n", ret.slot, (int)ret.size, *stasis_page_slotted_numslots_ptr(p));
+  return ret.size == INVALID_SLOT ? NULLRID : ret;
+}
+static recordid lfSlottedRecordLast(int xid, Page *p) {
+  recordid ret;
+  ret.page = p->id;
+  ret.slot = (*stasis_page_slotted_numslots_cptr(p))-1;
+  ret.size = lfSlottedRecordGetType(xid, p, ret);
+  return ret.size == INVALID_SLOT ? NULLRID : ret;
+}
+
+static void lfSlottedPostAlloc(int xid, Page * page, recordid rid) {
+  int16_t off = __sync_fetch_and_add(stasis_page_slotted_freespace_ptr(page), stasis_record_type_to_size(rid.size));
+  *stasis_page_slotted_slot_ptr(page, rid.slot) = off;
+  *stasis_page_slotted_slot_length_ptr(page, rid.slot) = rid.size;
+  // don't update numslots_ptr yet.  Note that we can only have one append at a time with this update protocol...
+}
+void lfSlottedRecordFree(int xid, Page *p, recordid rid) {
+  *stasis_page_slotted_slot_length_ptr(p, rid.slot) = INVALID_SLOT;
+  BARRIER();
+}
+
+void stasis_page_slotted_latch_free_initialize_page(Page * page) {
+  stasis_page_slotted_initialize_page(page);
+  page->pageType = SLOTTED_LATCH_FREE_PAGE;
+}
+
+page_impl stasis_page_slotted_latch_free_impl(void) {
+  page_impl slotted = stasis_page_slotted_impl();
+  page_impl pi = {
+    SLOTTED_LATCH_FREE_PAGE,
+    1,
+    lfSlottedRead,
+    lfSlottedWrite,
+    0, // readDone (no-op)
+    lfSlottedWriteDone,
+    lfSlottedRecordGetType,
+    0, // not supported
+    lfSlottedRecordGetLength,
+    lfSlottedRecordFirst,
+    lfSlottedRecordNext,
+    lfSlottedRecordLast,
+    notSupported, // is block supported
+    stasis_block_first_default_impl,
+    stasis_block_next_default_impl,
+    stasis_block_done_default_impl,
+    slotted.pageFreespace, // this should work as is.
+    NULL, // slotted.pageCompact, // there is no chance of supporting this
+    NULL, // slotted.pageCompactSlotIDs, // ditto
+    slotted.recordPreAlloc, // this is fine; it's read only...
+    lfSlottedPostAlloc,
+    0, // can't splice lots of records atomically with the current scheme.
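+    // note: lfSlottedRecordFree (next entry) only marks the slot INVALID_SLOT;
+    // per lfSlotted.h, the space is reclaimed only when the writer quiesces
+    // readers and then reinitializes or frees the whole page.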
+    lfSlottedRecordFree,
+    0, // page_impl_dereference_identity,
+    slotted.pageLoaded,
+    slotted.pageFlushed,
+    slotted.pageCleanup
+  };
+  return pi;
+}
diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c
index 942c266..edc3668 100644
--- a/src/stasis/page/slotted.c
+++ b/src/stasis/page/slotted.c
@@ -50,7 +50,8 @@ static inline void slottedFsck(const Page const * page) {
   assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
   assert(page_type == SLOTTED_PAGE ||
          page_type == BOUNDARY_TAG_PAGE ||
-         page_type == SLOTTED_LSN_FREE_PAGE);
+         page_type == SLOTTED_LSN_FREE_PAGE ||
+         page_type == SLOTTED_LATCH_FREE_PAGE);
   assert(numslots >= 0);
   assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE);
   assert(freespace >= 0);
diff --git a/stasis/constants.h b/stasis/constants.h
index 939587b..62ef83f 100644
--- a/stasis/constants.h
+++ b/stasis/constants.h
@@ -235,6 +235,7 @@ static const short SLOT_TYPE_LENGTHS[] = { -1, -1, sizeof(blob_record_t), -1};
 #define SLOTTED_LSN_FREE_PAGE 10
 #define SEGMENT_PAGE          11
 #define MULTI_PAGE            12 // Not really a page type, but used to denote operations that touch many pages.
+#define SLOTTED_LATCH_FREE_PAGE 13
 #define USER_DEFINED_PAGE(n) (100+n) // 0 <= n < 155
 #define MAX_PAGE_TYPE 255
diff --git a/stasis/page.h b/stasis/page.h
index bde9006..0934e64 100644
--- a/stasis/page.h
+++ b/stasis/page.h
@@ -917,6 +917,7 @@ page_impl* stasis_page_impl_get(int id);
     returned by loadPage() */
 void stasis_page_slotted_initialize_page(Page * p);
+void stasis_page_slotted_latch_free_initialize_page(Page * page);
 void stasis_slotted_lsn_free_initialize_page(Page * p);
 void stasis_fixed_initialize_page(Page * page, size_t size, int count);
 void stasis_indirect_initialize_page(Page * p, int height);
diff --git a/stasis/page/latchFree/lfSlotted.h b/stasis/page/latchFree/lfSlotted.h
new file mode 100644
index 0000000..df81ddb
--- /dev/null
+++ b/stasis/page/latchFree/lfSlotted.h
@@ -0,0 +1,34 @@
+/**
+ * @file A special-purpose slotted page implementation for latch-free data structures.
+ *
+ * This page format is similar to that provided by slotted.h (and
+ * uses an identical layout).  The difference is that this format
+ * allows readers and writers to access the same page concurrently.
+ *
+ * It does so by making use of GCC atomic operations, and by providing
+ * a restricted API.
+ *
+ * Only one thread may write to a given page at a time (this restriction
+ * may be lifted in the future), records may only be written once, and
+ * the space taken up by freed records is not reused.  Instead, the writer
+ * thread must (somehow) ensure that no readers are accessing the page,
+ * and then must either reinitialize the page (losing its contents), or
+ * simply free the page, allowing Stasis' page allocation routines to
+ * reuse the underlying storage.
+ *
+ * Readers are latch-free; they do not perform any explicit synchronization
+ * with the writer, except when coordinating with it so that space can be
+ * reused.
+ *
+ *  Created on: Aug 19, 2010
+ *      Author: sears
+ */
+#ifndef LFSLOTTED_H_
+#define LFSLOTTED_H_
+
+#include <stasis/page.h>
+
+void stasis_page_slotted_latch_free_init();
+void stasis_page_slotted_latch_free_deinit();
+page_impl stasis_page_slotted_latch_free_impl();
+
+#endif /* LFSLOTTED_H_ */
diff --git a/stasis/page/slotted.h b/stasis/page/slotted.h
index 42be326..52e68a2 100644
--- a/stasis/page/slotted.h
+++ b/stasis/page/slotted.h
@@ -59,7 +59,8 @@ Slotted page layout:
      has been handled, except in slottedPostRalloc...
 ************************************************************************/
-
+#ifndef STASIS_PAGE_SLOTTED_H
+#define STASIS_PAGE_SLOTTED_H
 static inline int16_t* stasis_page_slotted_freespace_ptr(Page * p) { return stasis_page_int16_ptr_from_end((p), 1); }
 static inline int16_t* stasis_page_slotted_numslots_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 2); }
 static inline int16_t* stasis_page_slotted_freelist_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 3); }
@@ -78,3 +79,4 @@ void stasis_page_slotted_init();
 void stasis_page_slotted_deinit();
 page_impl stasis_page_slotted_impl();
 page_impl stasis_page_boundary_tag_impl();
+#endif //STASIS_PAGE_SLOTTED_H
diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c
index 6f87262..c1390fc 100644
--- a/test/stasis/check_page.c
+++ b/test/stasis/check_page.c
@@ -48,6 +48,7 @@ terms specified in this license.

 #include
 #include
+#include <stasis/page/latchFree/lfSlotted.h>
 #include
 #include
 #include
@@ -152,6 +153,78 @@ static void* fixed_worker_thread(void * arg_ptr) {
   return NULL;
 }

+typedef struct {
+  Page ** pages;
+  int num_pages;
+  int my_page;
+} latchFree_worker_thread_args;
+
+static void* latchFree_worker_thread(void * arg_ptr) {
+  latchFree_worker_thread_args * arg = arg_ptr;
+
+  int alloced_count = 0;
+  while(1) {
+    int off = myrandom(arg->num_pages);
+    Page * p = arg->pages[off];
+    if(off == arg->my_page) {
+      // we are this page's designated writer; try to append a one-byte record
+      recordid rid = stasis_record_alloc_begin(-1, p, sizeof(char));
+      if(rid.size == INVALID_SLOT) {
+        assert(alloced_count > 200);
+        return 0;
+      } else {
+        stasis_record_alloc_done(-1, p, rid);
+        alloced_count++;
+        assert(alloced_count < PAGE_SIZE/2);
+        unsigned char c = (unsigned char)rid.slot;
+        stasis_record_write(-1, p, rid, &c);
+      }
+    } else if(*stasis_page_slotted_numslots_cptr(p)) { // if the page is empty, try another.
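+      // Latch-free read path: no latches are taken here.  The writer
+      // publishes each record by CASing numslots upward only after the
+      // record's bytes reach the page (see lfSlottedWriteDone), so any
+      // slot this branch can observe is fully written.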
+      // read a random record
+      int slot = myrandom(stasis_record_last(-1, p).slot+1);
+      recordid rid;
+      rid.page = p->id;
+      rid.slot = slot;
+      rid.size = sizeof(char);
+      assert(stasis_record_type_read(-1, p, rid) == NORMAL_SLOT);
+      assert(stasis_record_length_read(-1, p, rid) == sizeof(char));
+      unsigned char c = 0;
+      stasis_record_read(-1, p, rid, &c);
+      assert(c == (unsigned char) rid.slot);
+    }
+  }
+}
+START_TEST(latchFreeThreadTest) {
+  Tinit();
+  int NUM_THREADS = 200;
+  Page ** pages = malloc(sizeof(Page*)*NUM_THREADS);
+  int xid = Tbegin();
+
+  pageid_t region = TregionAlloc(xid, NUM_THREADS, -1);
+
+  for(int i = 0; i < NUM_THREADS; i++) {
+    pages[i] = loadPage(xid, i+region);
+    stasis_page_slotted_latch_free_initialize_page(pages[i]);
+  }
+
+  pthread_t * threads = malloc(sizeof(pthread_t) * NUM_THREADS);
+
+  for(int i = 0; i < NUM_THREADS; i++) {
+    latchFree_worker_thread_args * arg = malloc(sizeof(*arg));
+    arg->pages = pages;
+    arg->my_page = i;
+    arg->num_pages = NUM_THREADS;
+    pthread_create(&threads[i], 0, &latchFree_worker_thread, arg);
+  }
+  for(int i = 0; i < NUM_THREADS; i++) {
+    pthread_join(threads[i],0);
+  }
+  for(int i = 0; i < NUM_THREADS; i++) {
+    releasePage(pages[i]);
+  }
+  Tcommit(xid);
+  Tdeinit();
+
+} END_TEST

 static void* worker_thread(void * arg_ptr) {
   Page * p = (Page*)arg_ptr;
   int i;
@@ -313,9 +386,12 @@ static void assertRecordCountSizeType(int xid, Page *p, int count, int size, int
 static void checkPageIterators(int xid, Page *p,int record_count) {
   recordid first = stasis_record_alloc_begin(xid, p, sizeof(int64_t));
   stasis_record_alloc_done(xid,p,first);
-
-  for(int i = 1; i < record_count; i++) {
-    stasis_record_alloc_done(xid,p,stasis_record_alloc_begin(xid,p,sizeof(int64_t)));
+  int64_t i = 0;
+  stasis_record_write(xid,p,first,(byte*)&i);
+  for(i = 1; i < record_count; i++) {
+    recordid rid = stasis_record_alloc_begin(xid,p,sizeof(int64_t));
+    stasis_record_alloc_done(xid,p,rid);
+    stasis_record_write(xid,p,rid,(byte*)&i);
   }

   assertRecordCountSizeType(xid, p, record_count, sizeof(int64_t), NORMAL_SLOT);
@@ -356,6 +432,11 @@ START_TEST(pageRecordSizeTypeIteratorTest) {

   checkPageIterators(xid,p,10);

+  memset(p->memAddr, 0, PAGE_SIZE);
+  stasis_page_slotted_latch_free_initialize_page(p);
+
+  checkPageIterators(xid,p,10);
+
   releasePage(p);

   Tcommit(xid);
@@ -647,6 +728,7 @@ Suite * check_suite(void) {
   tcase_add_test(tc, pageNoThreadTest);
   tcase_add_test(tc, pageThreadTest);
   tcase_add_test(tc, fixedPageThreadTest);
+  tcase_add_test(tc, latchFreeThreadTest);

   /* --------------------------------------------- */