From 4998e5756642e652d4b487977165e0ee0f0f7c11 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 7 Jun 2007 21:53:09 +0000 Subject: [PATCH] Updated for the new page API --- examples/ex2.c | 2 +- lladd/constants.h | 11 +- lladd/operations/arrayList.h | 3 +- lladd/transactional.h | 1 - src/lladd/graph.c | 2 +- src/lladd/operations.c | 3 + src/lladd/operations/alloc.c | 222 +++++++-------- src/lladd/operations/arrayList.c | 68 +++-- src/lladd/operations/bTree.c | 2 - src/lladd/operations/instantSet.c | 1 - src/lladd/operations/linearHash.c | 34 +-- src/lladd/operations/linearHashNTA.c | 1 - src/lladd/operations/pageOperations.c | 8 +- src/lladd/operations/regions.c | 14 +- src/lladd/page.c | 311 +++++++++++---------- src/lladd/page.h | 346 +++++++++++++++++++++-- src/lladd/page/fixed.c | 229 +++++++++------ src/lladd/page/fixed.h | 15 +- src/lladd/page/indirect.c | 12 +- src/lladd/page/indirect.h | 8 +- src/lladd/page/slotted.c | 386 ++++++++++++-------------- src/lladd/page/slotted.h | 140 +--------- src/lladd/pageFile.c | 1 + src/lladd/pageHandle.c | 1 - src/lladd/transactional2.c | 18 +- test/lladd/check_bTree.c | 31 ++- test/lladd/check_bufferManager.c | 26 +- test/lladd/check_indirect.c | 1 + test/lladd/check_operations.c | 14 +- test/lladd/check_page.c | 153 ++++++---- test/lladd/check_pageOperations.c | 1 - utilities/swig/lladd.i | 2 - 32 files changed, 1121 insertions(+), 946 deletions(-) diff --git a/examples/ex2.c b/examples/ex2.c index 2128a48..e898395 100644 --- a/examples/ex2.c +++ b/examples/ex2.c @@ -10,7 +10,7 @@ int main (int argc, char ** argv) { int xid = Tbegin(); - if(TrecordType(xid, ROOT_RECORD) == UNINITIALIZED_RECORD) { + if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { // ThashAlloc() will work here as well. rootEntry = Talloc(xid, sizeof(int)); diff --git a/lladd/constants.h b/lladd/constants.h index 3f57184..b87055f 100644 --- a/lladd/constants.h +++ b/lladd/constants.h @@ -179,7 +179,8 @@ terms specified in this license. /** This constant is used as a placeholder to mark slot locations that contain blobs. @see slotted.c, indirect.c, blobManager.c */ #define BLOB_SLOT (-2) -#define SLOT_TYPE_END (-3) +#define NORMAL_SLOT (-3) +#define SLOT_TYPE_END (-4) /** Initialized statically in transactional2.c */ extern const short SLOT_TYPE_LENGTHS[]; @@ -225,13 +226,7 @@ extern const short SLOT_TYPE_LENGTHS[]; #define FIXED_PAGE 5 #define ARRAY_LIST_PAGE 6 #define BOUNDARY_TAG_PAGE 7 - -/* Record types */ - -#define UNINITIALIZED_RECORD 0 -#define BLOB_RECORD 1 -#define SLOTTED_RECORD 2 -#define FIXED_RECORD 3 +#define MAX_PAGE_TYPE 8 #define TALLOC_REGION_SIZE 100 // Pages diff --git a/lladd/operations/arrayList.h b/lladd/operations/arrayList.h index 82902fb..858bf85 100644 --- a/lladd/operations/arrayList.h +++ b/lladd/operations/arrayList.h @@ -66,8 +66,7 @@ Operation getUnInitPage(); /** Un-initializes a page. */ #define TunInitPage(xid, rid) Tupdate(xid, rid, NULL, OPERATION_UNINITIALIZE_PAGE) -recordid dereferenceArrayListRid(Page * p, int offset); -#define dereferenceArrayListRidUnlocked(x, y) dereferenceArrayListRid((x),(y)) +recordid dereferenceArrayListRid(int xid, Page * p, int offset); compensated_function int TarrayListExtend(int xid, recordid rid, int slots); compensated_function int TarrayListInstantExtend(int xid, recordid rid, int slots); compensated_function int TarrayListLength(int xid, recordid rid); diff --git a/lladd/transactional.h b/lladd/transactional.h index ef62218..45843c5 100644 --- a/lladd/transactional.h +++ b/lladd/transactional.h @@ -488,7 +488,6 @@ compensated_function void TupdateDeferred(int xid, recordid rid, */ compensated_function void Tread(int xid, recordid rid, void *dat); compensated_function void TreadStr(int xid, recordid rid, char *dat); -void TreadUnlocked(int xid, recordid rid, void *dat); /** * Commit an active transaction. Each transaction should be completed diff --git a/src/lladd/graph.c b/src/lladd/graph.c index 17d295a..7838b86 100644 --- a/src/lladd/graph.c +++ b/src/lladd/graph.c @@ -84,7 +84,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t recordid nextRid = arrayList; nextRid.slot = node[i]; Page * p = loadPage(xid, arrayList.page); // just pin it forever and ever - nextRid = dereferenceArrayListRid(p, nextRid.slot); + nextRid = dereferenceArrayListRid(xid, p, nextRid.slot); releasePage(p); int thisFifo = crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned int)-1) % pool->fifoCount; diff --git a/src/lladd/operations.c b/src/lladd/operations.c index 81728ac..68f51aa 100644 --- a/src/lladd/operations.c +++ b/src/lladd/operations.c @@ -55,6 +55,9 @@ terms specified in this license. Operation operationsTable[MAX_OPERATIONS]; +/** + @todo operations.c should handle LSN's for non-logical operations. +*/ void doUpdate(const LogEntry * e, Page * p) { DEBUG("OPERATION update arg length %d, lsn = %ld\n", e->contents.update.argSize, e->LSN); diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index 77e41eb..50ea797 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -7,9 +7,6 @@ #include #include "../blobManager.h" #include "../page.h" -#include "../page/slotted.h" -#include "../page/fixed.h" - #include //try{ /** @@ -78,29 +75,47 @@ */ //}end -static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { - slottedPostRalloc(xid, p, lsn, rid); +static int operate_helper(int xid, Page * p, recordid rid, const void * dat) { + + if(recordGetTypeNew(xid, p, rid) == INVALID_SLOT) { + recordPostAlloc(xid, p, rid); + } + + assert(recordGetLength(xid, p, rid) == physical_slot_length(rid.size)); + if(rid.size < 0) { + assert(recordGetTypeNew(xid,p,rid) == rid.size); + } return 0; } +static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { + writelock(p->rwlatch, 0); + int ret = operate_helper(xid,p,rid,dat); + pageWriteLSN(xid,p,lsn); + unlock(p->rwlatch); + return ret; +} + static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { - assert(rid.page == p->id); - slottedDeRalloc(xid, p, lsn, rid); + writelock(p->rwlatch,0); + recordFree(xid, p, rid); + pageWriteLSN(xid,p,lsn); + assert(recordGetTypeNew(xid, p, rid) == INVALID_SLOT); + unlock(p->rwlatch); return 0; } static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) { + writelock(p->rwlatch,0); + assert(recordGetTypeNew(xid, p, rid) == INVALID_SLOT); + int ret = operate_helper(xid, p, rid, dat); + byte * buf = recordWriteNew(xid,p,rid); + memcpy(buf, dat, recordGetLength(xid,p,rid)); + pageWriteLSN(xid,p,lsn); + unlock(p->rwlatch); - // if(rid.size >= BLOB_THRESHOLD_SIZE) { // && rid.size != BLOB_SLOT) { - // rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */ - // rid.size = BLOB_SLOT; //sizeof(blob_record_t); - // } - - slottedPostRalloc(xid, p, lsn, rid); - recordWrite(xid, p, lsn, rid, dat); - - return 0; + return ret; } static pthread_mutex_t talloc_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -138,7 +153,6 @@ Operation getRealloc() { } static uint64_t lastFreepage; -static int initialFreespace = -1; static allocationPolicy * allocPolicy; void TallocInit() { @@ -147,27 +161,29 @@ void TallocInit() { // pthread_mutex_init(&talloc_mutex, NULL); } -static compensated_function recordid TallocFromPageInternal(int xid, Page * p, unsigned long size); - static void reserveNewRegion(int xid) { int firstPage = TregionAlloc(xid, TALLOC_REGION_SIZE, STORAGE_MANAGER_TALLOC); - + int initialFreespace = -1; + void* nta = TbeginNestedTopAction(xid, OPERATION_NOOP, 0,0); availablePage ** newPages = malloc(sizeof(availablePage**)*(TALLOC_REGION_SIZE+1)); - if(initialFreespace == -1) { - Page * p = loadPage(xid, firstPage); - initialFreespace = slottedFreespace(p); - releasePage(p); - } - for(int i = 0; i < TALLOC_REGION_SIZE; i++) { + + for(int i = 0; i < TALLOC_REGION_SIZE; i++) { availablePage * next = malloc(sizeof(availablePage) * TALLOC_REGION_SIZE); - + + TinitializeSlottedPage(xid, firstPage + i); + if(initialFreespace == -1) { + Page * p = loadPage(xid, firstPage); + readlock(p->rwlatch,0); + initialFreespace = pageFreespace(xid, p); + unlock(p->rwlatch); + releasePage(p); + } next->pageid = firstPage + i; next->freespace = initialFreespace; next->lockCount = 0; newPages[i] = next; - TinitializeSlottedPage(xid, firstPage + i); } newPages[TALLOC_REGION_SIZE]= 0; allocationPolicyAddPages(allocPolicy, newPages); @@ -200,36 +216,48 @@ compensated_function recordid Talloc(int xid, unsigned long size) { lastFreepage = ap->pageid; p = loadPage(xid, lastFreepage); + writelock(p->rwlatch, 0); + while(pageFreespace(xid, p) < physical_slot_length(type)) { + pageCompact(p); + int newFreespace = pageFreespace(xid, p); - while(slottedFreespace(p) < physical_slot_length(type)) { - writelock(p->rwlatch,0); - slottedCompact(p); - unlock(p->rwlatch); - int newFreespace = slottedFreespace(p); - if(newFreespace >= physical_slot_length(type)) { + if(newFreespace >= physical_slot_length(type)) { break; } + + unlock(p->rwlatch); allocationPolicyUpdateFreespaceLockedPage(allocPolicy, xid, ap, newFreespace); releasePage(p); - + ap = allocationPolicyFindPage(allocPolicy, xid, physical_slot_length(type)); - if(!ap) { + if(!ap) { reserveNewRegion(xid); ap = allocationPolicyFindPage(allocPolicy, xid, physical_slot_length(type)); } - - lastFreepage = ap->pageid; - - p = loadPage(xid, lastFreepage); + lastFreepage = ap->pageid; + + p = loadPage(xid, lastFreepage); + writelock(p->rwlatch, 0); } - - rid = TallocFromPageInternal(xid, p, size); - - int newFreespace = slottedFreespace(p); + + rid = recordPreAlloc(xid, p, type); + + assert(rid.size != INVALID_SLOT); + + recordPostAlloc(xid, p, rid); + int newFreespace = pageFreespace(xid, p); allocationPolicyUpdateFreespaceLockedPage(allocPolicy, xid, ap, newFreespace); + unlock(p->rwlatch); + + Tupdate(xid, rid, NULL, OPERATION_ALLOC); + + if(type == BLOB_SLOT) { + rid.size = size; + allocBlob(xid, rid); + } releasePage(p); } compensate_ret(NULLRID); @@ -249,64 +277,38 @@ void allocTransactionCommit(int xid) { } compensate; } -compensated_function recordid TallocFromPage(int xid, long page, unsigned long size) { +compensated_function recordid TallocFromPage(int xid, long page, unsigned long type) { + + unsigned long size = type; + if(size > BLOB_THRESHOLD_SIZE) { + type = BLOB_SLOT; + } + pthread_mutex_lock(&talloc_mutex); Page * p = loadPage(xid, page); - recordid ret = TallocFromPageInternal(xid, p, size); - if(ret.size != INVALID_SLOT) { + writelock(p->rwlatch,0); + recordid rid = recordPreAlloc(xid, p, type); + + if(rid.size != INVALID_SLOT) { + recordPostAlloc(xid,p,rid); allocationPolicyAllocedFromPage(allocPolicy, xid, page); + unlock(p->rwlatch); + + Tupdate(xid,rid,NULL,OPERATION_ALLOC); + + if(type == BLOB_SLOT) { + rid.size = size; + allocBlob(xid,rid); + } + + } else { + unlock(p->rwlatch); } + + releasePage(p); pthread_mutex_unlock(&talloc_mutex); - return ret; -} - -static compensated_function recordid TallocFromPageInternal(int xid, Page * p, unsigned long size) { - recordid rid; - - // Does TallocFromPage need to understand blobs? This function - // seems to be too complex; all it does it delegate the allocation - // request to the page type's implementation. (Does it really need - // to check for freespace?) - - short type; - if(size >= BLOB_THRESHOLD_SIZE) { - type = BLOB_SLOT; - } else { - type = size; - } - - unsigned long slotSize = INVALID_SLOT; - - slotSize = physical_slot_length(type); - - assert(slotSize < PAGE_SIZE && slotSize > 0); - - /* if(slottedFreespace(p) < slotSize) { - slottedCompact(p); - } */ - if(slottedFreespace(p) < slotSize) { - rid = NULLRID; - } else { - rid = slottedRawRalloc(p, type); - assert(rid.size == type); - rid.size = size; - Tupdate(xid, rid, NULL, OPERATION_ALLOC); - - if(type == BLOB_SLOT) { - allocBlob(xid, rid); - } - - rid.size = type; - - } - - if(rid.size == type && // otherwise TallocFromPage failed - type == BLOB_SLOT // only special case blobs (for now) - ) { - rid.size = size; - } return rid; } @@ -338,11 +340,11 @@ compensated_function void Tdealloc(int xid, recordid rid) { compensated_function int TrecordType(int xid, recordid rid) { Page * p; - try_ret(compensation_error()) { - p = loadPage(xid, rid.page); - } end_ret(compensation_error()); + p = loadPage(xid, rid.page); + readlock(p->rwlatch,0); int ret; - ret = recordType(xid, p, rid); + ret = recordGetTypeNew(xid, p, rid); + unlock(p->rwlatch); releasePage(p); return ret; } @@ -350,21 +352,9 @@ compensated_function int TrecordType(int xid, recordid rid) { compensated_function int TrecordSize(int xid, recordid rid) { int ret; Page * p; - try_ret(compensation_error()) { - p = loadPage(xid, rid.page); - } end_ret(compensation_error()); - ret = recordSize(xid, p, rid); - releasePage(p); - return ret; -} - -compensated_function int TrecordsInPage(int xid, int pageid) { - Page * p; - try_ret(compensation_error()) { - p = loadPage(xid, pageid); - } end_ret(compensation_error()); - readlock(p->rwlatch, 187); - int ret = *numslots_ptr(p); + p = loadPage(xid, rid.page); + readlock(p->rwlatch,0); + ret = recordGetLength(xid, p, rid); unlock(p->rwlatch); releasePage(p); return ret; @@ -380,17 +370,19 @@ void TinitializeFixedPage(int xid, int pageid, int slotLength) { } static int operate_initialize_page(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) { + writelock(p->rwlatch, 0); switch(rid.slot) { case SLOTTED_PAGE: slottedPageInitialize(p); break; case FIXED_PAGE: - fixedPageInitialize(p, rid.size, recordsPerPage(rid.size)); + fixedPageInitialize(p, rid.size, fixedRecordsPerPage(rid.size)); break; default: abort(); } pageWriteLSN(xid, p, lsn); + unlock(p->rwlatch); return 0; } diff --git a/src/lladd/operations/arrayList.c b/src/lladd/operations/arrayList.c index 1f0adfc..5a76408 100644 --- a/src/lladd/operations/arrayList.c +++ b/src/lladd/operations/arrayList.c @@ -2,7 +2,7 @@ #include #include -#include "../page/fixed.h" +#include "../page.h" #include #include #include @@ -34,9 +34,12 @@ typedef struct { } TarrayListParameters; -static TarrayListParameters pageToTLP(Page * p); +static TarrayListParameters pageToTLP(int xid, Page * p); static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock); +#define MAX_OFFSET_POSITION 3 +#define FIRST_DATA_PAGE_OFFSET 4 + /*----------------------------------------------------------------------------*/ compensated_function recordid TarrayListAlloc(int xid, int count, int multiplier, int size) { @@ -78,30 +81,24 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void * /* Allocing this page -> implicit lock, but latch to conformt to fixedPage's interface. */ writelock(p->rwlatch, 0); - fixedPageInitialize(p, sizeof(int), recordsPerPage(sizeof(int))); + fixedPageInitialize(p, sizeof(int), fixedRecordsPerPage(sizeof(int))); -#define MAX_OFFSET_POSITION 3 -#define FIRST_DATA_PAGE_OFFSET 4 - recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid; countRid.page = multiplierRid.page = slotSizeRid.page = maxOffset.page = firstDataPageRid.page = p->id; countRid.size = multiplierRid.size = slotSizeRid.size = maxOffset.size = firstDataPageRid.size = sizeof(int); - countRid.slot = 0; + countRid.slot = 0; multiplierRid.slot = 1; slotSizeRid.slot = 2; maxOffset.slot = 3; firstDataPageRid.slot = 4; - - int firstDataPage = firstPage + 1; - fixedWriteUnlocked(p, countRid, (byte*)&count); - fixedWriteUnlocked(p, multiplierRid, (byte*)&multiplier); - fixedWriteUnlocked(p, firstDataPageRid, (byte*)&firstDataPage); - fixedWriteUnlocked(p, slotSizeRid, (byte*)&size); - int minusOne = -1; - fixedWriteUnlocked(p, maxOffset, (byte*)&minusOne); - /* Write lsn... */ + int firstDataPage = firstPage + 1; + (*(int*)recordWriteNew(xid, p, countRid))= count; + (*(int*)recordWriteNew(xid, p, multiplierRid))= multiplier; + (*(int*)recordWriteNew(xid, p, firstDataPageRid))= firstDataPage; + (*(int*)recordWriteNew(xid, p, slotSizeRid))= size; + (*(int*)recordWriteNew(xid, p, maxOffset))= -1; *page_type_ptr(p) = ARRAY_LIST_PAGE; @@ -139,7 +136,9 @@ static compensated_function int TarrayListExtendInternal(int xid, recordid rid, try_ret(compensation_error()) { p = loadPage(xid, rid.page); } end_ret(compensation_error()); - TarrayListParameters tlp = pageToTLP(p); + readlock(p->rwlatch, 0); + TarrayListParameters tlp = pageToTLP(xid, p); + unlock(p->rwlatch);; releasePage(p); p = NULL; @@ -191,18 +190,23 @@ compensated_function int TarrayListExtend(int xid, recordid rid, int slots) { } compensated_function int TarrayListLength(int xid, recordid rid) { Page * p = loadPage(xid, rid.page); - TarrayListParameters tlp = pageToTLP(p); + readlock(p->rwlatch, 0); + TarrayListParameters tlp = pageToTLP(xid, p); + unlock(p->rwlatch); releasePage(p); return tlp.maxOffset+1; } /*----------------------------------------------------------------------------*/ -recordid dereferenceArrayListRid(Page * p, int offset) { +/** + @todo XXX latching for dereference arraylist rid (and other dereference functions...) +*/ +recordid dereferenceArrayListRid(int xid, Page * p, int offset) { readlock(p->rwlatch,0); - TarrayListParameters tlp = pageToTLP(p); + TarrayListParameters tlp = pageToTLP(xid, p); - int rec_per_page = recordsPerPage((size_t)tlp.size); + int rec_per_page = fixedRecordsPerPage((size_t)tlp.size); int lastHigh = 0; int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */ @@ -216,11 +220,10 @@ recordid dereferenceArrayListRid(Page * p, int offset) { int thePage; - assert(pageRidSlot + FIRST_DATA_PAGE_OFFSET < fixedPageCount(p)); - thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET); /*reading immutable record; don't need latch.*/ + recordid rid = { p->id, pageRidSlot + FIRST_DATA_PAGE_OFFSET, sizeof(int) }; + thePage = *(int*)recordReadNew(xid,p,rid); unlock(p->rwlatch); - recordid rid; rid.page = thePage + blockPage; rid.slot = blockSlot; rid.size = tlp.size; @@ -229,7 +232,7 @@ recordid dereferenceArrayListRid(Page * p, int offset) { } static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock) { - int rec_per_page = recordsPerPage((size_t)tlp.size); + int rec_per_page = fixedRecordsPerPage((size_t)tlp.size); long thisHigh = rec_per_page * tlp.initialSize; int lastHigh = 0; int pageRidSlot = 0; @@ -247,14 +250,19 @@ static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * return pageRidSlot; } -static TarrayListParameters pageToTLP(Page * p) { +static TarrayListParameters pageToTLP(int xid, Page * p) { TarrayListParameters tlp; tlp.firstPage = p->id; - tlp.initialSize = *(int*)fixed_record_ptr(p, 0); - tlp.multiplier = *(int*)fixed_record_ptr(p, 1); - tlp.size = *(int*)fixed_record_ptr(p, 2); - tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); + /* tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); */ + recordid rid = { p->id, 0, sizeof(int) }; + tlp.initialSize = *(int*)recordReadNew(xid, p, rid); + rid.slot = 1; + tlp.multiplier = *(int*)recordReadNew(xid, p, rid); + rid.slot = 2; + tlp.size = *(int*)recordReadNew(xid, p, rid); + rid.slot = 3; + tlp.maxOffset = *(int*)recordReadNew(xid, p, rid); return tlp; } diff --git a/src/lladd/operations/bTree.c b/src/lladd/operations/bTree.c index f1d8699..c1021c9 100644 --- a/src/lladd/operations/bTree.c +++ b/src/lladd/operations/bTree.c @@ -3,8 +3,6 @@ #include "../latches.h" #include #include -#include "../page.h" -#include "../page/slotted.h" #include #include #include diff --git a/src/lladd/operations/instantSet.c b/src/lladd/operations/instantSet.c index eabcdc3..51465b8 100644 --- a/src/lladd/operations/instantSet.c +++ b/src/lladd/operations/instantSet.c @@ -47,7 +47,6 @@ terms specified in this license. #include #include "../page.h" -#include "../page/fixed.h" static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) { recordWrite(xid, p, lsn, rid, dat); diff --git a/src/lladd/operations/linearHash.c b/src/lladd/operations/linearHash.c index 5cb4300..062cbdb 100644 --- a/src/lladd/operations/linearHash.c +++ b/src/lladd/operations/linearHash.c @@ -69,8 +69,6 @@ static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const v rid.slot = 0; - /* TreadUnlocked(xid, dereferenceArrayListRid(p, rid.slot), &headerRidA); */ - /* TreadUnlocked(xid, rid, &headerRidA); */ assert(keySize == sizeof(int)); assert(valSize == sizeof(recordid)); @@ -416,16 +414,12 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz recordid ba = hashRid; ba.slot = next_split; recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1); - // recordid ba_contents; TreadUnlocked(xid, ba, &ba_contents); - // recordid bb_contents = NULLRID; - /* Tset(xid, bb, &bb_contents); */ //TreadUnlocked(xid, bb, &bb_contents); - hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, ba, A_contents); - TreadUnlocked(xid, bb, D_contents); + Tread(xid, ba, A_contents); + Tread(xid, bb, D_contents); recordid A = ba; //ba_contents; recordid D = bb; //bb_contents; recordid B = A_contents->next; @@ -444,7 +438,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz firstD = 0; if(D_contents->next.size != -1) { D = D_contents->next; - TreadUnlocked(xid, D, D_contents); + Tread(xid, D, D_contents); } else { abort(); // Got here? We're starting a new bucket, but found that it is already -1 terminated... } @@ -500,7 +494,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz return; } assert(oldANext.size == sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, oldANext, A_contents); + Tread(xid, oldANext, A_contents); // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); TinstantSet(xid, A, A_contents); Tdealloc(xid, oldANext); @@ -514,7 +508,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz while(B.size != -1) { assert(B.size == sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, B, B_contents); + Tread(xid, B, B_contents); C = B_contents->next; old_hash = hash(B_contents+1, keySize, i-1, UINT_MAX) + 2; @@ -533,7 +527,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz // D is the tail of our list. assert(D.size == sizeof(hashEntry) + keySize + valSize); assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, D, D_contents); + Tread(xid, D, D_contents); D_contents->next = B; assert(B.size != 0); // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); @@ -542,7 +536,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz // A is somewhere in the first list. assert(A.size == sizeof(hashEntry) + keySize + valSize); assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, A, A_contents); + Tread(xid, A, A_contents); A_contents->next = C; assert(C.size != 0); @@ -556,7 +550,7 @@ void instant_rehash(int xid, recordid hashRid, int next_split, int i, int keySiz assert(B.size == sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, B, B_contents); + Tread(xid, B, B_contents); B_contents->next = NULLRID; TinstantSet(xid, B, B_contents); @@ -584,7 +578,7 @@ void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, hash Tdealloc(xid, deleteMe); hashRid.slot = bucket_number; assert(hashRid.size == sizeof(hashEntry) + valSize + keySize); - TreadUnlocked(xid, hashRid, bucket_contents); + Tread(xid, hashRid, bucket_contents); hashRid.slot = 0; } } @@ -594,7 +588,7 @@ void instant_insertIntoBucket(int xid, recordid hashRid, int bucket_number, hash hashRid.slot = bucket_number; - TreadUnlocked(xid, hashRid, bucket_contents); + Tread(xid, hashRid, bucket_contents); assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); @@ -669,7 +663,7 @@ int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntr /* There is something else in this bucket, so copy it into the bucket */ assert(bucket_contents->next.size == sizeof(hashEntry) + keySize + valSize); recordid oldNext = bucket_contents->next; - TreadUnlocked(xid, bucket_contents->next, bucket_contents); + Tread(xid, bucket_contents->next, bucket_contents); TinstantSet(xid, this, bucket_contents); *deletedEntry = oldNext; /* @todo delete from bucket really should do its own deallocation.. */ } @@ -695,7 +689,7 @@ int instant_deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntr B = tmp; assert(A->next.size == sizeof(hashEntry) + keySize + valSize); Baddr = A->next; - TreadUnlocked(xid, Baddr, B); + Tread(xid, Baddr, B); // At this point, A points at B, and we're considering B for deletion. if(!memcmp(B+1, key, keySize)) { @@ -745,7 +739,7 @@ void ThashInstantInsert(int xid, recordid hashRid, hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize); hashRid.slot = bucket; - TreadUnlocked(xid, hashRid, bucket_contents); + Tread(xid, hashRid, bucket_contents); hashRid.slot = 0; instant_insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0); @@ -776,7 +770,7 @@ void ThashInstantDelete(int xid, recordid hashRid, recordid deleteMe; hashRid.slot = bucket_number; hashEntry * bucket_contents = malloc(sizeof(hashEntry) + keySize + valSize); - TreadUnlocked(xid, hashRid, bucket_contents); + Tread(xid, hashRid, bucket_contents); hashRid.slot = 0; if(instant_deleteFromBucket(xid, hashRid, bucket_number, bucket_contents, key, keySize, valSize, &deleteMe)) { diff --git a/src/lladd/operations/linearHashNTA.c b/src/lladd/operations/linearHashNTA.c index 3a29281..4173f04 100644 --- a/src/lladd/operations/linearHashNTA.c +++ b/src/lladd/operations/linearHashNTA.c @@ -4,7 +4,6 @@ #include #include #include "../page.h" -#include "../page/slotted.h" #include #include #include diff --git a/src/lladd/operations/pageOperations.c b/src/lladd/operations/pageOperations.c index 95bf34d..36a73d7 100644 --- a/src/lladd/operations/pageOperations.c +++ b/src/lladd/operations/pageOperations.c @@ -1,10 +1,10 @@ #define _XOPEN_SOURCE 600 #include +#include "config.h" #include "../page.h" #include #include -#include "../page/fixed.h" #include static pthread_mutex_t pageAllocMutex; @@ -69,8 +69,10 @@ compensated_function int TpageAlloc(int xid /*, int type */) { } int __fixedPageAlloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { - fixedPageInitialize(p, r.size, recordsPerPage(r.size)); + writelock(p->rwlatch,0); + fixedPageInitialize(p, r.size, fixedRecordsPerPage(r.size)); pageWriteLSN(xid, p, lsn); + unlock(p->rwlatch); return 0; } @@ -82,7 +84,7 @@ int __fixedPageAlloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { */ recordid TfixedPageAlloc(int xid, int size) { int page = TpageAlloc(xid); - recordid rid = {page, recordsPerPage(size), size}; + recordid rid = {page, fixedRecordsPerPage(size), size}; Tupdate(xid, rid, 0, OPERATION_FIXED_PAGE_ALLOC); return rid; } diff --git a/src/lladd/operations/regions.c b/src/lladd/operations/regions.c index ea4692e..db37fed 100644 --- a/src/lladd/operations/regions.c +++ b/src/lladd/operations/regions.c @@ -1,6 +1,6 @@ +#include "config.h" #include "../page.h" #include -#include "../page/slotted.h" #include typedef struct regionAllocLogArg{ @@ -26,10 +26,14 @@ static void TdeallocBoundaryTag(int xid, unsigned int page); could matter in the future. */ static int operate_alloc_boundary_tag(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { + writelock(p->rwlatch, 0); slottedPageInitialize(p); *page_type_ptr(p) = BOUNDARY_TAG_PAGE; - slottedPostRalloc(xid, p, lsn, rid); - slottedWrite(p, rid, dat); + recordPostAlloc(xid, p, rid); + pageWriteLSN(xid, p, lsn); + byte * buf = recordWriteNew(xid, p, rid); + memcpy(buf, dat, recordGetLength(xid, p, rid)); + unlock(p->rwlatch); return 0; } @@ -123,7 +127,6 @@ void regionsInit() { Page * p = loadPage(-1, 0); int pageType = *page_type_ptr(p); - holding_mutex = pthread_self(); if(pageType != BOUNDARY_TAG_PAGE) { boundary_tag t; @@ -178,11 +181,9 @@ void fsckRegions(int xid) { TreadBoundaryTag(xid, thisPage, &orphan); assert(orphan.status == REGION_CONDEMNED); Page * p = loadPage(xid, thisPage); - fsckSlottedPage(p); releasePage(p); } else if (pageType == SLOTTED_PAGE) { Page * p = loadPage(xid, thisPage); - fsckSlottedPage(p); releasePage(p); } } @@ -491,4 +492,3 @@ void TregionFindNthActive(int xid, unsigned int regionNumber, unsigned int * fir holding_mutex = 0; pthread_mutex_unlock(®ion_mutex); } - diff --git a/src/lladd/page.c b/src/lladd/page.c index a02e9a0..c29f4ce 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -91,27 +91,31 @@ terms specified in this license. #include #include #include + +static page_impl * page_impls; + +/** + XXX latching for pageWriteLSN... +*/ void pageWriteLSN(int xid, Page * page, lsn_t lsn) { - // if(!page->dirty) { // This assert belongs here, but would cause some hacked up unit tests to fail... + // These asserts belong here, but would cause some hacked up unit tests to fail... + // if(!page->dirty) { // assert(page->LSN < lsn); // } + // assertlocked(page->rwlatch); + if(page->LSN < lsn) { page->LSN = lsn; *lsn_ptr(page) = page->LSN; - } + } dirtyPages_add(page); return; } - +/** + XXX latching for pageReadLSN... +*/ lsn_t pageReadLSN(const Page * page) { - lsn_t ret; - - readlock(page->rwlatch, 259); - ret = *lsn_ptr(page); - assert(ret == page->LSN); - readunlock(page->rwlatch); - - return ret; + return page->LSN; } /* ----- (de)initialization functions. Do not need to support multithreading. -----*/ @@ -122,179 +126,182 @@ lsn_t pageReadLSN(const Page * page) { */ void pageInit() { slottedPageInit(); + fixedPageInit(); + + page_impls = malloc(MAX_PAGE_TYPE * sizeof(page_impl)); + for(int i = 0; i < MAX_PAGE_TYPE; i++) { + page_impl p = { 0 }; + page_impls[i] = p; + } + registerPageType(slottedImpl()); + registerPageType(fixedImpl()); + registerPageType(boundaryTagImpl()); + registerPageType(arrayListImpl()); } void pageDeinit() { - slottedPageDeInit(); + fixedPageDeinit(); + slottedPageDeinit(); } +int registerPageType(page_impl p) { + assert(page_impls[p.page_type].page_type == 0); + page_impls[p.page_type] = p; + return 0; +} /** - @todo this updates the LSN of the page that points to blob, even if the page is otherwise untouched!! + @todo this updates the LSN of the page that points to blob, even if the page is otherwise untouched!! This is slow and breaks recovery. */ -void recordWrite(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { +void recordWrite(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) { - assert( (p->id == rid.page) && (p->memAddr != NULL) ); - - writelock(p->rwlatch, 225); - pageWriteLSN(xid, p, lsn); + assert( (p->id == rid.page) && (p->memAddr != NULL) ); + readlock(p->rwlatch, 225); + // page_impl p_impl; if(rid.size > BLOB_THRESHOLD_SIZE) { + // XXX Kludge This is done so that recovery sees the LSN update. Otherwise, it gets upset... Of course, doing it will break blob recovery unless we set blob writes to do "logical" redo... + pageWriteLSN(xid, p, lsn); unlock(p->rwlatch); writeBlob(xid, p, lsn, rid, dat); } else { - if(*page_type_ptr(p) == SLOTTED_PAGE || *page_type_ptr(p) == BOUNDARY_TAG_PAGE) { - slottedWriteUnlocked(p, rid, dat); - } else if(*page_type_ptr(p) == FIXED_PAGE || *page_type_ptr(p)==ARRAY_LIST_PAGE || !*page_type_ptr(p) ) { - fixedWriteUnlocked(p, rid, dat); - } else { - abort(); + /* p_impl = page_impls[*page_type_ptr(p)]; + if(!*page_type_ptr(p)) { + // XXX kludge!!!! + p_impl = page_impls[FIXED_PAGE]; } + assert(physical_slot_length(rid.size) == p_impl.recordGetLength(xid, p, rid)); + byte * buf = p_impl.recordWrite(xid, p, rid); + pageWriteLSN(xid, p, lsn); + memcpy(buf, dat, physical_slot_length(p_impl.recordGetLength(xid, p, rid))); */ + + byte * buf = recordWriteNew(xid, p, rid); + pageWriteLSN(xid, p, lsn); + memcpy(buf, dat, recordGetLength(xid, p, rid)); unlock(p->rwlatch); } - assert( (p->id == rid.page) && (p->memAddr != NULL) ); - + assert( (p->id == rid.page) && (p->memAddr != NULL) ); } -static int recordReadWarnedAboutPageTypeKludge = 0; -int recordRead(int xid, Page * p, recordid rid, void *buf) { - assert(rid.page == p->id); - - int page_type = *page_type_ptr(p); - - if(rid.size > BLOB_THRESHOLD_SIZE) { - readBlob(xid, p, rid, buf); - } else if(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE) { - slottedRead(p, rid, buf); - /* @TODO !page_type can never be required if this code is correct... arraylist is broken!! XXX */ - } else if(page_type == FIXED_PAGE || page_type==ARRAY_LIST_PAGE || !page_type) { - if(!page_type && ! recordReadWarnedAboutPageTypeKludge) { - recordReadWarnedAboutPageTypeKludge = 1; - printf("page.c: MAKING USE OF TERRIBLE KLUDGE AND IGNORING ASSERT FAILURE! FIX ARRAY LIST ASAP!!!\n"); - } - fixedRead(p, rid, buf); - } else { - abort(); - } - assert(rid.page == p->id); - - return 0; -} - - -int recordReadUnlocked(int xid, Page * p, recordid rid, void *buf) { - assert(rid.page == p->id); - - int page_type = *page_type_ptr(p); - - if(rid.size > BLOB_THRESHOLD_SIZE) { - abort(); /* Unsupported for now. */ - readBlob(xid, p, rid, buf); - } else if(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE) { - slottedReadUnlocked(p, rid, buf); - /* FIXED_PAGES can function correctly even if they have not been - initialized. */ - } else if(page_type == FIXED_PAGE || !page_type) { - fixedReadUnlocked(p, rid, buf); - } else { - abort(); - } - assert(rid.page == p->id); - - return 0; -} - -int recordTypeUnlocked(int xid, Page * p, recordid rid) { +int recordRead(int xid, Page * p, recordid rid, byte *buf) { assert(rid.page == p->id); - - int page_type = *page_type_ptr(p); - if(page_type == UNINITIALIZED_PAGE) { - return UNINITIALIZED_RECORD; - - } else if(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE) { - if(*numslots_ptr(p) <= rid.slot || *slot_ptr(p, rid.slot) == INVALID_SLOT) { - return UNINITIALIZED_PAGE; - } else if (*slot_length_ptr(p, rid.slot) == BLOB_SLOT) { - return BLOB_RECORD; - } else { - return SLOTTED_RECORD; - } - } else if(page_type == FIXED_PAGE || page_type == ARRAY_LIST_PAGE) { - return (fixedPageCount(p) > rid.slot) ? - FIXED_RECORD : UNINITIALIZED_RECORD; - } else { - return UNINITIALIZED_RECORD; - } -} - -int recordType(int xid, Page * p, recordid rid) { - readlock(p->rwlatch, 343); - int ret = recordTypeUnlocked(xid, p, rid); - unlock(p->rwlatch); - return ret; -} -/** @todo implement recordSize for blobs and fixed length pages. */ -int recordSize(int xid, Page * p, recordid rid) { - readlock(p->rwlatch, 353); - int ret = recordTypeUnlocked(xid, p, rid); - if(ret == UNINITIALIZED_RECORD) { - ret = -1; - } else if(ret == SLOTTED_RECORD) { - ret = *slot_length_ptr(p, rid.slot); - } else { - abort(); // unimplemented for fixed length pages and blobs. - } - unlock(p->rwlatch); - return ret; -} - -void recordWriteUnlocked(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { - - assert( (p->id == rid.page) && (p->memAddr != NULL) ); - - // Need a writelock so that we can update the lsn. - - writelock(p->rwlatch, 225); - pageWriteLSN(xid, p, lsn); - unlock(p->rwlatch); - if(rid.size > BLOB_THRESHOLD_SIZE) { - abort(); - writeBlob(xid, p, lsn, rid, dat); - } else if(*page_type_ptr(p) == SLOTTED_PAGE || *page_type_ptr(p) == BOUNDARY_TAG_PAGE) { - slottedWriteUnlocked(p, rid, dat); - } else if(*page_type_ptr(p) == FIXED_PAGE || *page_type_ptr(p)==ARRAY_LIST_PAGE || !*page_type_ptr(p) ) { - fixedWriteUnlocked(p, rid, dat); + readBlob(xid, p, rid, buf); + assert(rid.page == p->id); + return 0; } else { - abort(); - } - assert( (p->id == rid.page) && (p->memAddr != NULL) ); - + readlock(p->rwlatch, 0); + /* page_impl p_impl; + int page_type = *page_type_ptr(p); + + if(!page_type) { + if (! recordReadWarnedAboutPageTypeKludge) { + recordReadWarnedAboutPageTypeKludge = 1; + printf("page.c: MAKING USE OF TERRIBLE KLUDGE AND IGNORING ASSERT FAILURE! FIX ARRAY LIST ASAP!!!\n"); + } + p_impl = page_impls[FIXED_PAGE]; + } else { + p_impl = page_impls[page_type]; + } + assert(physical_slot_length(rid.size) == p_impl.recordGetLength(xid, p, rid)); + const byte * dat = p_impl.recordRead(xid, p, rid); + memcpy(buf, dat, physical_slot_length(p_impl.recordGetLength(xid, p, rid))); */ + + const byte * dat = recordReadNew(xid,p,rid); + memcpy(buf, dat, recordGetLength(xid,p,rid)); + unlock(p->rwlatch); + return 0; + } } -recordid recordDereferenceUnlocked(int xid, Page * p, recordid rid) { - int page_type = *page_type_ptr(p); - if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || (!page_type) || page_type == BOUNDARY_TAG_PAGE ) { - - } else if(page_type == INDIRECT_PAGE) { - rid = dereferenceRIDUnlocked(xid, rid); - } else if(page_type == ARRAY_LIST_PAGE) { - rid = dereferenceArrayListRidUnlocked(p, rid.slot); - } else { - abort(); - } - return rid; -} -recordid recordDereference(int xid, Page * p, recordid rid) { +recordid recordDereference(int xid, Page * p, recordid rid) { int page_type = *page_type_ptr(p); if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || (!page_type) || page_type == BOUNDARY_TAG_PAGE ) { } else if(page_type == INDIRECT_PAGE) { rid = dereferenceRID(xid, rid); } else if(page_type == ARRAY_LIST_PAGE) { - rid = dereferenceArrayListRid(p, rid.slot); + rid = dereferenceArrayListRid(xid, p, rid.slot); } else { abort(); } return rid; } + +/// -------------- Dispatch functions + +static int recordWarnedAboutPageTypeKludge = 0; +const byte * recordReadNew(int xid, Page * p, recordid rid) { + int page_type = *page_type_ptr(p); + if(!page_type) { + page_type = FIXED_PAGE; + if(!recordWarnedAboutPageTypeKludge) { + recordWarnedAboutPageTypeKludge = 1; + printf("page.c: MAKING USE OF TERRIBLE KLUDGE AND IGNORING ASSERT FAILURE! FIX ARRAY LIST ASAP!!!\n"); + } + } + return page_impls[page_type].recordRead(xid, p, rid); +} +byte * recordWriteNew(int xid, Page * p, recordid rid) { + int page_type = *page_type_ptr(p); + if(!page_type) { + page_type = FIXED_PAGE; + if(!recordWarnedAboutPageTypeKludge) { + recordWarnedAboutPageTypeKludge = 1; + printf("page.c: MAKING USE OF TERRIBLE KLUDGE AND IGNORING ASSERT FAILURE! FIX ARRAY LIST ASAP!!!\n"); + } + } + return page_impls[page_type].recordWrite(xid, p, rid); +} +int recordGetTypeNew(int xid, Page *p, recordid rid) { + return page_impls[*page_type_ptr(p)] + .recordGetType(xid, p, rid); +} +void recordSetTypeNew(int xid, Page *p, recordid rid, int type) { + page_impls[*page_type_ptr(p)] + .recordSetType(xid, p, rid, type); +} +int recordGetLength(int xid, Page *p, recordid rid) { + return page_impls[*page_type_ptr(p)] + .recordGetLength(xid,p,rid); +} +recordid recordFirst(int xid, Page * p){ + return page_impls[*page_type_ptr(p)] + .recordFirst(xid,p); +} +recordid recordNext(int xid, Page * p, recordid prev){ + return page_impls[*page_type_ptr(p)] + .recordNext(xid,p,prev); +} +recordid recordPreAlloc(int xid, Page * p, int size){ + return page_impls[*page_type_ptr(p)] + .recordPreAlloc(xid,p,size); +} +void recordPostAlloc(int xid, Page * p, recordid rid){ + page_impls[*page_type_ptr(p)] + .recordPostAlloc(xid, p, rid); +} +void recordFree(int xid, Page * p, recordid rid){ + page_impls[*page_type_ptr(p)] + .recordFree(xid, p, rid); +} +int pageIsBlockSupported(int xid, Page * p){ + return page_impls[*page_type_ptr(p)] + .isBlockSupported(xid, p); +} +int pageFreespace(int xid, Page * p){ + return page_impls[*page_type_ptr(p)] + .pageFreespace(xid, p); +} +void pageCompact(Page * p){ + page_impls[*page_type_ptr(p)] + .pageCompact(p); +} +void pageLoaded(Page * p){ + page_impls[*page_type_ptr(p)] + .pageLoaded(p); +} +void pageFlushed(Page * p){ + page_impls[*page_type_ptr(p)] + .pageFlushed(p); +} diff --git a/src/lladd/page.h b/src/lladd/page.h index ad35802..8584984 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -78,11 +78,8 @@ terms specified in this license. #ifndef __PAGE_H__ #define __PAGE_H__ -#include #include #include "latches.h" -/** @todo page.h includes things that it shouldn't, and page.h should eventually be an installed header. */ - #include BEGIN_C_DECLS @@ -96,6 +93,8 @@ BEGIN_C_DECLS linked lists, @todo The Page struct should be tuned for better memory utilization. + + @todo Remove next and prev from Page_s */ struct Page_s { pageid_t id; @@ -237,47 +236,334 @@ lsn_t pageReadLSN(const Page * page); * * @return 0 on success, lladd error code on failure * + * @deprecated Unnecessary memcpy()'s */ -void recordWrite(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat); -/** - * The same as writeRecord, but does not obtain a latch on the page. - */ -void recordWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const void *dat); +void recordWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat); /** * @param xid transaction ID * @param page a pointer to the pinned page that contains the record. * @param rid the record to be written * @param dat buffer for data * @return 0 on success, lladd error code on failure + * @deprecated Unnecessary memcpy()'s */ -int recordRead(int xid, Page * page, recordid rid, void *dat); -/** - * The same as readRecord, but does not obtain a latch. - */ -int recordReadUnlocked(int xid, Page * p, recordid rid, void *buf); +int recordRead(int xid, Page * page, recordid rid, byte *dat); -/** - Allocate memory to hold a new page. +const byte * recordReadNew(int xid, Page * p, recordid rid); +byte * recordWriteNew(int xid, Page * p, recordid rid); +int recordGetTypeNew(int xid, Page * p, recordid rid); +void recordSetTypeNew(int xid, Page * p, recordid rid, int type); +int recordGetLength(int xid, Page *p, recordid rid); +recordid recordFirst(int xid, Page * p); +recordid recordNext(int xid, Page * p, recordid prev); +recordid recordPreAlloc(int xid, Page * p, int size); +void recordPostAlloc(int xid, Page * p, recordid rid); +void recordFree(int xid, Page * p, recordid rid); +int pageIsBlockSupported(int xid, Page * p); +int pageFreespace(int xid, Page * p); +void pageCompact(Page * p); +void pageLoaded(Page * p); +void pageFlushed(Page * p); - @return A pointer to the new page. This memory is part of a pool, - and should never be freed manually. Instead, it should be passed - into pageFree() so that it can be reused. -*/ - -int recordType(int xid, Page * p, recordid rid); - -int recordSize(int xid, Page * p, recordid rid); -/** - same as getRecordType(), but does not obtain a lock. -*/ -int recordTypeUnlocked(int xid, Page * p, recordid rid); /** @return -1 if the field does not exist, the size of the field otherwise (the rid parameter's size field will be ignored). */ -int recordLength(int xid, Page * p, recordid rid); - +int recordSize(int xid, Page * p, recordid rid); +/** + @todo recordDereference doesn't dispatch to pages. Should it? +*/ recordid recordDereference(int xid, Page *p, recordid rid); -recordid recordDereferenceUnlocked(int xid, Page *p, recordid rid); + +/** + @param a block returned by blockFirst() or blockNext(). + + is*() methods return zero for false, non-zero for true. + + recordFixedLen() returns the (single) length of every record in the + block, or BLOCK_VARIABLE_LENGTH + + methods that take int * size as an argument return a record size by + setting the pointer (if it's not null). + + @see Abadi, et. al, Intergrating Compression and Execution in Column-Oriented Database Systems, VLDB 2006. + +*/ +typedef struct block_t { + int (*isValid) (struct block_t *b); + int (*isOneValue) (struct block_t *b); + int (*isValueSorted) (struct block_t *b); + int (*isPosContig) (struct block_t *b); + byte * (*recordFirst) (struct block_t *b, int *size); + byte * (*recordNext) (struct block_t *b, int *size); + int (*recordCount) (struct block_t *b); + byte * (*recordPtrArray) (struct block_t *b); + int * (*recordSizeArray) (struct block_t *b); + // These two are not in paper + int (*recordFixedLen) (struct block_t *b); + /** + This returns a pointer to an array that contains the records in + the page. It only makes sense for pages that many values of the + same length, and that can implement this more efficiently than + repeated calls to recordNext. + + @param block the block + @param count will be set to the number of slots in the page + @param stride will be set to the size of each value in the page + + @return a pointer to an array of record contents. The array is + ordered according to slot id. The page implementation manages + the memory; hopefully, this points into the buffer manager, and + this function call is O(1). If it would be expensive to return a + packed array of every record in the page, then only some of the + records might be returned. Calling recordNext on { page->id, + off+ count } will tell the caller if it's received the entire + page's contents. + + @todo Should recordArray be forced to return records in slot order? + */ + byte * (*recordPackedArray) (struct block_t *b, int * count, int * stride); + /** + This must be called when you're done with the block (before + releasePage) */ + void (*blockRelease)(struct block_t *b); + void * impl; +} block_t; + +block_t pageBlockFirst(int xid, Page * p); +block_t pageBlockNext(int xid, Page * p, block_t prev); + +/** + None of these functions obtain latches. Calling them without + holding rwlatch is an error. (Exception: dereferenceRid grabs the + latch for you...) + + The function pointer should be null if your page implementaion does + not support the method in question. + + @todo Figure out what to do about readlock vs writelock... + @todo Get rid of .size in recordid? + + New allocation sequence (normal operation) + + pin + latch + recordPreAlloc + recordPostAlloc + unlatch + + (There is a race here; other transactions can see that the page + contains a new slot, but that the LSN hasn't been updated. This + seems to be benign. writeLSN refuses to decrement LSN's... If the + lock manager is using LSNs for versioning, it might get into a + situation where the page has changed (the slot was allocated), but + the version wasn't bumped. I can't imagine this causing trouble, + unless the application is using the presense or absence of an + uninitialized slot as some sort of side channel....) + + lsn = Tupdate(...) + latch + writeLSN + unlatch + unpin + + New allocation sequence (recovery) + + pin + latch + recordPostAlloc + writeLSN + unlatch + unpin +*/ +typedef struct page_impl { + int page_type; + + // ---------- Record access + + /** + + @param size If non-null, will be set to the size of the record + that has been read. + + @return pointer to read region. The pointer will be guaranteed + valid while the page is read latched by this caller, or while + the page is write latched, and no other method has been called on + this page. + */ + const byte* (*recordRead)(int xid, Page *p, recordid rid); + /** + Like recordRead, but marks the page dirty, and provides a + non-const pointer. + + @return a pointer to the buffer manager's copy of the record. + */ + byte* (*recordWrite)(int xid, Page *p, recordid rid); + /** + Check to see if a slot is a normal slot, or something else, such + as a blob. This is stored in the size field in the slotted page + structure. + + @param p the page of interest + @param slot the slot in p that we're checking. + @return The type of this slot. + */ + int (*recordGetType)(int xid, Page *p, recordid rid); + /** + Change the type of a slot. Doing so must not change the record's + type. + */ + void (*recordSetType)(int xid, Page *p, recordid rid, int type); + /** + @return the length of a record in bytes. This value can be used + to safely access the pointers returned by page_impl.recordRead() + and page_impl.recordWrite() + */ + int (*recordGetLength)(int xid, Page *p, recordid rid); + /** + @param p the page that will be iterated over + @return the first slot on the page (in slot order), or NULLRID if the page is empty. + */ + recordid (*recordFirst)(int xid, Page *p); + /** + This returns the next slot in the page (in slot order). + + @param rid If NULLRID, start at the beginning of the page. + @return The next record in the sequence, or NULLRID if done. + */ + recordid (*recordNext)(int xid, Page *p, recordid rid); + + // -------- "Exotic" (optional) access methods. + + /* + @return non-zero if this page implementation supports the block API + */ + int (*isBlockSupported)(int xid, Page *p); + /** Returns a block from this page. For some page implementations, + this is essentially the same as recordRead. Others can produce + more sophisticated blocks. + + */ + block_t (*blockFirst)(int xid, Page *p); + block_t (*blockNext)(int xid, Page * p, block_t prev); + + + // -------- Allocation methods. + + + /** + This returns a lower bound on the amount of freespace in the + page + + @param p the page whose freespace will be estimated. + @return The number of bytes of free space on the page, or (for + efficiency's sake) an underestimate. + + */ + int (*pageFreespace)(int xid, Page * p); + /** + Compact the page in place. This operation should not change the + slot id's allocated to the records on the page. Instead, it + should update the extimate returned by page_impl.freespace(). + + Depending on the page implementation, this function may have + other side effects. + */ + void(*pageCompact)(Page *p); + /** + Generate a new, appropriately sized recordid. This is the first + of two allocation phases, and does not actually modify the page. + The caller of this function must call recordPostAlloc() before + unlatching the page. + + @see page_impl.recordPostAlloc() + @see Talloc() for an overview of allocation + + @param xid The active transaction. + @param size The size of the new record + + @return A new recordid, or NULLRID if there is not enough space + */ + recordid (*recordPreAlloc)(int xid, Page *p, int size); + /** Allocate a new record with the supplied rid. This is the second + of two allocation phases. The supplied record must use an + unoccupied slot, and there must be enough freespace on the page. + + @param xid The active transaction + @param p The page that will be allocated from + @param rid A new recordid that is (usually) from recordPreAlloc() + + @see Talloc(), page_impl.recordPreAlloc() + */ + void (*recordPostAlloc)(int xid, Page *p, recordid rid); + /** Free a record. The page implementation doesn't need to worry + about uncommitted deallocations; that is handled by a higher + level. + + @param xid The active transaction + @param page The page that contains the record that will be freed. + @param rid the recordid to be freed. + + @see Tdealloc() for example usage + */ + void (*recordFree)(int xid, Page *p, recordid rid); + /** + @todo Is recordDereference a page-implementation specific + operation, or should it be implemented once and for all in + page.c? + + indirect.c suggets this should be specfic to the page type; + blobs suggest this should be specific to record type. Perhaps + two levels of dereferencing are in order... (First: page + specific; second record specific...) + */ + recordid (*recordDereference)(int xid, Page *p, recordid rid); + + // -------- Page maintenance + + /** This is called (exactly once) right after the page is read from + disk. + + This function should set p->LSN to an appropriate value. + + @todo Arrange to call page_impl.loaded() and page_impl.flushed(). + + */ + void (*pageLoaded)(Page * p); + /** This is called (exactly once) right before the page is written + back to disk. + + This function should record p->LSN somewhere appropriate + */ + void (*pageFlushed)(Page * p); +} page_impl; + +/** + Register a new page type with Stasis. The pageType field of impl + will be checked for uniqueness. If the type is not unique, this + function will return non-zero. + + (Since error handling isn't really working yet, it aborts if the + page type is not unique.) + +*/ +int registerPageType(page_impl impl); + +// -------------------- Page specific, general purpose methods + +/** + Initialize a new page + + @param p The page that will be turned into a new slotted page. + Its contents will be overwitten. It was probably + returned by loadPage() + */ +void slottedPageInitialize(Page * p); +void fixedPageInitialize(Page * page, size_t size, int count); + +int fixedRecordsPerPage(size_t size); + +void indirectInitialize(Page * p, int height); +compensated_function recordid dereferenceRID(int xid, recordid rid); END_C_DECLS diff --git a/src/lladd/page/fixed.c b/src/lladd/page/fixed.c index f0d9ce5..229d2d8 100644 --- a/src/lladd/page/fixed.c +++ b/src/lladd/page/fixed.c @@ -1,117 +1,170 @@ -#include "../page.h" - -#include "fixed.h" - #include +#include "../page.h" +#include "fixed.h" +/** @todo should page implementations provide readLSN / writeLSN??? */ +#include -int recordsPerPage(size_t size) { + +int fixedRecordsPerPage(size_t size) { return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size; } /** @todo CORRECTNESS Locking for fixedPageInitialize? (should hold writelock)*/ void fixedPageInitialize(Page * page, size_t size, int count) { + assertlocked(page->rwlatch); + // XXX fixed page asserts it's been given an UNINITIALIZED_PAGE... Why doesn't that crash? assert(*page_type_ptr(page) == UNINITIALIZED_PAGE); *page_type_ptr(page) = FIXED_PAGE; *recordsize_ptr(page) = size; - assert(count <= recordsPerPage(size)); + assert(count <= fixedRecordsPerPage(size)); *recordcount_ptr(page)= count; } -short fixedPageCount(Page * page) { - assertlocked(page->rwlatch); - assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); - return *recordcount_ptr(page); -} - -short fixedPageRecordSize(Page * page) { - assertlocked(page->rwlatch); - assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); - return *recordsize_ptr(page); -} - -recordid fixedRawRallocMany(Page * page, int count) { - - assert(*page_type_ptr(page) == FIXED_PAGE); - recordid rid; - /* We need a writelock to prevent concurrent updates to the recordcount_ptr field. */ - writelock(page->rwlatch, 33); - if(*recordcount_ptr(page) + count <= recordsPerPage(*recordsize_ptr(page))) { - rid.page = page->id; - rid.slot = *recordcount_ptr(page); - rid.size = *recordsize_ptr(page); - *recordcount_ptr(page)+=count; - } else { - rid.page = -1; - rid.slot = -1; - rid.size = -1; - } - unlock(page->rwlatch); - - return rid; -} - -recordid fixedRawRalloc(Page *page) { - assert(*page_type_ptr(page) == FIXED_PAGE); - return fixedRawRallocMany(page, 1); -} - static int checkRidWarnedAboutUninitializedKludge = 0; static void checkRid(Page * page, recordid rid) { assertlocked(page->rwlatch); - if(*page_type_ptr(page)) { - assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); - assert(page->id == rid.page); - assert(*recordsize_ptr(page) == rid.size); - // assert(recordsPerPage(rid.size) > rid.slot); - int recCount = *recordcount_ptr(page); - assert(recCount > rid.slot); - } else { + if(! *page_type_ptr(page)) { if(!checkRidWarnedAboutUninitializedKludge) { checkRidWarnedAboutUninitializedKludge = 1; printf("KLUDGE detected in checkRid. Fix it ASAP\n"); + fflush(stdout); } - fixedPageInitialize(page, rid.size, recordsPerPage(rid.size)); + fixedPageInitialize(page, rid.size, fixedRecordsPerPage(rid.size)); + } + + assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); + assert(page->id == rid.page); + assert(*recordsize_ptr(page) == rid.size); + assert(fixedRecordsPerPage(rid.size) > rid.slot); +} + +//-------------- New API below this line + +static const byte* fixedRead(int xid, Page *p, recordid rid) { + assertlocked(p->rwlatch); + checkRid(p, rid); + assert(rid.slot < *recordcount_ptr(p)); + return fixed_record_ptr(p, rid.slot); +} + +static byte* fixedWrite(int xid, Page *p, recordid rid) { + assertlocked(p->rwlatch); + checkRid(p, rid); + assert(rid.slot < *recordcount_ptr(p)); + return fixed_record_ptr(p, rid.slot); +} + +static int fixedGetType(int xid, Page *p, recordid rid) { + assertlocked(p->rwlatch); + // checkRid(p, rid); + if(rid.slot < *recordcount_ptr(p)) { + int type = *recordsize_ptr(p); + if(type > 0) { + type = NORMAL_SLOT; + } + return type; + } else { + return INVALID_SLOT; } } +static void fixedSetType(int xid, Page *p, recordid rid, int type) { + assertlocked(p->rwlatch); + checkRid(p,rid); + assert(rid.slot < *recordcount_ptr(p)); + assert(physical_slot_length(type) == physical_slot_length(*recordsize_ptr(p))); + *recordsize_ptr(p) = rid.size; +} +static int fixedGetLength(int xid, Page *p, recordid rid) { + assertlocked(p->rwlatch); + //XXX this should be here... assert(rid.slot < *recordcount_ptr(p)); + checkRid(p, rid); // <-- XXX KLUDGE checkRid init's the page if necessary... + return rid.slot > *recordcount_ptr(p) ? + INVALID_SLOT : physical_slot_length(*recordsize_ptr(p)); +} +/* XXXstatic recordid fixedFirst(int xid, Page *p, recordid rid) { -void fixedReadUnlocked(Page * page, recordid rid, byte * buf) { - assertlocked(page->rwlatch); - checkRid(page, rid); - if(!memcpy(buf, fixed_record_ptr(page, rid.slot), rid.size)) { - perror("memcpy"); - abort(); +} +static recordid fixedNext(int xid, Page *p, recordid rid) { + +} */ +static int fixedFreespace(int xid, Page * p) { + assertlocked(p->rwlatch); + if(fixedRecordsPerPage(*recordsize_ptr(p)) > *recordcount_ptr(p)) { + // Return the size of a slot; that's the biggest record we can take. + return physical_slot_length(*recordsize_ptr(p)); + } else { + // Page full; return zero. + return 0; } } -void fixedRead(Page * page, recordid rid, byte * buf) { - readlock(page->rwlatch, 57); - - // printf("R { %d %d %d }\n", rid.page, rid.slot, rid.size); - checkRid(page, rid); - - - - fixedReadUnlocked(page, rid, buf); - - unlock(page->rwlatch); - +static void fixedCompact(Page * p) { + // no-op } - -void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) { - assertlocked(page->rwlatch); - checkRid(page,rid); - if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) { - perror("memcpy"); - abort(); +static recordid fixedPreAlloc(int xid, Page *p, int size) { + assertlocked(p->rwlatch); + if(fixedRecordsPerPage(*recordsize_ptr(p)) > *recordcount_ptr(p)) { + recordid rid; + rid.page = p->id; + rid.slot = *recordcount_ptr(p); + rid.size = *recordsize_ptr(p); + return rid; + } else { + return NULLRID; } } - -void fixedWrite(Page * page, recordid rid, const byte* dat) { - readlock(page->rwlatch, 73); - - // printf("W { %d %d %d }\n", rid.page, rid.slot, rid.size); - // checkRid(page, rid); - - fixedWriteUnlocked(page, rid, dat); - - unlock(page->rwlatch); +static void fixedPostAlloc(int xid, Page *p, recordid rid) { + assertlocked(p->rwlatch); + assert(*recordcount_ptr(p) == rid.slot); + assert(*recordsize_ptr(p) == rid.size); + (*recordcount_ptr(p))++; } +static void fixedFree(int xid, Page *p, recordid rid) { + assertlocked(p->rwlatch); + if(*recordsize_ptr(p) == rid.slot+1) { + (*recordsize_ptr(p))--; + } else { + // leak space; there's no way to track it with this page format. + } +} +//// XXX missing some functions w/ murky futures. +/* static lsn_t fixedReadLSN(int xid, Page * p) { + return p->LSN; +} +static void fixedWriteLSN(int xid, Page * p, lsn_t lsn) { + p->LSN = lsn; + *lsn_ptr(p) = lsn; + dirtyPages_add(p); +} */ +page_impl fixedImpl() { + static page_impl pi = { + FIXED_PAGE, + fixedRead, + fixedWrite, + fixedGetType, + fixedSetType, + fixedGetLength, + 0, // fixedFirst, + 0, // fixedNext, + 0, // notSupported, + 0, // block first + 0, // block next + fixedFreespace, + fixedCompact, + fixedPreAlloc, + fixedPostAlloc, + fixedFree, + 0, // XXX dereference + 0, // loaded + 0, // flushed + }; + return pi; +} + +page_impl arrayListImpl() { + page_impl pi = fixedImpl(); + pi.page_type = ARRAY_LIST_PAGE; + return pi; +} + +void fixedPageInit() { } +void fixedPageDeinit() { } diff --git a/src/lladd/page/fixed.h b/src/lladd/page/fixed.h index 2d29c7d..d5b226a 100644 --- a/src/lladd/page/fixed.h +++ b/src/lladd/page/fixed.h @@ -6,16 +6,9 @@ #define recordsize_ptr(page) shorts_from_end((page), 1) #define recordcount_ptr(page) shorts_from_end((page), 2) #define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n)) -int recordsPerPage(size_t size); -void fixedPageInitialize(Page * page, size_t size, int count); -/** Return the number of records in a fixed length page */ -short fixedPageCount(Page * page); -short fixedPageRecordSize(Page * page); -recordid fixedRawRallocMany(Page * page, int count); -recordid fixedRawRalloc(Page *page); -void fixedRead(Page * page, recordid rid, byte * buf); -void fixedWrite(Page * page, recordid rid, const byte* dat); -void fixedReadUnlocked(Page * page, recordid rid, byte * buf); -void fixedWriteUnlocked(Page * page, recordid rid, const byte* dat); +void fixedPageInit(); +void fixedPageDeinit(); +page_impl fixedImpl(); +page_impl arrayListImpl(); #endif diff --git a/src/lladd/page/indirect.c b/src/lladd/page/indirect.c index c4aaa92..2dd80c7 100644 --- a/src/lladd/page/indirect.c +++ b/src/lladd/page/indirect.c @@ -1,5 +1,5 @@ #include "indirect.h" -#include "slotted.h" +#include "slotted.h" #include #include #include @@ -170,15 +170,17 @@ compensated_function recordid __rallocMany(int xid, int parentPage, int recordSi /* Initialize leaves. (As SLOTTED_PAGE's) */ - slottedPageInitialize(&p); + writelock(p.rwlatch,0); + slottedPageInitialize(&p); p.id = parentPage; for(int i = 0; i < recordCount; i++) { /* Normally, we would worry that the page id isn't set, but we're discarding the recordid returned by page ralloc anyway. */ - slottedRawRalloc(&p, recordSize); + recordid rid = recordPreAlloc(xid, &p, recordSize); + recordPostAlloc(xid, &p, rid); } - + unlock(p.rwlatch); } try_ret(NULLRID) { TpageSet(xid, parentPage, p.memAddr); @@ -193,7 +195,7 @@ compensated_function recordid __rallocMany(int xid, int parentPage, int recordSi return rid; } - + compensated_function int indirectPageRecordCount(int xid, recordid rid) { Page * p; try_ret(-1){ diff --git a/src/lladd/page/indirect.h b/src/lladd/page/indirect.h index 8ebbe90..4b2ddb5 100644 --- a/src/lladd/page/indirect.h +++ b/src/lladd/page/indirect.h @@ -22,6 +22,8 @@ below this block. level = 1 means that the pageid's point to 'normal' pages. (They may be slotted (type = 1), or provided by some other implementation). + @todo Does anything actually use indirect.h? Why doesn't arrayList use it? + */ #include @@ -44,14 +46,8 @@ BEGIN_C_DECLS physical location of the record. */ compensated_function recordid dereferenceRID(int xid, recordid rid); -compensated_function static inline recordid dereferenceRIDUnlocked(int xid, recordid rid) {return dereferenceRID(xid,rid);} -//#define dereferenceRIDUnlocked(y, x) dereferenceRID((y), (x)) void indirectInitialize(Page * p, int height); -/* This comment is for check_compensations: -compensated_function __rallocMany(); -*/ - compensated_function recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount); compensated_function int indirectPageRecordCount(int xid, recordid rid); diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c index 388ef51..a1b4849 100644 --- a/src/lladd/page/slotted.c +++ b/src/lladd/page/slotted.c @@ -1,14 +1,15 @@ /** $Id$ */ +#include "config.h" #include "../page.h" #include "slotted.h" #include +/** @todo should page implementations provide readLSN / writeLSN??? */ +#include +static inline void slottedFsck(const Page const * page) { -static void really_do_ralloc(Page * page, recordid rid) ; -size_t slottedFreespaceForSlot(Page * page, int slot); -void fsckSlottedPage(const Page const * page) { assertlocked(page->rwlatch); -#ifdef SLOTTED_PAGE_SANITY_CHECKS + Page dummy; dummy.id = -1; @@ -21,7 +22,7 @@ void fsckSlottedPage(const Page const * page) { const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1); assert(slotListStart < PAGE_SIZE && slotListStart >= 0); - assert(page_type == SLOTTED_PAGE || + assert(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE); assert(numslots >= 0); assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE); @@ -29,7 +30,9 @@ void fsckSlottedPage(const Page const * page) { assert(freespace <= slotListStart); assert(freelist >= INVALID_SLOT); assert(freelist < numslots); - + +#ifdef SLOTTED_PAGE_SANITY_CHECKS + // Check integrity of freelist. All free slots less than // numslots should be on it, in order. @@ -113,10 +116,6 @@ void fsckSlottedPage(const Page const * page) { } -#ifndef SLOTTED_PAGE_SANITY_CHECKS -#define fsckSlottedPage(x) ((void)0) -#endif - /** Move all of the records to the beginning of the page in order to @@ -125,7 +124,7 @@ increase the available free space. The caller of this function must have a writelock on the page. */ -void slottedCompact(Page * page) { +static void slottedCompact(Page * page) { assertlocked(page->rwlatch); Page bufPage; byte buffer[PAGE_SIZE]; @@ -180,7 +179,7 @@ void slottedCompact(Page * page) { memcpy(page->memAddr, buffer, PAGE_SIZE); - fsckSlottedPage(page); + slottedFsck(page); } @@ -190,23 +189,23 @@ void slottedPageInit() { #endif } -void slottedPageDeInit() { +void slottedPageDeinit() { } void slottedPageInitialize(Page * page) { + assertlocked(page->rwlatch); *page_type_ptr(page) = SLOTTED_PAGE; *freespace_ptr(page) = 0; *numslots_ptr(page) = 0; *freelist_ptr(page) = INVALID_SLOT; } -size_t slottedFreespaceUnlocked(Page * page); /** This is needed to correctly implement really_do_ralloc(), since it takes the position of the new slot's header into account. */ -size_t slottedFreespaceForSlot(Page * page, int slot) { +static size_t slottedFreespaceForSlot(Page * page, int slot) { assertlocked(page->rwlatch); size_t slotOverhead; @@ -236,61 +235,8 @@ size_t slottedFreespaceForSlot(Page * page, int slot) { } } -/** @todo Implement a model of slotted pages in the test scripts, and - then write a randomized test that confirms the model matches the - implementation's behavior. */ -size_t slottedFreespaceUnlocked(Page * page) { - return slottedFreespaceForSlot(page, INVALID_SLOT); -} - -size_t slottedFreespace(Page * page) { - size_t ret; - readlock(page->rwlatch, 292); - ret = slottedFreespaceUnlocked(page); - readunlock(page->rwlatch); - return ret; -} - - - -recordid slottedRawRalloc(Page * page, int size) { - int type = size; - size = physical_slot_length(type); - assert(type != INVALID_SLOT); - - writelock(page->rwlatch, 342); - fsckSlottedPage(page); - - recordid rid; - - rid.page = page->id; - rid.slot = *numslots_ptr(page); - rid.size = type; // The rid should reflect the fact that this is a special slot. - - /* The freelist_ptr points to the first free slot number, which - is the head of a linked list of free slot numbers.*/ - if(*freelist_ptr(page) != INVALID_SLOT) { - rid.slot = *freelist_ptr(page); - // really_do_ralloc will look this slot up in the freelist (which - // is O(1), since rid.slot is the head), and then remove it from - // the list. - } - - really_do_ralloc(page, rid); - - assert(*numslots_ptr(page) > rid.slot); - assert(type == *slot_length_ptr(page, rid.slot)); - assert(size == physical_slot_length(*slot_length_ptr(page, rid.slot))); - - fsckSlottedPage(page); - - writeunlock(page->rwlatch); - - return rid; -} - /** - Allocation is scary without locking. Consider this situation: + Allocation is complicated without locking. Consider this situation: (1) *numslot_ptr(page) is 10 (2) An aborting transcation calls really_do_ralloc(page) with rid.slot = 12 @@ -414,162 +360,180 @@ static void really_do_ralloc(Page * page, recordid rid) { *slot_length_ptr(page, rid.slot) = rid.size; } -/** - @param xid - @param page - @param lsn - @param rid with user-visible size. - @todo Does this still really need to check for BLOB_THRESHOLD_SIZE? - Shouldn't the caller call slottedSetType if necessary? -*/ -recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { - - writelock(page->rwlatch, 376); +/// ------------------------------------------------------------------------------------------ +// PUBLIC API IS BELOW THIS LINE +/// ------------------------------------------------------------------------------------------ - if(rid.size >= BLOB_THRESHOLD_SIZE) { - rid.size = BLOB_SLOT; - } - - if(*page_type_ptr(page) != SLOTTED_PAGE && *page_type_ptr(page) != BOUNDARY_TAG_PAGE) { - /* slottedPreRalloc calls this when necessary. However, in - the case of a crash, it is possible that - slottedPreRalloc's updates were lost, so we need to check - for that here. - - If slottedPreRalloc didn't call slottedPageInitialize, - then there would be a race condition: - - Thread 1 Thread 2 - preAlloc(big record) - - preAlloc(big record) // Should check the freespace of the page and fail - postAlloc(big record) - - postAlloc(big record) // Thread 2 stole my space! => Crash? - - Note that this _will_ cause trouble if recovery is - multi-threaded, and allows the application to begin - updating the storefile without first locking any pages - that suffer from this problem. - - Also, this only works because pages that are of type - BOUNDARY_TAG_PAGE are guaranteed to have their page type - set before recovery calls this function. - - */ - - slottedPageInitialize(page); - } - fsckSlottedPage(page); - - if(*slot_ptr(page, rid.slot) == INVALID_SLOT || rid.slot >= *numslots_ptr(page)) { - really_do_ralloc(page, rid); - - } else { - - // Check to see that the slot happens to be the right size, - // so we are (hopefully) just overwriting a slot with - // itself. This can happen under normal operation, since - // really_do_ralloc() must be called before and after the - // log entry is generated. (See comment above...) - - assert(rid.size == *slot_length_ptr(page, rid.slot)); - - } - - pageWriteLSN(xid, page, lsn); - fsckSlottedPage(page); - writeunlock(page->rwlatch); - - - return rid; +static inline void sanityCheck(Page * p, recordid rid) { + assert(p->id == rid.page); + assert(rid.size < BLOB_THRESHOLD_SIZE); // Caller deals with this now! + slottedFsck(p); } -void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { - writelock(page->rwlatch, 443); - fsckSlottedPage(page); +static const byte* slottedRead (int xid, Page *p, recordid rid) { + sanityCheck(p, rid); - if(*freespace_ptr(page) == *slot_ptr(page, rid.slot) + physical_slot_length(rid.size)) { - (*freespace_ptr(page)) -= physical_slot_length(rid.size); + return record_ptr(p, rid.slot); +} + +static byte* slottedWrite(int xid, Page *p, recordid rid) { + sanityCheck(p, rid); + + return record_ptr(p, rid.slot); +} +static int slottedGetType(int xid, Page *p, recordid rid) { + //sanityCheck(p, rid); <-- Would fail if rid.size is a blob + assert(p->id == rid.page); + slottedFsck(p); + if(rid.slot >= *numslots_ptr(p)) { return INVALID_SLOT; } + if(*slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; } + int ret = *slot_length_ptr(p, rid.slot); + return ret >= 0 ? NORMAL_SLOT : ret; +} +static void slottedSetType(int xid, Page *p, recordid rid, int type) { + sanityCheck(p, rid); + + int old_type = *slot_length_ptr(p, rid.slot); + assert(rid.slot < *numslots_ptr(p)); + assert(old_type != INVALID_SLOT); + + if(type == NORMAL_SLOT) { + // set slot_length_ptr to the physical length. + *slot_length_ptr(p, rid.slot) = physical_slot_length(old_type); + } else { + // Changing to a special slot type; make sure doing so doesn't change + // the record size. + assert(physical_slot_length(type) == physical_slot_length(old_type)); + *slot_length_ptr(p, rid.slot) = type; + } +} + +static int slottedGetLength(int xid, Page *p, recordid rid) { + assert(p->id == rid.page); + slottedFsck(p); + assert(slottedGetType(xid, p, rid) != INVALID_SLOT); + if(rid.slot > *numslots_ptr(p) || slottedGetType(xid, p, rid) == INVALID_SLOT) + return INVALID_SLOT; + else + return physical_slot_length(*slot_length_ptr(p, rid.slot)); +} + +static recordid slottedNext(int xid, Page *p, recordid rid) { + sanityCheck(p, rid); + + short n = *numslots_ptr(p); + rid.slot ++; + while(rid.slot < n && !isValidSlot(p, rid.slot)) { rid.slot++; } + return isValidSlot(p, rid.slot) ? rid : NULLRID; +} + +static recordid slottedFirst(int xid, Page *p) { + slottedFsck(p); + + recordid rid = { p->id, -1, 0 }; + return slottedNext(xid, p, rid); +} + +static int notSupported(int xid, Page * p) { return 0; } + +static int slottedFreespace(int xid, Page * p) { + slottedFsck(p); + + return slottedFreespaceForSlot(p, INVALID_SLOT); +} + +static recordid slottedPreRalloc(int xid, Page * p, int type) { + assert(type != INVALID_SLOT); + slottedFsck(p); + + recordid rid; + rid.page = p->id; + rid.slot = *numslots_ptr(p); + rid.size = type; + + if(*freelist_ptr(p) != INVALID_SLOT) { + rid.slot = *freelist_ptr(p); } - assert(rid.slot < *numslots_ptr(page)); - if(rid.slot == *numslots_ptr(page)-1) { - (*numslots_ptr(page))--; - } else { - *slot_ptr(page, rid.slot) = INVALID_SLOT; - *slot_length_ptr(page, rid.slot) = *freelist_ptr(page); - *freelist_ptr(page) = rid.slot; - } - - pageWriteLSN(xid, page, lsn); - fsckSlottedPage(page); - unlock(page->rwlatch); -} - -void slottedReadUnlocked(Page * page, recordid rid, byte *buff) { - assertlocked(page->rwlatch); - int slot_length; - - fsckSlottedPage(page); - assert(page->id == rid.page); - slot_length = *slot_length_ptr(page, rid.slot); - assert((rid.size == slot_length)); - - if(!memcpy(buff, record_ptr(page, rid.slot), physical_slot_length(rid.size))) { - perror("memcpy"); - abort(); - } - fsckSlottedPage(page); - -} - -void slottedRead(Page * page, recordid rid, byte *buff) { - - int slot_length; - readlock(page->rwlatch, 519); - fsckSlottedPage(page); - - // printf("Reading from rid = {%d,%d,%d (%d)}\n", rid.page, rid.slot, rid.size, physical_slot_length(rid.size)); - - assert(page->id == rid.page); - - slot_length = *slot_length_ptr(page, rid.slot); - assert((rid.size == slot_length)); - - if(!memcpy(buff, record_ptr(page, rid.slot), physical_slot_length(rid.size))) { - perror("memcpy"); - abort(); + if(slottedFreespaceForSlot(p, rid.slot) < physical_slot_length(type)) { + rid = NULLRID; } - fsckSlottedPage(page); - unlock(page->rwlatch); - + return rid; } -void slottedWrite(Page * page, recordid rid, const byte *data) { - - readlock(page->rwlatch, 529); - - slottedWriteUnlocked(page, rid, data); - - unlock(page->rwlatch); +static void slottedPostRalloc(int xid, Page * p, recordid rid) { + sanityCheck(p, rid); + really_do_ralloc(p, rid); } -void slottedWriteUnlocked(Page * page, recordid rid, const byte *data) { - assertlocked(page->rwlatch); - int slot_length; - fsckSlottedPage(page); - assert(page->id == rid.page); - - slot_length = *slot_length_ptr(page, rid.slot); - assert((rid.size == slot_length)); +static void slottedFree(int xid, Page * p, recordid rid) { + sanityCheck(p, rid); - if(!memcpy(record_ptr(page, rid.slot), data, physical_slot_length(rid.size))) { - perror("memcpy"); - abort(); + if(*freespace_ptr(p) == *slot_ptr(p, rid.slot) + physical_slot_length(rid.size)) { + (*freespace_ptr(p)) -= physical_slot_length(rid.size); } - fsckSlottedPage(page); + + assert(rid.slot < *numslots_ptr(p)); + if(rid.slot == *numslots_ptr(p)-1) { + (*numslots_ptr(p))--; + assert(slottedGetType(xid,p,rid)==INVALID_SLOT); + } else { + *slot_ptr(p, rid.slot) = INVALID_SLOT; + *slot_length_ptr(p, rid.slot) = *freelist_ptr(p); + *freelist_ptr(p) = rid.slot; + assert(slottedGetType(xid,p,rid)==INVALID_SLOT); + } + + slottedFsck(p); +} + + +// XXX dereferenceRID + +/*static lsn_t slottedReadLSN(int xid, Page * p) { + return p->LSN; +} +static void slottedWriteLSN(int xid, Page * p, lsn_t lsn) { + p->LSN = lsn; + *lsn_ptr(p) = lsn; + dirtyPages_add(p); +//pageWriteLSN(xid, p, lsn); +} */ + +// XXX loaded + +// XXX flushed + +page_impl slottedImpl() { +static page_impl pi = { + SLOTTED_PAGE, + slottedRead, + slottedWrite, + slottedGetType, + slottedSetType, + slottedGetLength, + slottedFirst, + slottedNext, + notSupported, // is block supported + 0, //block first + 0, //block next + slottedFreespace, + slottedCompact, + slottedPreRalloc, + slottedPostRalloc, + slottedFree, + 0, //XXX page_impl_dereference_identity, + 0, //loaded + 0, //flushed + }; + return pi; +} + +page_impl boundaryTagImpl() { + page_impl p = slottedImpl(); + p.page_type = BOUNDARY_TAG_PAGE; + return p; } diff --git a/src/lladd/page/slotted.h b/src/lladd/page/slotted.h index ddf30ca..3a48843 100644 --- a/src/lladd/page/slotted.h +++ b/src/lladd/page/slotted.h @@ -70,13 +70,6 @@ Slotted page layout: #define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 #endif -void slottedWrite(Page * page, recordid rid, const byte *data); -void slottedRead(Page * page, recordid rid, byte *buff); -void slottedWriteUnlocked(Page * page, recordid rid, const byte *data); -void slottedReadUnlocked(Page * page, recordid rid, byte *buff); - -void slottedPageInitialize(Page * p); - #define freespace_ptr(page) shorts_from_end((page), 1) #define numslots_ptr(page) shorts_from_end((page), 2) #define freelist_ptr(page) shorts_from_end((page), 3) @@ -85,134 +78,7 @@ void slottedPageInitialize(Page * p); #define record_ptr(page, n) bytes_from_start((page), *slot_ptr((page), (n))) #define isValidSlot(page, n) ((*slot_ptr((page), (n)) == INVALID_SLOT) ? 0 : 1) -/** - * allocate a record. This must be done in two phases. The first - * phase reserves a slot, and produces a log entry. The second phase - * sets up the slot according to the contents of the log entry. - * - * Essentially, the implementation of this function chooses a page - * with enough space for the allocation, then calls slottedRawRalloc. - * - * Ralloc implements the first phase. - * - * @param xid The active transaction. - * @param size The size of the new record - * @return allocated record - * - * @see postRallocSlot the implementation of the second phase. - * - */ -//compensated_function recordid slottedPreRalloc(int xid, unsigned long size, Page**p); - -/** - * The second phase of slot allocation. Called after the log entry - * has been produced, and during recovery. - * - * @param page The page that should contain the new record. - * - * @param lsn The lsn of the corresponding log entry (the page's LSN - * is updated to reflect this value.) - * - * @param rid A recordid that should exist on this page. If it does - * not exist, then it is created. Because slottedPreRalloc never - * 'overbooks' pages, we are guaranteed to have enough space on the - * page for this record (though it is possible that we need to compact - * the page) - */ -recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid); -/** - * Mark the space used by a record for reclamation. - * @param xid the transaction resposible for the deallocation, or -1 if outside of a transaction. - * @param page a pointer to the binned page that contains the record - * @param lsn the LSN of the redo log record that records this deallocation. - * @param rid the recordid to be freed. - */ -void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid); - void slottedPageInit(); -void slottedPageDeInit(); - -/** - * - * Bypass logging and allocate a record. It should only be used - * when the recordid returned is deterministic, and will be available - * during recovery, as it bypassses the normal two-phase alloc / log - * procedure for record allocation. This usually means that this - * function must be called by an Operation implementation that - * alocates entire pages, logs the page ids that were allocated, - * initializes the pages and finally calls this function in a - * deterministic fashion. - * - * @see indirect.c for an example of how to use this function - * correctly. - * - * This function assumes that the page is already loaded in memory. - * It takes as parameters a Page and the size in bytes of the new - * record. - * - * If you call this function, you probably need to be holding - * lastFreepage_mutex. - * - * @see lastFreepage_mutex - * - * @return a recordid representing the newly allocated record. - * - * NOTE: might want to pad records to be multiple of words in length, or, simply - * make sure all records start word aligned, but not necessarily having - * a length that is a multiple of words. (Since Tread(), Twrite() ultimately - * call memcpy(), this shouldn't be an issue) - * - * NOTE: pageRalloc() assumes that the caller already made sure that sufficient - * amount of freespace exists in this page. - * @see slottedFreespace() - * - * @todo pageRalloc's algorithm for reusing slot id's reclaims the - * most recently freed slots first, which may encourage fragmentation. - */ -recordid slottedRawRalloc(Page * page, int size); - -/** - * Obtain an estimate of the amount of free space on this page. - * Assumes that the page is already loaded in memory. - * - * (This function is particularly useful if you need to use - * slottedRawRalloc for something...) - * - * @param p the page whose freespace will be estimated. - * - * @return an exact measurment of the freespace, or, in the case of - * fragmentation, an underestimate. - */ -size_t slottedFreespace(Page * p); - -/** - * Check to see if a slot is a normal slot, or something else, such - * as a blob. This is stored in the size field in the slotted page - * structure. If the size field is greater than PAGE_SIZE, then the - * slot contains a special value, and the size field indicates the - * type. (The type is looked up in a table to determine the amount - * of the page's physical space used by the slot.) - * - * @param p the page of interest - * @param slot the slot in p that we're checking. - * @return The type of this slot. - */ -int slottedGetType(Page * p, int slot); -/** - * Set the type of a slot to a special type, such as BLOB_SLOT. This - * function does not update any other information in the page - * structure, and just writes the raw value of type into the slot's - * size field. In particular, setting the type to NORMAL_SLOT will - * result in undefined behavior. (Such a call would destroy all - * record of the slot's true physical size) - * - * @param p the page containing the slot to be updated. - * @param slot the slot whose type will be changed. - * @param type the new type of slot. Must be greater than PAGE_SIZE. - * - */ -void slottedSetType(Page * p, int slot, int type); -/** The caller of this function must have a write lock on the page. */ -void slottedCompact(Page * page); - -void fsckSlottedPage(const Page const * page); +void slottedPageDeinit(); +page_impl slottedImpl(); +page_impl boundaryTagImpl(); diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index 4c6d2dd..14c6ab6 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -3,6 +3,7 @@ This file handles all of the file I/O for pages. */ +#include "config.h" #include "page.h" #include diff --git a/src/lladd/pageHandle.c b/src/lladd/pageHandle.c index bf47e08..98bbdf3 100644 --- a/src/lladd/pageHandle.c +++ b/src/lladd/pageHandle.c @@ -20,7 +20,6 @@ static stasis_handle_t * h; out, or forcing the log too early? */ static void phWrite(Page * ret) { - assert(ret->LSN == pageReadLSN(ret)); if(!ret->dirty) { return; } LogForce(ret->LSN); int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 425a42b..81e0777 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -21,7 +21,6 @@ #include #include #include -#include "page/indirect.h" #include TransactionLog XactionTable[MAX_TRANSACTIONS]; @@ -239,7 +238,7 @@ static recordid resolveForUpdate(int xid, Page * p, recordid rid) { if(*page_type_ptr(p) == INDIRECT_PAGE) { rid = dereferenceRID(xid, rid); } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) { - rid = dereferenceArrayListRid(p, rid.slot); + rid = dereferenceArrayListRid(xid, p, rid.slot); } return rid; } @@ -284,21 +283,6 @@ compensated_function void Tdefer(int xid, recordid rid, releasePage(p); } -void TreadUnlocked(int xid, recordid rid, void * dat) { - Page * p; - try { - p = loadPage(xid, rid.page); - } end; - - rid = recordDereferenceUnlocked(xid, p, rid); - if(rid.page != p->id) { - releasePage(p); - p = loadPage(xid, rid.page); - } - recordReadUnlocked(xid, p, rid, dat); - releasePage(p); -} - compensated_function void TreadStr(int xid, recordid rid, char * dat) { Tread(xid, rid, dat); } diff --git a/test/lladd/check_bTree.c b/test/lladd/check_bTree.c index 8b8a277..cfd0c09 100644 --- a/test/lladd/check_bTree.c +++ b/test/lladd/check_bTree.c @@ -47,7 +47,7 @@ terms specified in this license. #include -#include "../../src/lladd/page/fixed.h" +#include "../../src/lladd/page.h" #include #include @@ -78,7 +78,7 @@ START_TEST(bTreeTest) This method doesn't use the Slot value of rid! We make a copy of rid_caller so that the caller's copy doesn't change. */ -int insert(Page* p, recordid rid_caller, int valueIn){ +int insert(int xid, Page* p, recordid rid_caller, int valueIn){ printf ("\nbegin insert\n"); int DEBUG = 0; int DEBUGERROR = 1; @@ -110,7 +110,7 @@ int insert(Page* p, recordid rid_caller, int valueIn){ if (DEBUGSTATEMENTS) {printf("\nDebug1\n");} - fixedRead(p, rid, countBuff); // read the value of count from slot 0 + recordRead(xid, p, rid, countBuff); // read the value of count from slot 0 if (DEBUGSTATEMENTS) {printf("\nDebug2\n");} @@ -122,14 +122,16 @@ int insert(Page* p, recordid rid_caller, int valueIn){ printf("\nrid2slot = %d\n", rid.slot); // *recordcount_ptr(p) = last accessible index on the page. - int max_index = *recordcount_ptr(p); + int max_index = fixedRecordsPerPage(rid.size); // rcs made this change. + // int max_index = *recordcount_ptr(p); // recordcount_ptr is the number of slots currently allocated on the page. + // but this code seems to do it's own allocation(?) //THESE LINES WERE CAUSING A SEGFAULT! ******************************************* if (DEBUGERROR2) {printf("\nx = %d\n", max_index);} // This causes a segfault after Debug2 // HACK! TO FIX THE ABOVE PROBLEM! - max_index = 1021; + // max_index = 1021; // assert that max_index is greater than our record of how many // entries we currently have on the page. @@ -177,7 +179,7 @@ int insert(Page* p, recordid rid_caller, int valueIn){ it to be a BTreeNode. Just puts the value 0 in the first index of the page. */ -void initializeNewBTreeNode(Page* p, recordid rid){ +void initializeNewBTreeNode(int xid, Page* p, recordid rid){ // need access to the first slot rid.slot = 0; @@ -187,7 +189,7 @@ void initializeNewBTreeNode(Page* p, recordid rid){ byte * countBuff = (byte *) & countInt; // write the count out - fixedWrite(p, rid, countBuff); + recordWrite(xid, p, 1, rid, countBuff); } void testFunctions(){ @@ -201,8 +203,8 @@ void testFunctions(){ // calling functions - initializeNewBTreeNode(p1, rid1); - insert(p1, rid1, 3); + initializeNewBTreeNode(xid, p1, rid1); + insert(xid, p1, rid1, 3); // cleaning up @@ -239,8 +241,8 @@ int SimpleExample(){ /* check consistency between rid & page's values * for number of slots and record size */ - assert (rid1.slot == fixedPageCount(p1)); - assert (rid1.size == fixedPageRecordSize(p1)); + // assert (rid1.slot == fixedPageCount(p1)); + // assert (rid1.size == fixedPageRecordSize(p1)); assert (p1->id == rid1.page); @@ -251,7 +253,8 @@ int SimpleExample(){ byte * b1 = (byte *) malloc (sizeof (int)); byte * b2 = (byte *) malloc (sizeof (int)); byte * b3 = (byte *) malloc (sizeof (int)); - int x = *recordcount_ptr(p1); + // int x = *recordcount_ptr(p1); + int x = 42; // rcs - recordcount_ptr is no longer exposed here... int y = rid1.slot; int z = 256; @@ -276,8 +279,8 @@ int SimpleExample(){ // @todo This is a messy way to do this... unlock(p1->rwlatch); - fixedWrite(p1, rid2, b1); - fixedRead(p1, rid2, b2); + recordWrite(xid, p1, 1, rid2, b1); + recordRead(xid, p1, rid2, b2); if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));} // initializeNewBTreeNode(p1, rid1); diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c index be730cd..ce95107 100644 --- a/test/lladd/check_bufferManager.c +++ b/test/lladd/check_bufferManager.c @@ -3,7 +3,6 @@ #include #include "../../src/lladd/latches.h" #include "../../src/lladd/page.h" -#include "../../src/lladd/page/slotted.h" #include #include #include @@ -41,8 +40,8 @@ long myrandom(long x) { } void initializePages() { - - int i; + + int i; printf("Initialization starting\n"); fflush(NULL); @@ -52,18 +51,19 @@ void initializePages() { rid.page = PAGE_MULT * (i+1); rid.slot = 0; rid.size = sizeof(int); - p = loadPage(-1, rid.page); + p = loadPage(-1, rid.page); - assert(p->id != -1); - slottedPostRalloc(-1, p, 0, rid); - - recordWrite(1, p, 1, rid, (byte*)&i); - - p->LSN = 0; - *lsn_ptr(p) = 0; - releasePage(p); + assert(p->id != -1); + writelock(p->rwlatch,0); + slottedPageInitialize(p); + recordPostAlloc(-1, p, rid); + int * buf = (int*)recordWriteNew(-1, p, rid); + *buf = i; + pageWriteLSN(-1, p, 0); + unlock(p->rwlatch); + releasePage(p); } - + printf("Initialization complete.\n"); fflush(NULL); } diff --git a/test/lladd/check_indirect.c b/test/lladd/check_indirect.c index 38b0d93..992e10b 100644 --- a/test/lladd/check_indirect.c +++ b/test/lladd/check_indirect.c @@ -46,6 +46,7 @@ terms specified in this license. #include "../check_includes.h" #include +#include "src/lladd/page.h" #include "../../src/lladd/page/indirect.h" #include diff --git a/test/lladd/check_operations.c b/test/lladd/check_operations.c index 9e612ae..bf2977f 100644 --- a/test/lladd/check_operations.c +++ b/test/lladd/check_operations.c @@ -48,7 +48,6 @@ terms specified in this license. #include #include "../check_includes.h" #include "../../src/lladd/page.h" -#include "../../src/lladd/page/slotted.h" #define LOG_NAME "check_operations.log" #include @@ -68,15 +67,17 @@ START_TEST(operation_physical_do_undo) { int buf; int arg; LogEntry * setToTwo; - + Tinit(); int xid = Tbegin(); long long pnum = TpageAlloc(xid); Page * p = loadPage(xid, pnum); + + writelock(p->rwlatch, 0); slottedPageInitialize(p); - - - rid = slottedRawRalloc(p, sizeof(int)); + rid = recordPreAlloc(xid, p, sizeof(int)); + recordPostAlloc(xid, p, rid); + unlock(p->rwlatch); releasePage(p); buf = 1; @@ -109,8 +110,9 @@ START_TEST(operation_physical_do_undo) { DEBUG("D\n"); p = loadPage(xid, rid.page); - + readlock(p->rwlatch,0); fail_unless(10 == pageReadLSN(p), "page lsn not set correctly."); + unlock(p->rwlatch); setToTwo->LSN = 5; diff --git a/test/lladd/check_page.c b/test/lladd/check_page.c index b196cbb..aac9d02 100644 --- a/test/lladd/check_page.c +++ b/test/lladd/check_page.c @@ -46,8 +46,6 @@ terms specified in this license. #include "../../src/lladd/page.h" #include "../../src/lladd/page/slotted.h" -#include "../../src/lladd/page/fixed.h" -#include "../../src/lladd/page/indirect.h" #include "../../src/lladd/blobManager.h" #include #include @@ -76,9 +74,10 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) { recordid rid[100]; for(i = 0; i < 10000; i++) { - pthread_mutex_lock(&lsn_mutex); - this_lsn = lsn; + pthread_mutex_lock(&lsn_mutex); lsn++; + this_lsn = lsn; + assert(pageReadLSN(p) < this_lsn); pthread_mutex_unlock(&lsn_mutex); if(! first ) { @@ -86,7 +85,10 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) { recordRead(1, p, rid[k], (byte*)&j); assert((j + 1) == i + k); - slottedDeRalloc(-1, p, lsn, rid[k]); + writelock(p->rwlatch,0); + recordFree(-1, p, rid[k]); + pageWriteLSN(-1, p, this_lsn); + unlock(p->rwlatch); sched_yield(); } } @@ -94,16 +96,21 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) { first = 0; for(k = 0; k < 100; k++) { - - rid[k] = slottedRawRalloc(p, sizeof(short)); - i +=k; - /* printf("Slot %d = %d\n", rid[k].slot, i); */ - recordWrite(-1, p, lsn, rid[k], (byte*)&i); - i -=k; + writelock(p->rwlatch,0); + rid[k] = recordPreAlloc(-1,p,sizeof(short)); + if(rid[k].size == INVALID_SLOT) { // Is rid[k] == NULLRID? + pageCompact(p); + rid[k] = recordPreAlloc(-1,p,sizeof(short)); + } + recordPostAlloc(-1,p,rid[k]); + int * buf = (int*)recordWriteNew(-1,p,rid[k]); + *buf = i+k; + pageWriteLSN(-1, p, this_lsn); + assert(pageReadLSN(p) >= this_lsn); + unlock(p->rwlatch); sched_yield(); } - - assert(pageReadLSN(p) <= lsn); + } return NULL; @@ -124,22 +131,20 @@ static void* fixed_worker_thread(void * arg_ptr) { lsn++; pthread_mutex_unlock(&lsn_mutex); + writelock(p->rwlatch,0); if(! first ) { - fixedRead(p, rid, (byte*)&j); + j = *(int*)recordReadNew(-1,p,rid); assert((j + 1) == i); - /* slottedDeRalloc(p, lsn, rid); */ - sched_yield(); - } - + } first = 0; - - rid = fixedRawRalloc(p); - fixedWrite(p, rid, (byte*)&i); + rid = recordPreAlloc(-1, p, sizeof(int)); + recordPostAlloc(-1, p, rid); + (*(int*)recordWriteNew(-1,p,rid)) = i; + pageWriteLSN(-1, p,lsn); + assert(pageReadLSN( p) >= this_lsn); + unlock(p->rwlatch); sched_yield(); - - assert(pageReadLSN(p) <= lsn); } - return NULL; } @@ -156,31 +161,35 @@ static void* worker_thread(void * arg_ptr) { this_lsn = lsn; lsn++; - pthread_mutex_unlock(&lsn_mutex); + pthread_mutex_unlock(&lsn_mutex); if(! first ) { recordRead(1, p, rid, (byte*)&j); assert((j + 1) == i); - slottedDeRalloc(-1, p, lsn, rid); + writelock(p->rwlatch,0); + recordFree(-1, p, rid); + pageWriteLSN(-1, p, this_lsn); + unlock(p->rwlatch); sched_yield(); - } - + } + first = 0; - // TODO A condition variable would be more efficient... - - pthread_mutex_lock(&lsn_mutex); - if(slottedFreespace(p) < sizeof(int)) { + // @todo In check_page, a condition variable would be more efficient... + writelock(p->rwlatch,0); + if(pageFreespace(-1, p) < sizeof(int)) { first = 1; - pthread_mutex_unlock(&lsn_mutex); } else { - rid = slottedRawRalloc(p, sizeof(int)); - pthread_mutex_unlock(&lsn_mutex); - recordWrite(-1, p, lsn, rid, (byte*)&i); + rid = recordPreAlloc(-1, p, sizeof(int)); + recordPostAlloc(-1, p, rid); + int * buf = (int*)recordWriteNew(-1, p, rid); + pageWriteLSN(-1,p,this_lsn); + *buf = i; + assert(pageReadLSN(p) >= this_lsn); } + unlock(p->rwlatch); sched_yield(); - assert(pageReadLSN(p) <= lsn); } return NULL; @@ -203,10 +212,10 @@ START_TEST(pageNoThreadTest) pthread_mutex_init(&lsn_mutex, NULL); Tinit(); - p = loadPage(-1, 0); - + writelock(p->rwlatch,0); slottedPageInitialize(p); + unlock(p->rwlatch); worker_thread(p); p->LSN = 0; @@ -300,7 +309,12 @@ START_TEST(pageNoThreadMultPageTest) Tinit(); p = loadPage(-1, 1); + p->LSN = 0; + *lsn_ptr(p) = p->LSN; + + writelock(p->rwlatch,0); slottedPageInitialize(p); + unlock(p->rwlatch); multiple_simultaneous_pages(p); // Normally, you would call pageWriteLSN() to update the LSN. This // is a hack, since Tdeinit() will crash if it detects page updates @@ -337,7 +351,12 @@ START_TEST(pageThreadTest) { fail_unless(1, NULL); Page * p = loadPage(-1, 2); + writelock(p->rwlatch,0); slottedPageInitialize(p); + unlock(p->rwlatch); + p->LSN = 0; + *lsn_ptr(p) = p->LSN; + fail_unless(1, NULL); for(i = 0; i < THREAD_COUNT; i++) { @@ -372,7 +391,12 @@ START_TEST(fixedPageThreadTest) { pthread_mutex_init(&lsn_mutex, NULL); Tinit(); Page * p = loadPage(-1, 2); + writelock(p->rwlatch,0); fixedPageInitialize(p, sizeof(int), 0); + unlock(p->rwlatch); + p->LSN = 0; + *lsn_ptr(p) = p->LSN; + for(i = 0; i < THREAD_COUNT; i++) { pthread_create(&workers[i], NULL, fixed_worker_thread, p); @@ -392,7 +416,6 @@ START_TEST(fixedPageThreadTest) { START_TEST(pageCheckSlotTypeTest) { Tinit(); - int xid = Tbegin(); recordid slot = Talloc(xid, sizeof(int)); @@ -400,14 +423,19 @@ START_TEST(pageCheckSlotTypeTest) { recordid blob = Talloc(xid, PAGE_SIZE * 2); Page * p = loadPage(-1, slot.page); - assert(recordType(xid, p, slot) == SLOTTED_RECORD); + readlock(p->rwlatch, 0); + assert(recordGetTypeNew(xid, p, slot) == NORMAL_SLOT); + unlock(p->rwlatch); releasePage(p); /** @todo the use of the fixedRoot recordid to check getRecordType is a bit questionable, but should work. */ p = loadPage(-1, fixedRoot.page); - assert(recordType(xid, p, fixedRoot) == FIXED_RECORD); + readlock(p->rwlatch, 0); + assert(recordGetTypeNew(xid, p, fixedRoot) == NORMAL_SLOT); + + unlock(p->rwlatch); releasePage(p); fixedRoot.slot = 1; @@ -415,28 +443,34 @@ START_TEST(pageCheckSlotTypeTest) { fixedRoot.slot = 0; p = loadPage(-1, fixedEntry.page); - assert(recordType(xid, p, fixedEntry) == FIXED_RECORD); + readlock(p->rwlatch, 0); + assert(recordGetTypeNew(xid, p, fixedEntry) == NORMAL_SLOT); + unlock(p->rwlatch); releasePage(p); p = loadPage(-1, blob.page); - int type = recordType(xid, p, blob); - assert(type == BLOB_RECORD); - releasePage(p); - + readlock(p->rwlatch, 0); + int type = recordGetTypeNew(xid, p, blob); + unlock(p->rwlatch); + assert(type == BLOB_SLOT); + releasePage(p); + recordid bad; bad.page = slot.page; bad.slot = slot.slot + 10; bad.size = 4; p = loadPage(xid, bad.page); - assert(recordType(xid, p, bad) == UNINITIALIZED_RECORD); + readlock(p->rwlatch, 0); + assert(recordGetTypeNew(xid, p, bad) == INVALID_SLOT); bad.size = 100000; - assert(recordType(xid, p, bad) == UNINITIALIZED_RECORD); - /** recordType now ignores the size field, so this (correctly) returns SLOTTED_RECORD */ + assert(recordGetTypeNew(xid, p, bad) == INVALID_SLOT); + /** recordGetType now ignores the size field, so this (correctly) returns SLOTTED_RECORD */ bad.slot = slot.slot; - assert(recordType(xid, p, bad) == SLOTTED_RECORD); + assert(recordGetTypeNew(xid, p, bad) == NORMAL_SLOT); p->LSN = 0; *lsn_ptr(p) = p->LSN; + unlock(p->rwlatch); releasePage(p); Tcommit(xid); @@ -448,40 +482,39 @@ START_TEST(pageCheckSlotTypeTest) { */ START_TEST(pageTrecordTypeTest) { Tinit(); - int xid = Tbegin(); recordid slot = Talloc(xid, sizeof(int)); recordid fixedRoot = TarrayListAlloc(xid, 2, 10, 10); recordid blob = Talloc(xid, PAGE_SIZE * 2); - assert(TrecordType(xid, slot) == SLOTTED_RECORD); - assert(TrecordType(xid, fixedRoot) == FIXED_RECORD); + assert(TrecordType(xid, slot) == NORMAL_SLOT); + assert(TrecordType(xid, fixedRoot) == NORMAL_SLOT); fixedRoot.slot = 1; recordid fixedEntry = dereferenceRID(xid, fixedRoot); fixedRoot.slot = 0; - assert(TrecordType(xid, fixedEntry) == FIXED_RECORD); + assert(TrecordType(xid, fixedEntry) == NORMAL_SLOT); int type = TrecordType(xid, blob); - assert(type == BLOB_RECORD); + assert(type == BLOB_SLOT); recordid bad; bad.page = slot.page; bad.slot = slot.slot + 10; bad.size = 4; - assert(TrecordType(xid, bad) == UNINITIALIZED_RECORD); + assert(TrecordType(xid, bad) == INVALID_SLOT); bad.size = 100000; - assert(TrecordType(xid, bad) == UNINITIALIZED_RECORD); + assert(TrecordType(xid, bad) == INVALID_SLOT); bad.slot = slot.slot; - assert(TrecordType(xid, bad) == SLOTTED_RECORD); + assert(TrecordType(xid, bad) == NORMAL_SLOT); /* Try to trick TrecordType by allocating a record that's the same length as the slot used by blobs. */ recordid rid2 = Talloc(xid, sizeof(blob_record_t)); - assert(TrecordType(xid, rid2) == SLOTTED_RECORD); + assert(TrecordType(xid, rid2) == NORMAL_SLOT); Tcommit(xid); diff --git a/test/lladd/check_pageOperations.c b/test/lladd/check_pageOperations.c index 24df950..45916f8 100644 --- a/test/lladd/check_pageOperations.c +++ b/test/lladd/check_pageOperations.c @@ -45,7 +45,6 @@ terms specified in this license. #include #include "../../src/lladd/page.h" -#include "../../src/lladd/page/slotted.h" #include #include #include diff --git a/utilities/swig/lladd.i b/utilities/swig/lladd.i index a2b504a..55f2d4e 100644 --- a/utilities/swig/lladd.i +++ b/utilities/swig/lladd.i @@ -15,8 +15,6 @@ extern void TupdateRaw(int xid, recordid rid, const void *dat, int op); extern void Tread(int xid, recordid rid, void *dat); extern void TreadStr(int xid, recordid rid, char *dat); -extern void TreadUnlocked(int xid, recordid rid, void *dat); - extern int Tcommit(int xid); extern int Tabort(int xid);