diff --git a/lladd/operations/alloc.h b/lladd/operations/alloc.h index 00a5f01..4c092e9 100644 --- a/lladd/operations/alloc.h +++ b/lladd/operations/alloc.h @@ -28,15 +28,15 @@ Operation getRealloc(); @return the recordid of the new record. */ -recordid Talloc(int xid, long size); +compensated_function recordid Talloc(int xid, long size); -recordid TallocFromPage(int xid, long page, long size); +compensated_function recordid TallocFromPage(int xid, long page, long size); /** Free a record. @todo Currently, we just leak store space on dealloc. */ -void Tdealloc(int xid, recordid rid); +compensated_function void Tdealloc(int xid, recordid rid); /** Obtain the type of a record, as returned by getRecordType. @@ -51,7 +51,7 @@ void Tdealloc(int xid, recordid rid); @see getRecordType */ -int TrecordType(int xid, recordid rid); +compensated_function int TrecordType(int xid, recordid rid); /** Obtain the length of the data stored in a record. @@ -63,9 +63,9 @@ int TrecordType(int xid, recordid rid); @return -1 if the record does not exist, the size of the record otherwise. */ -int TrecordSize(int xid, recordid rid); +compensated_function int TrecordSize(int xid, recordid rid); /** Return the number of records stored in page pageid */ -int TrecordsInPage(int xid, int pageid); +compensated_function int TrecordsInPage(int xid, int pageid); #endif diff --git a/lladd/operations/pageOrientedListNTA.h b/lladd/operations/pageOrientedListNTA.h index f69778a..722e66e 100644 --- a/lladd/operations/pageOrientedListNTA.h +++ b/lladd/operations/pageOrientedListNTA.h @@ -103,10 +103,10 @@ lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list); /** @return 1 if there was another entry to be iterated over. 0 otherwise. If this function returns 1, the caller must free() the malloced memory returned via the key and value arguments.*/ -int TpagedListNext(int xid, lladd_pagedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize); -recordid TpagedListAlloc(int xid); -void TpagedListDelete(int xid, recordid list); -int TpagedListSpansPages(int xid, recordid list); +compensated_function int TpagedListNext(int xid, lladd_pagedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize); +compensated_function recordid TpagedListAlloc(int xid); +compensated_function void TpagedListDelete(int xid, recordid list); +compensated_function int TpagedListSpansPages(int xid, recordid list); Operation getPagedListInsert(); Operation getPagedListRemove(); #endif diff --git a/src/apps/cht/cht_client.c b/src/apps/cht/cht_client.c index 3834f04..703d218 100644 --- a/src/apps/cht/cht_client.c +++ b/src/apps/cht/cht_client.c @@ -71,7 +71,10 @@ static int _chtEval(DfaSet * dfaSet, } if(ht != NULL) { memcpy(&(__header_ptr(&m)->hashTable), ht, sizeof(clusterHashTable_t)); - } + } else { + + //memset(&(__header_ptr(&m)->hashTable), 0, sizeof(clusterHashTable_t)); + } /* printf("%s <- %s\n", __header_ptr(&m)->initiator, dfaSet->networkSetup.localhost); */ diff --git a/src/lladd/blobManager.c b/src/lladd/blobManager.c index 6c60e9a..bf6ec43 100644 --- a/src/lladd/blobManager.c +++ b/src/lladd/blobManager.c @@ -32,14 +32,14 @@ static void readRawRecord(int xid, Page * p, recordid rid, void * buf, int size) recordid blob_rec_rid = rid; blob_rec_rid.size = size; readRecord(xid, p, blob_rec_rid, buf); - /* Tread(xid, blob_rec_rid, buf); */ + /* T read(xid, blob_rec_rid, buf); */ } static void writeRawRecord(int xid, Page * p, recordid rid, lsn_t lsn, const void * buf, int size) { recordid blob_rec_rid = rid; blob_rec_rid.size = size; writeRecord(xid, p, lsn, blob_rec_rid, buf); - /* Tset(xid, blob_rec_rid, buf); - We no longer need to write a log + /* T set(xid, blob_rec_rid, buf); - We no longer need to write a log record out here, since we're called by something that is the result of a log record.*/ } @@ -195,37 +195,47 @@ void closeBlobStore() { */ -recordid preAllocBlob(int xid, long blobSize) { +compensated_function recordid preAllocBlob(int xid, long blobSize) { /* Allocate space for the blob entry. */ + + recordid rid; - DEBUG("Allocing blob (size %ld)\n", blobSize); + try_ret(NULLRID) { + + DEBUG("Allocing blob (size %ld)\n", blobSize); + + assert(blobSize > 0); /* Don't support zero length blobs right now... */ - assert(blobSize > 0); /* Don't support zero length blobs right now... */ + /* First in buffer manager. */ + + rid = Talloc(xid, BLOB_SLOT); //sizeof(blob_record_t)); + + rid.size = blobSize; - /* First in buffer manager. */ - - recordid rid = Talloc(xid, BLOB_SLOT); //sizeof(blob_record_t)); - - rid.size = blobSize; + } end_ret(NULLRID); return rid; } -recordid preAllocBlobFromPage(int xid, long page, long blobSize) { +compensated_function recordid preAllocBlobFromPage(int xid, long page, long blobSize) { + recordid rid; + /* Allocate space for the blob entry. */ - - DEBUG("Allocing blob (size %ld)\n", blobSize); + try_ret(NULLRID) { + DEBUG("Allocing blob (size %ld)\n", blobSize); + + assert(blobSize > 0); /* Don't support zero length blobs right now... */ + + /* First in buffer manager. */ + + rid = TallocFromPage(xid, page, BLOB_SLOT); //sizeof(blob_record_t)); - assert(blobSize > 0); /* Don't support zero length blobs right now... */ + rid.size = blobSize; - /* First in buffer manager. */ - - recordid rid = TallocFromPage(xid, page, BLOB_SLOT); //sizeof(blob_record_t)); - - rid.size = blobSize; + } end_ret(NULLRID); return rid; @@ -280,7 +290,7 @@ void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) { funlockfile(blobf0); funlockfile(blobf1); - /* Tset() needs to know to 'do the right thing' here, since we've + /* T set() needs to know to 'do the right thing' here, since we've changed the size it has recorded for this record, and writeRawRecord makes sure that that is the case. @@ -337,7 +347,7 @@ static FILE * getDirtyFD(int xid, Page * p, lsn_t lsn, recordid rid) { /* First, determine if the blob is dirty. */ - /* Tread() raw record */ + /* T read() raw record */ readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t)); assert(rec.size == rid.size); @@ -352,7 +362,7 @@ static FILE * getDirtyFD(int xid, Page * p, lsn_t lsn, recordid rid) { /* Flip the fd bit on the record. */ rec.fd = rec.fd ? 0 : 1; - /* Tset() raw record */ + /* T set() raw record */ writeRawRecord(xid, p, rid, lsn, &rec, sizeof(blob_record_t)); } @@ -459,7 +469,7 @@ void abortBlobs(int xid) { and undo have to reason about lazy propogation of values to the bufferManager, and also have to preserve *write* ordering, even though the writes may be across many transactions, and could be - propogated in the wrong order. If we generate a Tset() (for the + propogated in the wrong order. If we generate a T set() (for the blob record in bufferManager) for each write, things become much easier. */ diff --git a/src/lladd/blobManager.h b/src/lladd/blobManager.h index 459fcd4..ea89b0a 100644 --- a/src/lladd/blobManager.h +++ b/src/lladd/blobManager.h @@ -77,8 +77,8 @@ typedef struct { } blob_record_t; -recordid preAllocBlob(int xid, long blobsize); -recordid preAllocBlobFromPage(int xid, long page, long blobsize); +compensated_function recordid preAllocBlob(int xid, long blobsize); +compensated_function recordid preAllocBlobFromPage(int xid, long page, long blobsize); /** Allocate a blob of size blobSize. diff --git a/src/lladd/operations.c b/src/lladd/operations.c index 409d6b1..1b0a311 100644 --- a/src/lladd/operations.c +++ b/src/lladd/operations.c @@ -65,7 +65,10 @@ void redoUpdate(const LogEntry * e) { if(e->type == UPDATELOG) { /* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */ recordid rid = e->contents.update.rid; - Page * p = loadPage(e->xid, rid.page); + Page * p; + try { + p = loadPage(e->xid, rid.page); + } end; lsn_t pageLSN = pageReadLSN(p); if(e->LSN > pageLSN) { @@ -79,7 +82,10 @@ void redoUpdate(const LogEntry * e) { } else if(e->type == CLRLOG) { LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN); recordid rid = f->contents.update.rid; - Page * p = loadPage(e->xid, rid.page); + Page * p; + try { + p = loadPage(e->xid, rid.page); + } end; assert(rid.page == e->contents.update.rid.page); /* @todo Should this always hold? */ diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index 59c5f25..054d3a1 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -9,6 +9,7 @@ #include "../page/slotted.h" #include +//try{ /** @file @@ -44,9 +45,9 @@ int page = Treserve(int xid, int size) This would tell Talloc to treat the page as though 'size' bytes had - already been reserved. The 'free space' that Talloc() reasons + already been reserved. The 'free space' that Talloc () reasons about would be: max(reservedSpace, usedSpace). A seperate call, - TallocFromPage(xid, page, size) already exists, and should ignore + TallocFromPage (xid, page, size) already exists, and should ignore the presence of the 'reserved space' field. Track level locality is another problem that Talloc should address, @@ -54,8 +55,8 @@ Better support for locking. Consider this sequence of events: - recordid rid1 = Talloc(xid1, 1); - recordid rid2 = Talloc(xid2, 1); // May deadlock if page level + recordid rid1 = Talloc (xid1, 1); + recordid rid2 = Talloc (xid2, 1); // May deadlock if page level // locking is used. The lock manager needs a 'try lock' operation that allows @@ -74,9 +75,9 @@ $Id$ */ - +//}end static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { - /* * @ todo Currently, Talloc() needs to clean up the page type (for recovery). Should this be elsewhere? */ + /* * @ todo Currently, T alloc () needs to clean up the page type (for recovery). Should this be elsewhere? */ /* if(*page_type_ptr(p) == UNINITIALIZED_PAGE) { *page_type_ptr(p) = SLOTTED_PAGE; @@ -149,87 +150,125 @@ Operation getRealloc() { return o; } -recordid Talloc(int xid, long size) { +compensated_function recordid Talloc(int xid, long size) { recordid rid; - Page * p = NULL; - if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) { - /**@todo is it OK that Talloc doesn't pin the page when a blob is alloced?*/ - rid = preAllocBlob(xid, size); + + int isBlob = size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT; + + if(isBlob) { + try_ret(NULLRID) { + rid = preAllocBlob(xid, size); + Tupdate(xid,rid, NULL, OPERATION_ALLOC); + } end_ret(NULLRID); } else { - pthread_mutex_lock(&talloc_mutex); - rid = slottedPreRalloc(xid, size, &p); - assert(p != NULL); - } - Tupdate(xid,rid, NULL, OPERATION_ALLOC); - - if(p != NULL) { - /* release the page that preAllocBlob pinned for us. */ + Page * p = NULL; - /* @todo alloc.c pins multiple pages -> Will deadlock with small buffer sizes.. */ - releasePage(p); - pthread_mutex_unlock(&talloc_mutex); + begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) { + pthread_mutex_lock(&talloc_mutex); + rid = slottedPreRalloc(xid, size, &p); + Tupdate(xid, rid, NULL, OPERATION_ALLOC); + /** @todo does releasePage do the correct error checking? */ + releasePage(p); + } compensate_ret(NULLRID); } - return rid; - -} -recordid TallocFromPage(int xid, long page, long size) { - recordid rid; - - Page * p = NULL; - if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) { - rid = preAllocBlobFromPage(xid, page, size); - } else { - pthread_mutex_lock(&talloc_mutex); - rid = slottedPreRallocFromPage(xid, page, size, &p); - if(p == NULL) { - assert(rid.size == -1); - pthread_mutex_unlock(&talloc_mutex); - return rid; + /* try_ret(NULL_RID) { + if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) { + ///@todo is it OK that Talloc doesn't pin the page when a blob is alloced? + rid = pr eAllocBlob(xid, size); + } else { + pthread_mutex_lock(&talloc_mutex); + rid = slottedPreRalloc(xid, size, &p); + assert(p != NULL); } - } - Tupdate(xid,rid, NULL, OPERATION_ALLOC); + + Tupdate(xid,rid, NULL, OPERATION_ALLOC); - if(p != NULL) { - /* release the page that preRallocFromPage pinned for us. */ - /* @todo alloc.c pins multiple pages -> Will deadlock with small buffer sizes.. */ - releasePage(p); - pthread_mutex_unlock(&talloc_mutex); - } + if(p != NULL) { + /// release the page that preAllocBlob pinned for us. + + /// @todo alloc.c pins multiple pages -> Will deadlock with small buffer sizes.. + releasePage(p); + pthread_mutex_unlock(&talloc_mutex); + + } + } end_ret(NULLRID); + return rid;*/ + +} +compensated_function recordid TallocFromPage(int xid, long page, long size) { + recordid rid; + + Page * p = NULL; + if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) { + try_ret(NULLRID) { + rid = preAllocBlobFromPage(xid, page, size); + Tupdate(xid,rid, NULL, OPERATION_ALLOC); + } end_ret(NULLRID); + } else { + begin_action_ret(pthread_mutex_unlock, &talloc_mutex, NULLRID) { + pthread_mutex_lock(&talloc_mutex); + rid = slottedPreRallocFromPage(xid, page, size, &p); + if(rid.size == size) { + Tupdate(xid,rid, NULL, OPERATION_ALLOC); + } else { + assert(rid.size < 0); + } + if(p) { + /* @todo alloc.c pins multiple pages -> Will deadlock with small buffer sizes.. */ + releasePage(p); + } + } compensate_ret(NULLRID); + } + return rid; } -void Tdealloc(int xid, recordid rid) { +compensated_function void Tdealloc(int xid, recordid rid) { void * preimage = malloc(rid.size); - Page * p = loadPage(xid, rid.page); - readRecord(xid, p, rid, preimage); - /** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */ - Tupdate(xid, rid, preimage, OPERATION_DEALLOC); - releasePage(p); + Page * p; + try { + p = loadPage(xid, rid.page); + } end; + begin_action(releasePage, p) { + readRecord(xid, p, rid, preimage); + /** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */ + Tupdate(xid, rid, preimage, OPERATION_DEALLOC); + } compensate; free(preimage); } -int TrecordType(int xid, recordid rid) { - Page * p = loadPage(xid, rid.page); - int ret = getRecordType(xid, p, rid); +compensated_function int TrecordType(int xid, recordid rid) { + Page * p; + try_ret(compensation_error()) { + p = loadPage(xid, rid.page); + } end_ret(compensation_error()); + int ret; + ret = getRecordType(xid, p, rid); releasePage(p); return ret; } -int TrecordSize(int xid, recordid rid) { +compensated_function int TrecordSize(int xid, recordid rid) { int ret; - Page * p = loadPage(xid, rid.page); + Page * p; + try_ret(compensation_error()) { + p = loadPage(xid, rid.page); + } end_ret(compensation_error()); ret = getRecordSize(xid, p, rid); releasePage(p); return ret; } -int TrecordsInPage(int xid, int pageid) { - Page * p = loadPage(xid, pageid); +compensated_function int TrecordsInPage(int xid, int pageid) { + Page * p; + try_ret(compensation_error()) { + p = loadPage(xid, pageid); + } end_ret(compensation_error()); readlock(p->rwlatch, 187); int ret = *numslots_ptr(p); unlock(p->rwlatch); diff --git a/src/lladd/operations/arrayList.c b/src/lladd/operations/arrayList.c index 2ada861..f798076 100644 --- a/src/lladd/operations/arrayList.c +++ b/src/lladd/operations/arrayList.c @@ -120,149 +120,68 @@ Operation getArrayListAlloc() { } /*----------------------------------------------------------------------------*/ -compensated_function int TarrayListExtend(int xid, recordid rid, int slots) { +/** @todo locking for arrayList... this isn't pressing since currently + the only thing that calls arraylist (the hashtable + implementations) serialize bucket list operations anyway... */ + +static compensated_function int TarrayListExtendInternal(int xid, recordid rid, int slots, int op) { Page * p; try_ret(compensation_error()) { p = loadPage(xid, rid.page); } end_ret(compensation_error()); TarrayListParameters tlp = pageToTLP(p); - - int lastCurrentBlock; - if(tlp.maxOffset == -1) { - lastCurrentBlock = -1; - } else{ - lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL); - } - int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL); - - DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock); - - recordid tmp; /* recordid of slot in base page that holds new block. */ - tmp.page = rid.page; - tmp.size = sizeof(int); - - recordid tmp2; /* recordid of newly created pages. */ - tmp2.slot = 0; - tmp2.size = tlp.size; - /* Iterate over the (small number) of indirection blocks that need to be updated */ - for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) { - /* Alloc block i */ - int blockSize = tlp.initialSize * powl(tlp.multiplier, i); - int newFirstPage; - begin_action_ret(releasePage, p, compensation_error()) { - newFirstPage = TpageAllocMany(xid, blockSize); - } end_action_ret(compensation_error()); - DEBUG("block %d\n", i); - /* Iterate over the storage blocks that are pointed to by our current indirection block. */ - // for(int j = 0; j < blockSize; j++) { - // DEBUG("page %d (%d)\n", j, j + newFirstPage); - // tmp2.page = j + newFirstPage; - /** @todo If we were a little smarter about this, and fixed.c - could handle uninitialized blocks correctly, then we - wouldn't have to iterate over the datapages in - TarrayListExtend() (Fixed?)*/ - // T update(xid, tmp2, NULL, OPERATION_INITIALIZE_FIXED_PAGE); - // } - - tmp.slot = i + FIRST_DATA_PAGE_OFFSET; - /** @todo what does this do to recovery?? */ - /** @todo locking for arrayList... */ - /* *page_type_ptr(p) = FIXED_PAGE; - Tset(xid, tmp, &newFirstPage); - *page_type_ptr(p) = ARRAY_LIST_PAGE; */ - /* @todo This is a copy of Tupdate!! Replace it.*/ - begin_action_ret(releasePage, p, compensation_error()) { - alTupdate(xid, tmp, &newFirstPage, OPERATION_SET); - } end_action_ret(compensation_error()); - - DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); - } - - tmp.slot = MAX_OFFSET_POSITION; - - int newMaxOffset = tlp.maxOffset+slots; - // *page_type_ptr(p) = FIXED_PAGE; - // Tset(xid, tmp, &newMaxOffset); - // *page_type_ptr(p) = ARRAY_LIST_PAGE; - // releasePage(p); - /* @todo This is a copy of Tupdate!! Replace it.*/ - begin_action_ret(releasePage, p, compensation_error()) { - alTupdate(xid, tmp, &newMaxOffset, OPERATION_SET); - } compensate_ret(compensation_error()); - - return 0; - -} -/** @todo: TarrayListInstantExtend, is a hacked-up cut and paste version of TarrayListExtend */ -compensated_function int TarrayListInstantExtend(int xid, recordid rid, int slots) { - Page * p = loadPage(xid, rid.page); - TarrayListParameters tlp = pageToTLP(p); - - int lastCurrentBlock; - if(tlp.maxOffset == -1) { - lastCurrentBlock = -1; - } else{ - lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL); - } - int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL); - - DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock); - - recordid tmp; /* recordid of slot in base page that holds new block. */ - tmp.page = rid.page; - tmp.size = sizeof(int); - - recordid tmp2; /* recordid of newly created pages. */ - tmp2.slot = 0; - tmp2.size = tlp.size; - /* Iterate over the (small number) of indirection blocks that need to be updated */ - for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) { - /* Alloc block i */ - int blockSize = tlp.initialSize * powl(tlp.multiplier, i); - int newFirstPage = TpageAllocMany(xid, blockSize); - DEBUG("block %d\n", i); - /* Iterate over the storage blocks that are pointed to by our current indirection block. */ - /* for(int j = 0; j < blockSize; j++) { - DEBUG("page %d (%d)\n", j, j + newFirstPage); - tmp2.page = j + newFirstPage; - / ** @todo If we were a little smarter about this, and fixed.c - coulds handle uninitialized blocks correctly, then we - wouldn't have to iterate over the datapages in - TarrayListExtend() * / - // Tupdate(xid, tmp2, NULL, OPERATION_INITIALIZE_FIXED_PAGE); - } */ - - tmp.slot = i + FIRST_DATA_PAGE_OFFSET; - /** @todo what does this do to recovery?? */ - /** @todo locking for arrayList... */ - *page_type_ptr(p) = FIXED_PAGE; - TinstantSet(xid, tmp, &newFirstPage); - *page_type_ptr(p) = ARRAY_LIST_PAGE; - - DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); - } - - tmp.slot = MAX_OFFSET_POSITION; - - int newMaxOffset = tlp.maxOffset+slots; - /** @todo CORRECTNESS BUG: From recovery's point of view, arrayList is totally wrong! The - only reason we mess with p is beacuse TinstantSet doesn't handle - ARRAY_LIST_PAGES the way we need it to, so this won't be hard to - fix... */ - *page_type_ptr(p) = FIXED_PAGE; - TinstantSet(xid, tmp, &newMaxOffset); - *page_type_ptr(p) = ARRAY_LIST_PAGE; releasePage(p); + p = NULL; + int lastCurrentBlock; + if(tlp.maxOffset == -1) { + lastCurrentBlock = -1; + } else{ + lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL); + } + int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL); + + DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock); + + recordid tmp; /* recordid of slot in base page that holds new block. */ + tmp.page = rid.page; + tmp.size = sizeof(int); + + recordid tmp2; /* recordid of newly created pages. */ + tmp2.slot = 0; + tmp2.size = tlp.size; + /* Iterate over the (small number) of indirection blocks that need to be updated */ + try_ret(compensation_error()) { + for(int i = lastCurrentBlock+1; i <= lastNewBlock; i++) { + /* Alloc block i */ + int blockSize = tlp.initialSize * powl(tlp.multiplier, i); + int newFirstPage = TpageAllocMany(xid, blockSize); + DEBUG("block %d\n", i); + /* We used to call OPERATION_INITIALIZE_FIXED_PAGE on each page in current indirection block. */ + tmp.slot = i + FIRST_DATA_PAGE_OFFSET; + alTupdate(xid, tmp, &newFirstPage, op); + DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); + } + + tmp.slot = MAX_OFFSET_POSITION; + + int newMaxOffset = tlp.maxOffset+slots; + alTupdate(xid, tmp, &newMaxOffset, op); + } end_ret(compensation_error()); return 0; } +compensated_function int TarrayListInstantExtend(int xid, recordid rid, int slots) { + return TarrayListExtendInternal(xid, rid, slots, OPERATION_INSTANT_SET); +} +compensated_function int TarrayListExtend(int xid, recordid rid, int slots) { + return TarrayListExtendInternal(xid, rid, slots, OPERATION_SET); +} static int operateInitFixed(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { fixedPageInitialize(p, rid.size, recordsPerPage(rid.size)); - pageWriteLSN(xid, p, lsn); return 0; } diff --git a/src/lladd/operations/naiveLinearHash.c b/src/lladd/operations/naiveLinearHash.c index 4b50765..1f51830 100644 --- a/src/lladd/operations/naiveLinearHash.c +++ b/src/lladd/operations/naiveLinearHash.c @@ -70,27 +70,31 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) { - - hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize); - - recordid nextEntry; - - hashRid.slot = bucket_number; - nextEntry = hashRid; - - int found = 0; - - while(nextEntry.size != -1 && nextEntry.size != 0) { - assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, nextEntry, e); - if(!memcmp(key, e+1, keySize) && e->next.size != 0) { - memcpy(val, ((byte*)(e+1))+keySize, valSize); - found = 1; - break; - } - nextEntry = e->next; - } - free(e); + int found; + try_ret(compensation_error()) { + hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize); + + recordid nextEntry; + + hashRid.slot = bucket_number; + nextEntry = hashRid; + + found = 0; + + while(nextEntry.size != -1 && nextEntry.size != 0) { + if(compensation_error()) { break; } + assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, nextEntry, e); + if(!memcmp(key, e+1, keySize) && e->next.size != 0) { + memcpy(val, ((byte*)(e+1))+keySize, valSize); + found = 1; + break; + } + nextEntry = e->next; + } + free(e); + } end_ret(compensation_error()); + return found; } @@ -102,177 +106,183 @@ void expand (int xid, recordid hash, int next_split, int i, int keySize, int val #define AMORTIZE 1000 #define FF_AM 750 if(count <= 0 && !(count * -1) % FF_AM) { - recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(int)); - int j; - TarrayListExtend(xid, hash, AMORTIZE); - for(j = 0; j < AMORTIZE; j++) { - - if(next_split >= twoToThe(i-1)+2) { - i++; - next_split = 2; - } - rehash(xid, hash, next_split, i, keySize, valSize); - next_split++; - headerNextSplit = next_split; - headerHashBits = i; - } - update_hash_header(xid, hash, i, next_split); + try { + recordid * headerRidB = pblHtLookup(openHashes, &(hash.page), sizeof(int)); + int j; + TarrayListExtend(xid, hash, AMORTIZE); + for(j = 0; j < AMORTIZE; j++) { + if(compensation_error()) { break; } + if(next_split >= twoToThe(i-1)+2) { + i++; + next_split = 2; + } + rehash(xid, hash, next_split, i, keySize, valSize); + next_split++; + headerNextSplit = next_split; + headerHashBits = i; + } + update_hash_header(xid, hash, i, next_split); + } end; } } void update_hash_header(int xid, recordid hash, int i, int next_split) { - hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(int)); - assert(he); - recordid * headerRidB = &he->next; - - assert(headerRidB); - - headerHashBits = i; - headerNextSplit = next_split; - hash.slot = 1; - Tset(xid, hash, headerRidB); + try { + hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(int)); + assert(he); + recordid * headerRidB = &he->next; + + assert(headerRidB); + + headerHashBits = i; + headerNextSplit = next_split; + hash.slot = 1; + + Tset(xid, hash, headerRidB); + } end; } void rehash(int xid, recordid hashRid, int next_split, int i, int keySize, int valSize) { - int firstA = 1; // Is 'A' the recordid of a bucket? - int firstD = 1; // What about 'D'? - - assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); - recordid ba = hashRid; ba.slot = next_split; - recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1); - - hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); - hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); - hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); - - Tread(xid, ba, A_contents); - Tread(xid, bb, D_contents); - recordid A = ba; //ba_contents; - recordid D = bb; //bb_contents; - recordid B = A_contents->next; - recordid C; - - if(!A_contents->next.size) { - /* Bucket A is empty, so we're done. */ - free(D_contents); - free(A_contents); - free(B_contents); - /* printf("Expand was a noop.\n"); - fflush(NULL); */ - return; - } - - int old_hash; - int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; - - while(new_hash != next_split) { - // Need a record in A that belongs in the first bucket... + try { + int firstA = 1; // Is 'A' the recordid of a bucket? + int firstD = 1; // What about 'D'? - recordid oldANext = A_contents->next; - - A_contents->next = NULLRID; - - if(firstD) { - // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); - Tset(xid, D, A_contents); - firstD = 0; - } else { - /* D at end of list => can overwrite next. */ - D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo - unfortunate - to - dealloc - A's - successor, - then - alloc.. */ - // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); - Tset(xid, D_contents->next, A_contents); - // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); - Tset(xid, D, D_contents); - D = A; - } - hashEntry * swap = D_contents; - D_contents = A_contents; - A_contents = swap; - - /* A_contents is now garbage. */ - - assert(A.size == sizeof(hashEntry) + keySize + valSize); - if(oldANext.size == -1) { - memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize); - // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); - Tset(xid, A, A_contents); + assert(hashRid.size == sizeof(hashEntry) + keySize + valSize); + recordid ba = hashRid; ba.slot = next_split; + recordid bb = hashRid; bb.slot = next_split + twoToThe(i-1); + + hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); + hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); + hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize); + + Tread(xid, ba, A_contents); + Tread(xid, bb, D_contents); + recordid A = ba; //ba_contents; + recordid D = bb; //bb_contents; + recordid B = A_contents->next; + recordid C; + + if(!A_contents->next.size) { + /* Bucket A is empty, so we're done. */ free(D_contents); free(A_contents); free(B_contents); - /* printf("Loop 1 returning.\n"); - fflush(NULL); */ - return; - } - assert(oldANext.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, oldANext, A_contents); - // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); - Tset(xid, A, A_contents); - Tdealloc(xid, oldANext); + /* printf("Expand was a noop.\n"); + fflush(NULL); */ + return; + } - new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; - } - /* printf("Got past loop 1\n"); - fflush(NULL); */ + int old_hash; + int new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; - B = A_contents->next; - - while(B.size != -1) { - assert(B.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, B, B_contents); - C = B_contents->next; - - old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2; - new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2; - - assert(next_split == old_hash); - assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1)); - - if(new_hash == old_hash) { - A = B; - B = C; - C.size = -1; - firstA = 0; - } else { - assert(D.size == sizeof(hashEntry) + keySize + valSize); - assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, D, D_contents); - D_contents->next = B; - assert(B.size != 0); - Tset(xid, D, D_contents); - - // A is somewhere in the first list. - assert(A.size == sizeof(hashEntry) + keySize + valSize); - assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize); - Tread(xid, A, A_contents); - A_contents->next = C; - assert(C.size != 0); - - Tset(xid, A, A_contents); - - // B _can't_ be a bucket. + while(new_hash != next_split) { + // Need a record in A that belongs in the first bucket... + + recordid oldANext = A_contents->next; + + A_contents->next = NULLRID; + + if(firstD) { + // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); + Tset(xid, D, A_contents); + firstD = 0; + } else { + /* D at end of list => can overwrite next. */ + D_contents->next = Talloc(xid, sizeof(hashEntry) + keySize + valSize); /* @todo + unfortunate + to + dealloc + A's + successor, + then + alloc.. */ + // assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid))); + Tset(xid, D_contents->next, A_contents); + // assert(memcmp(&D, &D_contents->next, sizeof(recordid))); + Tset(xid, D, D_contents); + D = A; + } + hashEntry * swap = D_contents; + D_contents = A_contents; + A_contents = swap; + + /* A_contents is now garbage. */ + + assert(A.size == sizeof(hashEntry) + keySize + valSize); + if(oldANext.size == -1) { + memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize); + // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); + Tset(xid, A, A_contents); + free(D_contents); + free(A_contents); + free(B_contents); + /* printf("Loop 1 returning.\n"); + fflush(NULL); */ + return; + } + assert(oldANext.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, oldANext, A_contents); + // assert(memcmp(&A_contents->next, &A, sizeof(recordid))); + Tset(xid, A, A_contents); + Tdealloc(xid, oldANext); + + new_hash = hash(A_contents+1, keySize, i, ULONG_MAX) + 2; + } + /* printf("Got past loop 1\n"); + fflush(NULL); */ + + B = A_contents->next; + + while(B.size != -1) { assert(B.size == sizeof(hashEntry) + keySize + valSize); Tread(xid, B, B_contents); - B_contents->next = NULLRID; - Tset(xid, B, B_contents); - - // Update Loop State - D = B; - B = C; - C.size = -1; - firstD = 0; + C = B_contents->next; + + old_hash = hash(B_contents+1, keySize, i-1, ULONG_MAX) + 2; + new_hash = hash(B_contents+1, keySize, i, ULONG_MAX) + 2; + + assert(next_split == old_hash); + assert(new_hash == old_hash || new_hash == old_hash + twoToThe(i-1)); + + if(new_hash == old_hash) { + A = B; + B = C; + C.size = -1; + firstA = 0; + } else { + assert(D.size == sizeof(hashEntry) + keySize + valSize); + assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, D, D_contents); + D_contents->next = B; + assert(B.size != 0); + Tset(xid, D, D_contents); + + // A is somewhere in the first list. + assert(A.size == sizeof(hashEntry) + keySize + valSize); + assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, A, A_contents); + A_contents->next = C; + assert(C.size != 0); + + Tset(xid, A, A_contents); + + // B _can't_ be a bucket. + assert(B.size == sizeof(hashEntry) + keySize + valSize); + Tread(xid, B, B_contents); + B_contents->next = NULLRID; + Tset(xid, B, B_contents); + + // Update Loop State + D = B; + B = C; + C.size = -1; + firstD = 0; + } } - } - free(D_contents); - free(A_contents); - free(B_contents); - + free(D_contents); + free(A_contents); + free(B_contents); + } end; } void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents, hashEntry * e, int keySize, int valSize, int skipDelete) { @@ -344,6 +354,7 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * buck memcpy(B, bucket_contents, sizeof(hashEntry) + keySize + valSize); Baddr = this; while(B->next.size != -1) { + if(compensation_error()) { break; } // guard the asserts below. hashEntry * tmp = A; A = B; Aaddr = Baddr; diff --git a/src/lladd/operations/pageOperations.c b/src/lladd/operations/pageOperations.c index 3174428..e4e2271 100644 --- a/src/lladd/operations/pageOperations.c +++ b/src/lladd/operations/pageOperations.c @@ -191,7 +191,7 @@ compensated_function void pageOperationsInit() { commit / abort. If other transactions need to allocate when the lock is held, then they simply do not reuse pages. Since locking is not yet implemented, we require applications to manually serialize - transactions that call Talloc() or TdeAlloc + transactions that call Talloc or Tdealloc A better solution: defer the addition of 100 to the freelist until commit, and use a 'real' data structure, like a concurrent B-Tree. diff --git a/src/lladd/operations/pageOrientedListNTA.c b/src/lladd/operations/pageOrientedListNTA.c index 4bfabf9..d9cdb10 100644 --- a/src/lladd/operations/pageOrientedListNTA.c +++ b/src/lladd/operations/pageOrientedListNTA.c @@ -10,184 +10,196 @@ typedef struct { short keySize; } pagedListEntry; -recordid TpagedListAlloc(int xid) { - - recordid ret = Talloc(xid, sizeof(pagedListHeader)); - pagedListHeader header; - header.thisPage = 0; - header.nextPage = NULLRID; - Tset(xid, ret, &header); +compensated_function recordid TpagedListAlloc(int xid) { + recordid ret; + try_ret(NULLRID) { + ret = Talloc(xid, sizeof(pagedListHeader)); + pagedListHeader header; + header.thisPage = 0; + header.nextPage = NULLRID; + Tset(xid, ret, &header); + } end_ret(NULLRID); return ret; } compensated_function int TpagedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) { - pagedListHeader header; - Tread(xid, list, &header); - recordid headerRid = list; + int ret; + try_ret(compensation_error()) { + pagedListHeader header; + Tread(xid, list, &header); + recordid headerRid = list; - byte * garbage; - int ret = (TpagedListFind(xid, list, key, keySize, &garbage) != -1); - if(ret) { - TpagedListRemove(xid, list, key, keySize); - free(garbage); - } - int entrySize = sizeof(pagedListEntry) + keySize + valueSize; - - recordid rid = TallocFromPage(xid, headerRid.page, entrySize); - DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size); - - // When the loop completes, header will contain the contents of the page header the entry will be inserted into, - // headerrid will contain the rid of that header, and rid will contain the newly allocated recordid - while(rid.size == -1) { - if(header.nextPage.size == -1) { - header.nextPage = Talloc(xid, sizeof(pagedListHeader)); - DEBUG("allocing on new page %d\n", header.nextPage.page); - Tset(xid, headerRid, &header); - pagedListHeader newHead; - newHead.thisPage = 0; - newHead.nextPage.page =0; - newHead.nextPage.slot =0; - newHead.nextPage.size =-1; - Tset(xid, header.nextPage, &newHead); + byte * garbage; + ret = (TpagedListFind(xid, list, key, keySize, &garbage) != -1); + if(ret) { + free(garbage); + TpagedListRemove(xid, list, key, keySize); } - - headerRid = header.nextPage; - Tread(xid, header.nextPage, &header); - rid = TallocFromPage(xid, headerRid.page, entrySize); + int entrySize = sizeof(pagedListEntry) + keySize + valueSize; + + recordid rid = TallocFromPage(xid, headerRid.page, entrySize); DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size); - } - - pagedListEntry * dat = malloc(entrySize); - - dat->keySize = keySize; - dat->nextEntry = header.thisPage; - memcpy(dat+1, key, keySize); - memcpy(((byte*)(dat+1))+keySize, value, valueSize); - Tset(xid, rid, dat); - - header.thisPage = rid.slot; - DEBUG("Header.thisPage = %d\n", rid.slot); - Tset(xid, headerRid, &header); - free(dat); - + + // When the loop completes, header will contain the contents of the page header the entry will be inserted into, + // headerrid will contain the rid of that header, and rid will contain the newly allocated recordid + while(rid.size == -1) { + if(compensation_error()) { break; } + if(header.nextPage.size == -1) { + header.nextPage = Talloc(xid, sizeof(pagedListHeader)); + DEBUG("allocing on new page %d\n", header.nextPage.page); + Tset(xid, headerRid, &header); + pagedListHeader newHead; + newHead.thisPage = 0; + newHead.nextPage.page =0; + newHead.nextPage.slot =0; + newHead.nextPage.size =-1; + Tset(xid, header.nextPage, &newHead); + } + + headerRid = header.nextPage; + Tread(xid, header.nextPage, &header); + rid = TallocFromPage(xid, headerRid.page, entrySize); + DEBUG("Alloced rid: {%d %d %d}", rid.page, rid.slot, rid.size); + } + + pagedListEntry * dat = malloc(entrySize); + + dat->keySize = keySize; + dat->nextEntry = header.thisPage; + memcpy(dat+1, key, keySize); + memcpy(((byte*)(dat+1))+keySize, value, valueSize); + Tset(xid, rid, dat); + + header.thisPage = rid.slot; + DEBUG("Header.thisPage = %d\n", rid.slot); + Tset(xid, headerRid, &header); + free(dat); + } end_ret(compensation_error()); return ret; } compensated_function int TpagedListFind(int xid, recordid list, const byte * key, int keySize, byte ** value) { - pagedListHeader header; - Tread(xid, list, &header); - - recordid rid; - rid.page = list.page; - rid.slot = header.thisPage; - - while(rid.slot || header.nextPage.size != -1) { - - if(rid.slot) { - rid.size = TrecordSize(xid, rid); - pagedListEntry * dat; - dat = malloc(rid.size); - Tread(xid, rid, dat); - - if(!memcmp(dat+1, key, keySize)) { - int valueSize = rid.size - keySize - sizeof(pagedListEntry); - *value = malloc(valueSize); - memcpy(*value, ((byte*)(dat+1))+keySize, valueSize); - free(dat); - return valueSize; - } - // if(dat->nextEntry) { // another entry on this page - rid.slot = dat->nextEntry; - free(dat); // } - } else if (header.nextPage.size != -1) { // another page - rid.page = header.nextPage.page; - Tread(xid, header.nextPage, &header); - rid.slot = header.thisPage; - } else { // we've reached the end of the last page - rid.slot = 0; - } + try_ret(compensation_error()) { + pagedListHeader header; + Tread(xid, list, &header); + recordid rid; + rid.page = list.page; + rid.slot = header.thisPage; - } - + while(rid.slot || header.nextPage.size != -1) { + if(compensation_error()) { break; } + + if(rid.slot) { + rid.size = TrecordSize(xid, rid); + pagedListEntry * dat; + if(compensation_error()) { break; } + dat = malloc(rid.size); + Tread(xid, rid, dat); + + if(!memcmp(dat+1, key, keySize)) { + int valueSize = rid.size - keySize - sizeof(pagedListEntry); + *value = malloc(valueSize); + memcpy(*value, ((byte*)(dat+1))+keySize, valueSize); + free(dat); + return valueSize; + } + // if(dat->nextEntry) { // another entry on this page + rid.slot = dat->nextEntry; + free(dat); // } + } else if (header.nextPage.size != -1) { // another page + rid.page = header.nextPage.page; + Tread(xid, header.nextPage, &header); + rid.slot = header.thisPage; + } else { // we've reached the end of the last page + rid.slot = 0; + } + } + } end_ret(compensation_error()); return -1; } compensated_function int TpagedListRemove(int xid, recordid list, const byte * key, int keySize) { pagedListHeader header; - Tread(xid, list, &header); - recordid headerRid; - recordid rid; - rid.page = list.page; - rid.slot = header.thisPage; - short lastSlot = -1; - headerRid = list; - while(rid.slot || header.nextPage.size != -1) { - if(rid.slot) { - rid.size = TrecordSize(xid, rid); - pagedListEntry * dat = malloc(rid.size); - Tread(xid, rid, dat); - - if(!memcmp(dat+1, key, keySize)) { + int ret = 0; + try_ret(compensation_error()) { + + Tread(xid, list, &header); + recordid headerRid; + recordid rid; + rid.page = list.page; + rid.slot = header.thisPage; + short lastSlot = -1; + headerRid = list; + while(rid.slot || header.nextPage.size != -1) { + if(compensation_error()) { break; } + if(rid.slot) { + rid.size = TrecordSize(xid, rid); + if(compensation_error()) { break; }; + pagedListEntry * dat = malloc(rid.size); + Tread(xid, rid, dat); - if(lastSlot != -1) { - recordid lastRid = rid; - lastRid.slot = lastSlot; - lastRid.size = TrecordSize(xid, lastRid); - pagedListEntry * lastRidBuf = malloc(lastRid.size); - Tread(xid, lastRid, lastRidBuf); - lastRidBuf->nextEntry = dat->nextEntry; - Tset(xid, lastRid, lastRidBuf); - free(lastRidBuf); - } else { + if(!memcmp(dat+1, key, keySize)) { + + if(lastSlot != -1) { + recordid lastRid = rid; + lastRid.slot = lastSlot; + lastRid.size = TrecordSize(xid, lastRid); + if(compensation_error()) { free(dat); break; } + pagedListEntry * lastRidBuf = malloc(lastRid.size); + Tread(xid, lastRid, lastRidBuf); + lastRidBuf->nextEntry = dat->nextEntry; + Tset(xid, lastRid, lastRidBuf); + free(lastRidBuf); + } else { header.thisPage = dat->nextEntry; Tset(xid, headerRid, &header); + } + Tdealloc(xid, rid); + free(dat); + ret = 1; + break; } - Tdealloc(xid, rid); + lastSlot = rid.slot; + rid.slot = dat->nextEntry; free(dat); - return 1; + } else if (header.nextPage.size != -1) { // another page + lastSlot = -1; + rid.page = header.nextPage.page; + headerRid = header.nextPage; + Tread(xid, header.nextPage, &header); + rid.slot = header.thisPage; } - lastSlot = rid.slot; - rid.slot = dat->nextEntry; - // } - // if(dat->nextEntry) { // another entry on this page - // lastSlot = rid.slot; - // rid.slot = dat->nextEntry; - free(dat); - } else if (header.nextPage.size != -1) { // another page - lastSlot = -1; - rid.page = header.nextPage.page; - headerRid = header.nextPage; - Tread(xid, header.nextPage, &header); - rid.slot = header.thisPage; - // } else { // we've reached the end of the last page - // rid.slot = 0; } - - // free(dat); - } - - return 0; + } end_ret(compensation_error()); + return ret; } compensated_function int TpagedListMove(int xid, recordid start_list, recordid end_list, const byte * key, int keySize) { - byte * value; - int valueSize = TpagedListFind(xid, start_list, key, keySize, &value); - if(valueSize != -1) { - int ret = TpagedListRemove(xid, start_list, key, keySize); - assert(ret); - ret = TpagedListInsert(xid, end_list, key, keySize, value, valueSize); - assert(!ret); - free(value); - return 1; - } else { - return 0; - } + byte * value = NULL; + int ret; + try_ret(compensation_error()) { + int valueSize = TpagedListFind(xid, start_list, key, keySize, &value); + if(valueSize != -1) { + int ret = TpagedListRemove(xid, start_list, key, keySize); + assert(ret); + ret = TpagedListInsert(xid, end_list, key, keySize, value, valueSize); + assert(!ret); + if(value) { free(value); } + ret = 1; + } else { + ret = 0; + } + } end_ret(compensation_error()); + return ret; } -lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list) { +compensated_function lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list) { pagedListHeader header; - Tread(xid, list, &header); + try_ret(NULL) { + Tread(xid, list, &header); + } end_ret(NULL); + lladd_pagedList_iterator * it = malloc(sizeof(lladd_pagedList_iterator)); it->headerRid = header.nextPage; @@ -198,17 +210,20 @@ lladd_pagedList_iterator * TpagedListIterator(int xid, recordid list) { return it; } -int TpagedListNext(int xid, lladd_pagedList_iterator * it, +compensated_function int TpagedListNext(int xid, lladd_pagedList_iterator * it, byte ** key, int * keySize, byte ** value, int * valueSize) { while(it->entryRid.slot || it->headerRid.size != -1) { if(it->entryRid.slot) { - it->entryRid.size = TrecordSize(xid, it->entryRid); + try_ret(compensation_error()) { + it->entryRid.size = TrecordSize(xid, it->entryRid); + } end_ret(compensation_error()); assert(it->entryRid.size != -1); pagedListEntry * entry = malloc(it->entryRid.size); - - Tread(xid, it->entryRid, entry); + begin_action_ret(free, entry, compensation_error()) { + Tread(xid, it->entryRid, entry); + } end_action_ret(compensation_error()); *keySize = entry->keySize; *valueSize = it->entryRid.size - *keySize - sizeof(pagedListEntry); @@ -227,7 +242,9 @@ int TpagedListNext(int xid, lladd_pagedList_iterator * it, } else { // move to next page. pagedListHeader header; - Tread(xid, it->headerRid, &header); + try_ret(compensation_error()) { + Tread(xid, it->headerRid, &header); + } end_ret(compensation_error()); it->entryRid.page = it->headerRid.page; it->headerRid = header.nextPage; it->entryRid.slot = header.thisPage; diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c index 6c82747..664cd5a 100644 --- a/src/lladd/page/slotted.c +++ b/src/lladd/page/slotted.c @@ -216,7 +216,6 @@ compensated_function recordid slottedPreRalloc(int xid, long size, Page ** pp) { } compensated_function recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) { - recordid ret; int isBlob = 0; if(size == BLOB_SLOT) { isBlob = 1; @@ -235,7 +234,8 @@ compensated_function recordid slottedPreRallocFromPage(int xid, long page, long slottedPageInitialize(*pp); } assert(*page_type_ptr(*pp) == SLOTTED_PAGE); - ret = slottedRawRalloc(*pp, size); + recordid ret = slottedRawRalloc(*pp, size); + assert(ret.size == size); if(isBlob) { *slot_length_ptr(*pp, ret.slot) = BLOB_SLOT; } diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 0567b25..0e4b2e7 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -92,17 +92,19 @@ int Tinit() { openLogWriter(); - pageOperationsInit(); + try_ret(compensation_error()) { + pageOperationsInit(); + } end_ret(compensation_error()); initNestedTopActions(); ThashInit(); compensations_init(); - + setupLockManagerCallbacksNil(); //setupLockManagerCallbacksPage(); - + InitiateRecovery(); - + return 0; } @@ -144,20 +146,43 @@ int Tbegin() { return XactionTable[index].xid; } -compensated_function void Tupdate(int xid, recordid rid, const void *dat, int op) { +static compensated_function void TupdateHelper(int xid, recordid rid, const void * dat, int op, Page * p) { LogEntry * e; + + try { + if(globalLockManager.writeLockPage) { + globalLockManager.writeLockPage(xid, rid.page); + } + } end; + + + e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); + + assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); + + DEBUG("T update() e->LSN: %ld\n", e->LSN); + + doUpdate(e, p); + + free(e); +} + +compensated_function void Tupdate(int xid, recordid rid, const void *dat, int op) { Page * p; #ifdef DEBUGGING pthread_mutex_lock(&transactional_2_mutex); assert(numActiveXactions <= MAX_TRANSACTIONS); pthread_mutex_unlock(&transactional_2_mutex); #endif - p = loadPage(xid, rid.page); - + try { + p = loadPage(xid, rid.page); + } end; if(*page_type_ptr(p) == INDIRECT_PAGE) { releasePage(p); - rid = dereferenceRID(xid, rid); - p = loadPage(xid, rid.page); + try { + rid = dereferenceRID(xid, rid); + p = loadPage(xid, rid.page); + } end; /** @todo Kludge! Shouldn't special case operations in transactional2. */ } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE && op != OPERATION_LINEAR_INSERT && @@ -166,85 +191,66 @@ compensated_function void Tupdate(int xid, recordid rid, const void *dat, int op op != OPERATION_UNDO_LINEAR_DELETE ) { rid = dereferenceArrayListRid(p, rid.slot); releasePage(p); - p = loadPage(xid, rid.page); + try { + p = loadPage(xid, rid.page); + } end; } /** @todo For logical undo logs, grabbing a lock makes no sense! */ - try { - if(globalLockManager.writeLockPage) { + begin_action(releasePage, p) { + TupdateHelper(xid, rid, dat, op, p); + /* if(globalLockManager.writeLockPage) { globalLockManager.writeLockPage(xid, rid.page); - } - } end; - - e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); - - assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); - - DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); - - doUpdate(e, p); - releasePage(p); - - free(e); + } + + e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); + + } en d_action; + + assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); + + DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); + + doUpdate(e, p); + releasePage(p);*/ + } compensate; } compensated_function void alTupdate(int xid, recordid rid, const void *dat, int op) { - LogEntry * e; - Page * p; - - try { - if(globalLockManager.writeLockPage) { - globalLockManager.writeLockPage(xid, rid.page); - } - } end; - + Page * p ; + try { p = loadPage(xid, rid.page); - + } end; + + begin_action(releasePage, p) { + TupdateHelper(xid, rid, dat, op, p); + } compensate; - - /* if(*page_type_ptr(p) == INDIRECT_PAGE) { - releasePage(p); - rid = dereferenceRID(rid); - p = loadPage(rid.page); - / ** @todo Kludge! Shouldn't special case operations in transactional2. * / - } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE && - op != OPERATION_LINEAR_INSERT && - op != OPERATION_UNDO_LINEAR_INSERT && - op != OPERATION_LINEAR_DELETE && - op != OPERATION_UNDO_LINEAR_DELETE ) { - rid = dereferenceArrayListRid(p, rid.slot); - releasePage(p); - p = loadPage(rid.page); - } */ - - e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); - - assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); - - DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); - - doUpdate(e, p); - releasePage(p); - /* end Tupdate() */ - free(e); } void TreadUnlocked(int xid, recordid rid, void * dat) { - Page * p = loadPage(xid, rid.page); + Page * p; + try { + p = loadPage(xid, rid.page); + } end; int page_type = *page_type_ptr(p); if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) { } else if(page_type == INDIRECT_PAGE) { releasePage(p); - rid = dereferenceRIDUnlocked(xid, rid); - p = loadPage(xid, rid.page); + try { + rid = dereferenceRIDUnlocked(xid, rid); + p = loadPage(xid, rid.page); + } end; } else if(page_type == ARRAY_LIST_PAGE) { rid = dereferenceArrayListRidUnlocked(p, rid.slot); releasePage(p); - p = loadPage(xid, rid.page); + try { + p = loadPage(xid, rid.page); + } end; } else { abort(); diff --git a/test/cht/client.c b/test/cht/client.c index 846dbae..a360ffe 100644 --- a/test/cht/client.c +++ b/test/cht/client.c @@ -19,17 +19,17 @@ int main(int argc, char ** argv) { // cht_evals go here... int xid = 1;//NULL_MACHINE; // What's the deal with this? ;) - clusterHashTable_t * new_ht; - cHtCreate(xid, cht_client, new_ht); + clusterHashTable_t new_ht; + cHtCreate(xid, cht_client, &new_ht); int i; for(i = 0; i < 10000; i++) { int one = i; int two = i+1; - cHtInsert(xid, cht_client, new_ht, &one, sizeof(int), &two, sizeof(int)); + cHtInsert(xid, cht_client, &new_ht, &one, sizeof(int), &two, sizeof(int)); int newOne, newTwo; newOne = i; newTwo = 0; unsigned int newLen = sizeof(int); - int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen); + int ret = cHtLookup(xid, cht_client, &new_ht, &newOne, sizeof(int), &newTwo, &newLen); // xid++; //printf("lookup returned %d (%d->%d)\n", ret, newOne, newTwo); assert(ret); @@ -42,19 +42,19 @@ int main(int argc, char ** argv) { for(i = 0; i < 10000; i+=10) { int one = i; int two = -1; unsigned int size = sizeof(int); - int removed = cHtRemove(xid, cht_client, new_ht, &one, sizeof(int), &two, &size); + int removed = cHtRemove(xid, cht_client, &new_ht, &one, sizeof(int), &two, &size); assert(removed); size = sizeof(int); two = -1; - removed = cHtRemove(xid, cht_client, new_ht, &one, sizeof(int), &two, &size); + removed = cHtRemove(xid, cht_client, &new_ht, &one, sizeof(int), &two, &size); assert(!removed); int newOne, newTwo; newOne = i; newTwo = 0; unsigned int newLen = sizeof(int); - int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen); + int ret = cHtLookup(xid, cht_client, &new_ht, &newOne, sizeof(int), &newTwo, &newLen); assert(!ret); @@ -68,7 +68,7 @@ int main(int argc, char ** argv) { newOne = i; newTwo = 0; unsigned int newLen = sizeof(int); - int ret = cHtLookup(xid, cht_client, new_ht, &newOne, sizeof(int), &newTwo, &newLen); + int ret = cHtLookup(xid, cht_client, &new_ht, &newOne, sizeof(int), &newTwo, &newLen); // xid++; //printf("lookup returned %d (%d->%d)\n", ret, newOne, newTwo); assert(ret);