add latch-free slotted pages, and a bug-fix in page.c

This commit is contained in:
Sears Russell 2010-09-17 01:10:42 +00:00
parent e6067ae60b
commit bb904e85cf
9 changed files with 247 additions and 7 deletions

View file

@ -24,6 +24,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c tsearchcompat.c lhtable.c concurrentHash.c
page/raw.c
page/lsnFree.c
page/segment.c
page/latchFree/lfSlotted.c
compensations.c
operations/pageOperations.c
operations/decrement.c operations/increment.c

View file

@ -77,6 +77,7 @@ terms specified in this license.
#include <stasis/page/slotted.h>
#include <stasis/page/fixed.h>
#include <stasis/page/uninitialized.h>
#include <stasis/page/latchFree/lfSlotted.h>
#include <stasis/operations/arrayList.h>
#include <stasis/bufferPool.h>
#include <stasis/truncation.h>
@ -120,6 +121,7 @@ void stasis_page_init(stasis_dirty_page_table_t * dpt) {
stasis_page_impl_register(lsmRootImpl());
stasis_page_impl_register(slottedLsnFreeImpl());
stasis_page_impl_register(segmentImpl());
stasis_page_impl_register(stasis_page_slotted_latch_free_impl());
}
void stasis_page_deinit() {
@ -147,7 +149,7 @@ void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) {
assert(rid.size <= BLOB_THRESHOLD_SIZE);
byte * buf = stasis_record_write_begin(xid, p, rid);
memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
memcpy(buf, dat, stasis_record_type_to_size(rid.size));
stasis_record_write_done(xid,p,rid,buf);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
}
@ -157,6 +159,7 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
const byte * dat = stasis_record_read_begin(xid,p,rid);
memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
stasis_record_read_done(xid,p,rid,dat);
return 0;
@ -182,7 +185,9 @@ const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
int page_type = p->pageType;
assert(page_type);
assert(stasis_record_length_read(xid, p, rid) == stasis_record_type_to_size(rid.size));
if(p->pageType != SLOTTED_LATCH_FREE_PAGE) {
assert(stasis_record_length_read(xid, p, rid) == stasis_record_type_to_size(rid.size));
}
return page_impls[page_type].recordWrite(xid, p, rid);
}
void stasis_record_read_done(int xid, Page *p, recordid rid, const byte *b) {

View file

@ -0,0 +1,113 @@
/*
* lfSlotted.c
*
* Created on: Aug 19, 2010
* Author: sears
*/
#include <stasis/common.h>
#include <stasis/page.h>
#include <stasis/page/latchFree/lfSlotted.h>
#define CAS(_a,_o,_n) __sync_bool_compare_and_swap(_a,_o,_n)
#define BARRIER() __sync_synchronize()
#define ATOMIC_ADD(_a, _i) __sync_add_and_fetch(_a, _i)
/** Stub for page_impl capability queries this format does not support; always answers "no". */
static int notSupported(int xid, Page * p) {
  return 0;
}
/**
 * Return a read-only pointer to a record's bytes.  No synchronization is
 * performed here: readers are latch-free, and rely on the writer only
 * publishing a slot (via lfSlottedWriteDone) after its bytes are in place.
 */
static const byte* lfSlottedRead (int xid, Page *p, recordid rid) {
  const byte * rec = stasis_page_slotted_record_cptr(p, rid.slot);
  return rec;
}
/**
 * Return a writable pointer to a record's bytes.  Only the next
 * unpublished slot may be written (single writer, append-only); the
 * record becomes visible to readers in lfSlottedWriteDone.
 */
static byte* lfSlottedWrite(int xid, Page *p, recordid rid) {
  assert(*stasis_page_slotted_numslots_cptr(p) == rid.slot);
  byte * buf = stasis_page_slotted_record_ptr(p, rid.slot);
  return buf;
}
/**
 * Publish a freshly written record.  The barrier guarantees the record's
 * bytes are globally visible before the slot count is advanced, so a
 * reader can never observe a half-written record.  The CAS both checks
 * the single-writer invariant (numslots must still equal rid.slot) and
 * performs the increment.
 */
static void lfSlottedWriteDone(int xid, Page *p, recordid rid, byte *buf) {
  BARRIER();
  int succ = CAS(stasis_page_slotted_numslots_ptr(p), rid.slot, rid.slot+1);
  DEBUG("write done %d\n", rid.slot+1);
  (void)succ;  // bug-fix: avoid unused-variable warning when NDEBUG elides the assert
  assert(succ);
}
/**
 * Report a slot's type: INVALID_SLOT for slots at or past the published
 * count, NORMAL_SLOT for ordinary records, otherwise the negative
 * special-type value stored in the slot-length entry.
 */
static int lfSlottedRecordGetType(int xid, Page *p, recordid rid) {
  if(rid.slot >= *stasis_page_slotted_numslots_cptr(p)) { return INVALID_SLOT; }
  int len = *stasis_page_slotted_slot_length_cptr(p, rid.slot);
  if(len >= 0) {
    return NORMAL_SLOT;
  }
  return len;
}
/**
 * Report a slot's physical length in bytes, or INVALID_SLOT for
 * unpublished slots.
 *
 * NOTE(review): for a freed slot the stored length is INVALID_SLOT, so
 * this returns stasis_record_type_to_size(INVALID_SLOT) — presumably the
 * special types map to sensible sizes; confirm against record.c.
 */
static int lfSlottedRecordGetLength(int xid, Page *p, recordid rid) {
  if(rid.slot >= *stasis_page_slotted_numslots_cptr(p)) { return INVALID_SLOT; }
  int len = *stasis_page_slotted_slot_length_cptr(p, rid.slot);
  return stasis_record_type_to_size(len);
}
/** Return a recordid for slot 0, or NULLRID if no record has been published. */
static recordid lfSlottedRecordFirst(int xid, Page *p) {
  recordid first;
  first.page = p->id;
  first.slot = 0;
  first.size = lfSlottedRecordGetType(xid, p, first);
  if(first.size == INVALID_SLOT) {
    return NULLRID;
  }
  return first;
}
/** Advance an iterator recordid to the following slot; NULLRID past the end. */
static recordid lfSlottedRecordNext(int xid, Page *p, recordid ret) {
  ret.slot++;
  ret.size = lfSlottedRecordGetType(xid, p, ret);
  DEBUG("next %d %d %d\n", ret.slot, (int)ret.size, *stasis_page_slotted_numslots_ptr(p));
  if(ret.size == INVALID_SLOT) {
    return NULLRID;
  }
  return ret;
}
/**
 * Return a recordid for the page's last published slot.
 *
 * NOTE(review): on an empty page numslots is 0, so ret.slot becomes -1.
 * lfSlottedRecordGetType's bounds check (numslots <= slot → 0 <= -1) is
 * false, so it goes on to read the slot-length entry at index -1 —
 * confirm callers never invoke this on an empty page, or that a negative
 * slot index is harmless for stasis_page_slotted_slot_length_cptr.
 */
static recordid lfSlottedRecordLast(int xid, Page *p) {
recordid ret;
ret.page = p->id;
ret.slot = (*stasis_page_slotted_numslots_cptr(p))-1;
ret.size = lfSlottedRecordGetType(xid, p, ret);
return ret.size == INVALID_SLOT ? NULLRID : ret;
}
/**
 * Install the slot header for a freshly allocated record.  The free-space
 * offset is claimed with an atomic fetch-and-add (off receives the old
 * value), then the slot's offset and length entries are filled in.  The
 * slot count is deliberately NOT advanced here: the record only becomes
 * visible to readers when lfSlottedWriteDone CASes numslots forward.
 * Consequence: at most one append may be in flight per page.
 */
static void lfSlottedPostAlloc(int xid, Page * page, recordid rid) {
int16_t off = __sync_fetch_and_add(stasis_page_slotted_freespace_ptr(page), stasis_record_type_to_size(rid.size));
*stasis_page_slotted_slot_ptr(page, rid.slot) = off;
*stasis_page_slotted_slot_length_ptr(page, rid.slot) = rid.size;
// don't update numslots_ptr yet. Note that we can only have one append at a time with this update protocol...
}
/**
 * Tombstone a record by marking its slot INVALID_SLOT; the barrier makes
 * the tombstone visible to readers.  Space is never reclaimed — the
 * writer must reinitialize or free the whole page to reuse it.
 *
 * Made static for consistency with the sibling helpers: it is only
 * reachable through the page_impl vtable and is declared in no header.
 */
static void lfSlottedRecordFree(int xid, Page *p, recordid rid) {
  *stasis_page_slotted_slot_length_ptr(p, rid.slot) = INVALID_SLOT;
  BARRIER();
}
/**
 * Format a page for latch-free use: lay it out exactly like an ordinary
 * slotted page, then overwrite the page-type tag (order matters — the
 * slotted initializer sets its own type first).
 */
void stasis_page_slotted_latch_free_initialize_page(Page * page) {
stasis_page_slotted_initialize_page(page);
page->pageType = SLOTTED_LATCH_FREE_PAGE;
}
/**
 * Build the page_impl vtable for the latch-free slotted format.  Starts
 * from the ordinary slotted implementation and borrows its read-only and
 * maintenance callbacks; every mutating entry point is replaced with the
 * single-writer, CAS-publishing variants above, and operations that
 * cannot be made latch-free (compaction, splice) are disabled.
 *
 * NOTE: this is a positional initializer — entries must stay in
 * page_impl field order.
 */
page_impl stasis_page_slotted_latch_free_impl(void) {
page_impl slotted = stasis_page_slotted_impl();
page_impl pi = {
SLOTTED_LATCH_FREE_PAGE,
1,
lfSlottedRead,
lfSlottedWrite,
0,// readDone (no-op)
lfSlottedWriteDone,
lfSlottedRecordGetType,
0, // not supported
lfSlottedRecordGetLength,
lfSlottedRecordFirst,
lfSlottedRecordNext,
lfSlottedRecordLast,
notSupported, // is block supported
stasis_block_first_default_impl,
stasis_block_next_default_impl,
stasis_block_done_default_impl,
slotted.pageFreespace, // this should work as is.
NULL, // slotted.pageCompact, // there is no chance of supporting this
NULL, //slotted.pageCompactSlotIDs, // ditto
slotted.recordPreAlloc, // this is fine; it's read only...
lfSlottedPostAlloc,
0, // can't splice lots of records atomically with the current scheme.
lfSlottedRecordFree,
0, // page_impl_dereference_identity,
slotted.pageLoaded,
slotted.pageFlushed,
slotted.pageCleanup
};
return pi;
}

View file

@ -50,7 +50,8 @@ static inline void slottedFsck(const Page const * page) {
assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
assert(page_type == SLOTTED_PAGE ||
page_type == BOUNDARY_TAG_PAGE ||
page_type == SLOTTED_LSN_FREE_PAGE);
page_type == SLOTTED_LSN_FREE_PAGE ||
page_type == SLOTTED_LATCH_FREE_PAGE);
assert(numslots >= 0);
assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE);
assert(freespace >= 0);

View file

@ -235,6 +235,7 @@ static const short SLOT_TYPE_LENGTHS[] = { -1, -1, sizeof(blob_record_t), -1};
#define SLOTTED_LSN_FREE_PAGE 10
#define SEGMENT_PAGE 11
#define MULTI_PAGE 12 // Not really a page type, but used to denote operations that touch many pages.
#define SLOTTED_LATCH_FREE_PAGE 13
#define USER_DEFINED_PAGE(n) (100+n) // 0 <= n < 155
#define MAX_PAGE_TYPE 255

View file

@ -917,6 +917,7 @@ page_impl* stasis_page_impl_get(int id);
returned by loadPage()
*/
void stasis_page_slotted_initialize_page(Page * p);
void stasis_page_slotted_latch_free_initialize_page(Page * page);
void stasis_slotted_lsn_free_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height);

View file

@ -0,0 +1,34 @@
/**
 * @file A special-purpose slotted page implementation for latch-free data structures
 *
 * This page format is similar to that provided by slotted.h (and
 * uses an identical layout). The difference is that this format
 * allows readers and writers to access the same page concurrently.
 *
 * It does so by making use of GCC atomic operations, and by providing
 * a restricted API.
 *
 * Only one thread may write to a given page at a time (this restriction
 * may be limited in the future), records may only be written once, and
 * the space taken up by freed records is not reused. Instead, the writer
 * thread must (somehow) ensure that no readers are accessing the page,
 * and then must either reinitialize the page (losing its contents), or
 * simply free the page, allowing Stasis' page allocation routines to
 * reuse the underlying storage.
 *
 * Readers are latch-free; they do not perform any explicit synchronization
 * with the writer, except when they synchronize so that space can be reused.
 *
 * Created on: Aug 19, 2010
 * Author: sears
 */
#ifndef LFSLOTTED_H_
#define LFSLOTTED_H_
#include <stasis/page/slotted.h>
/* Prototypes use (void): empty parens declare an unspecified parameter
 * list in C, which defeats argument type-checking.
 * NOTE(review): no definitions of _init/_deinit are visible in
 * lfSlotted.c — confirm they exist or are planned. */
void stasis_page_slotted_latch_free_init(void);
void stasis_page_slotted_latch_free_deinit(void);
page_impl stasis_page_slotted_latch_free_impl(void);
#endif /* LFSLOTTED_H_ */

View file

@ -59,7 +59,8 @@ Slotted page layout:
has been handled, except in slottedPostRalloc...
************************************************************************/
#ifndef STASIS_PAGE_SLOTTED_H
#define STASIS_PAGE_SLOTTED_H
static inline int16_t* stasis_page_slotted_freespace_ptr(Page * p) { return stasis_page_int16_ptr_from_end((p), 1); }
static inline int16_t* stasis_page_slotted_numslots_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 2); }
static inline int16_t* stasis_page_slotted_freelist_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 3); }
@ -78,3 +79,4 @@ void stasis_page_slotted_init();
void stasis_page_slotted_deinit();
page_impl stasis_page_slotted_impl();
page_impl stasis_page_boundary_tag_impl();
#endif //STASIS_PAGE_SLOTTED_H

View file

@ -48,6 +48,7 @@ terms specified in this license.
#include <stasis/page.h>
#include <stasis/page/slotted.h>
#include <stasis/page/latchFree/lfSlotted.h>
#include <stasis/blobManager.h>
#include <stasis/bufferManager.h>
#include <stasis/transactional.h>
@ -152,6 +153,78 @@ static void* fixed_worker_thread(void * arg_ptr) {
return NULL;
}
/** Arguments handed to each latchFree_worker_thread. */
typedef struct {
Page ** pages;   // all test pages, shared by every worker
int num_pages;   // length of pages[]
int my_page;     // index of the one page this thread alone may append to
} latchFree_worker_thread_args;
/**
 * Test worker.  Each thread is the sole writer for pages[my_page], where
 * it appends one-byte records (each record holds its own slot number);
 * on any other page it performs latch-free reads and checks that every
 * published record's byte matches its slot.  The thread exits once its
 * own page reports it is full.
 */
static void* latchFree_worker_thread(void * arg_ptr) {
latchFree_worker_thread_args * arg = arg_ptr;
int alloced_count = 0;
while(1) {
int off = myrandom(arg->num_pages);
Page * p = arg->pages[off];
if(off == arg->my_page) {
recordid rid = stasis_record_alloc_begin(-1, p, sizeof(char));
if(rid.size == INVALID_SLOT) {
// Page full: sanity-check we managed a reasonable number of appends.
assert(alloced_count > 200);
return 0;
} else {
stasis_record_alloc_done(-1, p, rid);
alloced_count++;
assert(alloced_count < PAGE_SIZE/2);
unsigned char c = (unsigned char)rid.slot;
stasis_record_write(-1, p, rid, &c);
}
} else if(*stasis_page_slotted_numslots_cptr(p)) { // if the page is empty, try another.
// read a random record
int slot = myrandom(stasis_record_last(-1, p).slot+1);
recordid rid;
rid.page = p->id;
rid.slot = slot;
rid.size = sizeof(char);
assert(stasis_record_type_read(-1, p, rid) == NORMAL_SLOT);
assert(stasis_record_length_read(-1, p, rid) == sizeof(char));
unsigned char c = 0;
stasis_record_read(-1, p, rid, &c);
// Each record was written with its own slot number.
assert(c == (unsigned char) rid.slot);
}
}
}
/**
 * Stress test: one writer thread per latch-free page, with every thread
 * concurrently reading all the other pages.
 *
 * Bug fix: the original leaked every per-thread args struct plus the
 * threads and pages arrays; the args now live in one array that is freed
 * (along with the other allocations) after the joins.
 */
START_TEST(latchFreeThreadTest) {
  Tinit();
  int NUM_THREADS = 200;
  Page ** pages = malloc(sizeof(Page*)*NUM_THREADS);
  int xid = Tbegin();
  pageid_t region = TregionAlloc(xid, NUM_THREADS, -1);
  for(int i = 0; i < NUM_THREADS; i++) {
    pages[i] = loadPage(xid, i+region);
    stasis_page_slotted_latch_free_initialize_page(pages[i]);
  }
  pthread_t * threads = malloc(sizeof(pthread_t) * NUM_THREADS);
  // One array of argument structs, reclaimed after the join (the workers
  // do not free their argument pointer).
  latchFree_worker_thread_args * args = malloc(sizeof(*args) * NUM_THREADS);
  for(int i = 0; i < NUM_THREADS; i++) {
    args[i].pages = pages;
    args[i].my_page = i;
    args[i].num_pages = NUM_THREADS;
    pthread_create(&threads[i], 0, &latchFree_worker_thread, &args[i]);
  }
  for(int i = 0; i < NUM_THREADS; i++) {
    pthread_join(threads[i],0);
  }
  for(int i = 0; i < NUM_THREADS; i++) {
    releasePage(pages[i]);
  }
  free(args);
  free(threads);
  free(pages);
  Tcommit(xid);
  Tdeinit();
} END_TEST
static void* worker_thread(void * arg_ptr) {
Page * p = (Page*)arg_ptr;
int i;
@ -313,9 +386,12 @@ static void assertRecordCountSizeType(int xid, Page *p, int count, int size, int
static void checkPageIterators(int xid, Page *p,int record_count) {
recordid first = stasis_record_alloc_begin(xid, p, sizeof(int64_t));
stasis_record_alloc_done(xid,p,first);
for(int i = 1; i < record_count; i++) {
stasis_record_alloc_done(xid,p,stasis_record_alloc_begin(xid,p,sizeof(int64_t)));
int64_t i = 0;
stasis_record_write(xid,p,first,(byte*)&i);
for(i = 1; i < record_count; i++) {
recordid rid = stasis_record_alloc_begin(xid,p,sizeof(int64_t));
stasis_record_alloc_done(xid,p,rid);
stasis_record_write(xid,p,rid,(byte*)&i);
}
assertRecordCountSizeType(xid, p, record_count, sizeof(int64_t), NORMAL_SLOT);
@ -356,6 +432,11 @@ START_TEST(pageRecordSizeTypeIteratorTest) {
checkPageIterators(xid,p,10);
memset(p->memAddr, 0, PAGE_SIZE);
stasis_page_slotted_latch_free_initialize_page(p);
checkPageIterators(xid,p,10);
releasePage(p);
Tcommit(xid);
@ -647,6 +728,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, pageNoThreadTest);
tcase_add_test(tc, pageThreadTest);
tcase_add_test(tc, fixedPageThreadTest);
tcase_add_test(tc, latchFreeThreadTest);
/* --------------------------------------------- */