diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index 3ed420f..5b58f75 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -13,13 +13,14 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c logger/filePool.c logger/inMemoryLog.c logger/logHandle.c logger/logger2.c - logger/logMemory.c page/raw.c page/slotted.c + logger/logMemory.c page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c operations/pageOperations.c page/indirect.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c operations/noop.c operations/arrayList.c + operations/lsnFreeSet.c hash.c operations/naiveLinearHash.c operations/linearHashNTA.c diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index e958003..4f17cb5 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -10,7 +10,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common logger/filePool.c \ logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \ logger/logMemory.c \ - page/raw.c page/slotted.c page/fixed.c compensations.c \ + page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c \ operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/increment.c operations/prepare.c operations/set.c \ operations/alloc.c operations/noop.c \ @@ -19,6 +19,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common operations/linearHashNTA.c operations/linkedListNTA.c \ operations/pageOrientedListNTA.c \ operations/regions.c operations/lsmTree.c \ + operations/lsnFreeSet.c \ io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \ io/debug.c \ bufferManager.c \ diff --git a/src/stasis/operations.c b/src/stasis/operations.c index 016ad2b..620405d 100644 --- a/src/stasis/operations.c +++ b/src/stasis/operations.c @@ -78,7 +78,9 @@ void redoUpdate(const LogEntry * e) { Page * p = loadPage(e->xid, e->update.page); writelock(p->rwlatch,0); - if(stasis_page_lsn_read(p) < e->LSN) { + if(stasis_page_lsn_read(p) < e->LSN || + e->update.funcID == OPERATION_SET_LSN_FREE || + e->update.funcID == OPERATION_SET_LSN_FREE_INVERSE) { DEBUG("OPERATION xid %d Redo, %lld {%lld:%lld}\n", e->xid, e->LSN, e->update.page, stasis_page_lsn_read(p)); // Need to check the id field to find out what the REDO_action diff --git a/src/stasis/operations/lsnFreeSet.c b/src/stasis/operations/lsnFreeSet.c new file mode 100644 index 0000000..6cfd2b6 --- /dev/null +++ b/src/stasis/operations/lsnFreeSet.c @@ -0,0 +1,77 @@ +#include +#include + +#include +static int op_lsn_free_set(const LogEntry *e, Page *p) { + if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { abort() ; } + assert(e->update.arg_size >= (sizeof(pageoff_t) * 2)); + int size = e->update.arg_size; + size -= (2*sizeof(pageoff_t)); + const pageoff_t * a = (const pageoff_t*)getUpdateArgs(e); + const byte* b = (const byte*)&(a[2]); + assertlocked(p->rwlatch); + memcpy(p->memAddr + a[0], b, a[1]); + return 0; +} +static int op_lsn_free_unset(const LogEntry *e, Page *p) { + if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { return 0; } + assert(e->update.arg_size >= (sizeof(pageoff_t) * 2)); + int size = e->update.arg_size; + size -= (2*sizeof(pageoff_t)); + const pageoff_t * a = (const pageoff_t*)getUpdateArgs(e); + const byte* b = (const byte*)&(a[2]); + assertlocked(p->rwlatch); + memcpy(p->memAddr + a[0], b+a[1], a[1]); + return 0; +} +int TsetLSNFree(int xid, recordid rid, const void * dat) { + Page * p = loadPage(xid, rid.page); + readlock(p->rwlatch,0); + rid = stasis_record_dereference(xid,p,rid); + short type = stasis_record_type_read(xid,p,rid); + if(type == BLOB_SLOT) { + fprintf(stderr, "LSN-Free blobs are not implemented!\n"); + fflush(stderr); + abort(); + unlock(p->rwlatch); + } else { + rid.size = stasis_record_type_to_size(rid.size); + intptr_t sz = 2 * (sizeof(pageoff_t) + rid.size); + byte * buf = calloc(sz, 1); + pageoff_t * a = (pageoff_t*)buf; + // XXX hack! + byte * writeBuf = stasis_record_write_begin(xid, p, rid); + a[0] = writeBuf - p->memAddr; + stasis_record_write_done(xid, p, rid, writeBuf); + a[1] = rid.size; + byte * b = (byte*)&(a[2]); + // postimage + memcpy(b,dat,rid.size); + // preimage + stasis_record_read(xid, p, rid, b+rid.size); + + unlock(p->rwlatch); + + Tupdate(xid,rid.page,buf,sz,OPERATION_SET_LSN_FREE); + free(buf); + } + return 0; +} + +Operation getSetLsnFree() { + Operation o = { + OPERATION_SET_LSN_FREE, + OPERATION_SET_LSN_FREE_INVERSE, + op_lsn_free_set + }; + return o; +} + +Operation getSetLsnFreeInverse() { + Operation o = { + OPERATION_SET_LSN_FREE_INVERSE, + OPERATION_SET_LSN_FREE, + op_lsn_free_unset + }; + return o; +} diff --git a/src/stasis/operations/prepare.c b/src/stasis/operations/prepare.c index 7495d0a..0378dae 100644 --- a/src/stasis/operations/prepare.c +++ b/src/stasis/operations/prepare.c @@ -60,12 +60,12 @@ static int op_prepare(const LogEntry * e, Page * p) { } Operation getPrepare() { - Operation o = { - OPERATION_PREPARE, /* id */ - OPERATION_NOOP, - &op_prepare /* Function */ - }; - return o; + Operation o = { + OPERATION_PREPARE, /* id */ + OPERATION_NOOP, + &op_prepare /* Function */ + }; + return o; } /** PrepareGuardState is 1 if the iterator should continue on the next diff --git a/src/stasis/page.c b/src/stasis/page.c index ac659c4..cbc1d46 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -94,9 +94,6 @@ terms specified in this license. static page_impl page_impls[MAX_PAGE_TYPE]; -/** - XXX latching for pageWriteLSN... -*/ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) { assertlocked(page->rwlatch); @@ -106,9 +103,7 @@ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) { dirtyPages_add(page); return; } -/** - XXX latching for pageReadLSN... -*/ + lsn_t stasis_page_lsn_read(const Page * page) { assertlocked(page->rwlatch); return page->LSN; @@ -131,6 +126,7 @@ void stasis_page_init() { stasis_page_impl_register(blobImpl()); stasis_page_impl_register(indirectImpl()); stasis_page_impl_register(lsmRootImpl()); + stasis_page_impl_register(slottedLsnFreeImpl()); } void stasis_page_deinit() { diff --git a/src/stasis/page/lsnFree.c b/src/stasis/page/lsnFree.c new file mode 100644 index 0000000..2b69652 --- /dev/null +++ b/src/stasis/page/lsnFree.c @@ -0,0 +1,22 @@ +#include +#include +#include +#include + +void stasis_slotted_lsn_free_initialize_page(Page * p) { + stasis_slotted_initialize_page(p); + *stasis_page_type_ptr(p) = SLOTTED_LSN_FREE_PAGE; + *stasis_page_lsn_ptr(p) = -1; +} +static void lsnFreeLoaded(Page * p) { + p->LSN = stasis_log_file->next_available_lsn(stasis_log_file); +} +static void lsnFreeFlushed(Page * p) { } + +page_impl slottedLsnFreeImpl() { + page_impl pi = slottedImpl(); + pi.page_type = SLOTTED_LSN_FREE_PAGE; + pi.pageLoaded = lsnFreeLoaded; + pi.pageLoaded = lsnFreeFlushed; + return pi; +} diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index a26021d..0130007 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -29,7 +29,8 @@ static inline void slottedFsck(const Page const * page) { const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1); assert(slotListStart < PAGE_SIZE && slotListStart >= 0); assert(page_type == SLOTTED_PAGE || - page_type == BOUNDARY_TAG_PAGE); + page_type == BOUNDARY_TAG_PAGE || + page_type == SLOTTED_LSN_FREE_PAGE); assert(numslots >= 0); assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE); assert(freespace >= 0); diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 8c5897f..3843fd1 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -58,7 +58,10 @@ void setupOperationsTable() { operationsTable[OPERATION_INCREMENT] = getIncrement(); operationsTable[OPERATION_DECREMENT] = getDecrement(); operationsTable[OPERATION_ALLOC] = getAlloc(); - // operationsTable[OPERATION_PREPARE] = getPrepare(); + operationsTable[OPERATION_PREPARE] = getPrepare(); + operationsTable[OPERATION_SET_LSN_FREE] = getSetLsnFree(); + operationsTable[OPERATION_SET_LSN_FREE_INVERSE] + = getSetLsnFreeInverse(); /* operationsTable[OPERATION_LHINSERT] = getLHInsert(); operationsTable[OPERATION_LHREMOVE] = getLHRemove(); */ operationsTable[OPERATION_DEALLOC] = getDealloc(); diff --git a/stasis/common.h b/stasis/common.h index eef815a..2b73935 100644 --- a/stasis/common.h +++ b/stasis/common.h @@ -107,7 +107,8 @@ typedef long long pageid_t; #define PAGEID_T_MAX INT64_MAX typedef int32_t slotid_t; #define SLOTID_T_MAX INT32_MAX - +typedef uint16_t pageoff_t; +#define PAGEOFF_T_MAX UINT16_MAX; /*#define DEBUGGING */ /*#define PROFILE_LATCHES*/ diff --git a/stasis/constants.h b/stasis/constants.h index 598cbef..a7c00bc 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -115,20 +115,21 @@ terms specified in this license. #define OPERATION_DECREMENT 3 #define OPERATION_ALLOC 4 #define OPERATION_PREPARE 5 -//#define OPERATION_LHINSERT 6 -//#define OPERATION_LHREMOVE 7 +#define OPERATION_SET_LSN_FREE 6 +#define OPERATION_SET_LSN_FREE_INVERSE 7 #define OPERATION_DEALLOC 8 #define OPERATION_REALLOC 9 #define OPERATION_PAGE_SET_RANGE 10 #define OPERATION_PAGE_SET_RANGE_INVERSE 11 -/*#define OPERATION_UPDATE_FREESPACE 12 -#define OPERATION_UPDATE_FREESPACE_INVERSE 13 -#define OPERATION_UPDATE_FREELIST 14 -#define OPERATION_UPDATE_FREELIST_INVERSE 15 -#define OPERATION_FREE_PAGE 16 -#define OPERATION_ALLOC_FREED 17 -#define OPERATION_UNALLOC_FREED 18 */ +// 12 +// 13 +//#define OPERATION_UPDATE_FREELIST 14 +//#define OPERATION_UPDATE_FREELIST_INVERSE 15 +//#define OPERATION_FREE_PAGE 16 +//#define OPERATION_ALLOC_FREED 17 +//#define OPERATION_UNALLOC_FREED 18 + #define OPERATION_NOOP 19 #define OPERATION_ARRAY_LIST_ALLOC 21 @@ -230,6 +231,7 @@ extern const short SLOT_TYPE_LENGTHS[]; #define BOUNDARY_TAG_PAGE 7 #define BLOB_PAGE 8 #define LSM_ROOT_PAGE 9 +#define SLOTTED_LSN_FREE_PAGE 10 #define USER_DEFINED_PAGE(n) (100+n) // 0 <= n < 155 #define MAX_PAGE_TYPE 255 diff --git a/stasis/operations.h b/stasis/operations.h index 951300c..25dd29a 100644 --- a/stasis/operations.h +++ b/stasis/operations.h @@ -143,7 +143,7 @@ typedef struct { #include "operations/linearHashNTA.h" #include "operations/regions.h" #include "operations/lsmTree.h" - +#include "operations/lsnFreeSet.h" extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */ /** diff --git a/stasis/operations/lsnFreeSet.h b/stasis/operations/lsnFreeSet.h new file mode 100644 index 0000000..1f65687 --- /dev/null +++ b/stasis/operations/lsnFreeSet.h @@ -0,0 +1,6 @@ +#ifndef __LSN_FREE_SET_H +#define __LSN_FREE_SET_H +Operation getSetLsnFree(); +Operation getSetLsnFreeInverse(); +int TsetLSNFree(int xid, recordid rid, const void *dat); +#endif //__LSN_FREE_SET_H diff --git a/stasis/page.h b/stasis/page.h index 8fbdd34..2029972 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -875,10 +875,13 @@ int stasis_page_impl_register(page_impl impl); returned by loadPage() */ void stasis_slotted_initialize_page(Page * p); +void stasis_slotted_lsn_free_initialize_page(Page * p); void stasis_fixed_initialize_page(Page * page, size_t size, int count); void stasis_indirect_initialize_page(Page * p, int height); int stasis_fixed_records_per_page(size_t size); void stasis_blob_initialize_page(Page * p); + +page_impl slottedLsnFreeImpl(); END_C_DECLS #endif diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 4264aab..50d6857 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -550,6 +550,70 @@ START_TEST(operation_array_list) { } END_TEST +START_TEST(operation_lsn_free) { + Tinit(); + recordid rid[100]; + { + int xid = Tbegin(); + pageid_t pid = TpageAlloc(xid); + Page * p = loadPage(xid,pid); + writelock(p->rwlatch,0); + stasis_slotted_lsn_free_initialize_page(p); + // XXX hack! + byte * old = malloc(PAGE_SIZE); + memcpy(old, p->memAddr, PAGE_SIZE); + int fortyTwo = 42; + for(int i = 0; i < 100; i++) { + rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int)); + stasis_record_alloc_done(xid, p, rid[i]); + stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo); + } + byte * new = malloc(PAGE_SIZE); + memcpy(new, p->memAddr, PAGE_SIZE); + memcpy(p->memAddr, old, PAGE_SIZE); + unlock(p->rwlatch); + releasePage(p); + TpageSet(xid, pid, new); + free(old); + free(new); + Tcommit(xid); + } + { + int xid[2]; + xid[0] = Tbegin(); + xid[1] = Tbegin(); + for(int i = 0; i < 100; i++) { + int foo; + Tread(xid[i%2],rid[i], &foo); + assert(foo == 42); + TsetLSNFree(xid[i%2], rid[i], &i); + Tread(xid[i%2],rid[i], &foo); + assert(foo == i); + } + Tcommit(xid[0]); + Tabort(xid[1]); + } + Tdeinit(); + + Tinit(); + { + int xid = Tbegin(); + + for(int i = 0; i < 100; i++) { + int foo; + Tread(xid, rid[i], &foo); + if(i%2) { + assert(foo == 42); + } else { + assert(foo == i); + } + } + Tcommit(xid); + } + Tdeinit(); + +} END_TEST + /** Add suite declarations here */ @@ -569,6 +633,7 @@ Suite * check_suite(void) { } tcase_add_test(tc, operation_alloc_test); tcase_add_test(tc, operation_array_list); + tcase_add_test(tc, operation_lsn_free); /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown); suite_add_tcase(s, tc);