diff --git a/benchmarks/distributedLsnFree.c b/benchmarks/distributedLsnFree.c index 4340c4f..97acb8d 100644 --- a/benchmarks/distributedLsnFree.c +++ b/benchmarks/distributedLsnFree.c @@ -124,7 +124,8 @@ int main(int argc, char ** argv) { for(long long i = 0; i < num_rids; i++) { Page *p = loadPage(-1, rids[i].page); writelock(p->rwlatch,0); - stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val); + stasis_record_write(-1, p, rids[i], (byte*)&cache[i].val); + stasis_page_lsn_write(-1, p, last_lsn); unlock(p->rwlatch); releasePage(p); releasePage(p); diff --git a/benchmarks/writeBack.c b/benchmarks/writeBack.c index cc8055f..f9a7f9a 100644 --- a/benchmarks/writeBack.c +++ b/benchmarks/writeBack.c @@ -48,7 +48,8 @@ int main (int argc, char ** argv) { for(long long i = 0; i < num_rids; i++) { Page *p = loadPage(-1, rids[i].page); writelock(p->rwlatch,0); - stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val); + stasis_record_write(-1, p, rids[i], (byte*)&cache[i].val); + stasis_page_lsn_write(-1, p, last_lsn); unlock(p->rwlatch); releasePage(p); releasePage(p); diff --git a/src/stasis/operations/alloc.c b/src/stasis/operations/alloc.c index b90d350..f1a4c3d 100644 --- a/src/stasis/operations/alloc.c +++ b/src/stasis/operations/alloc.c @@ -110,7 +110,7 @@ static int op_alloc(const LogEntry* e, Page* p) { if(e->update.arg_size == sizeof(alloc_arg) + size) { // if we're aborting a dealloc we better have a sane preimage to apply rid.size = size; - stasis_record_write(e->xid,p,e->LSN,rid,(const byte*)(arg+1)); + stasis_record_write(e->xid,p,rid,(const byte*)(arg+1)); rid.size = arg->type; } else { // otherwise, no preimage diff --git a/src/stasis/operations/arrayList.c b/src/stasis/operations/arrayList.c index 4a45fa9..e7ebeb4 100644 --- a/src/stasis/operations/arrayList.c +++ b/src/stasis/operations/arrayList.c @@ -1,213 +1,126 @@ - #include -#include -#include -#include #include -#include -#include #include -#define _XOPEN_SOURCE 600 #include +#define MAX_OFFSET_POSITION 3 +#define FIRST_DATA_PAGE_OFFSET 4 + typedef struct { pageid_t firstPage; pageid_t initialSize; pageid_t multiplier;//XXX rest are not page numbers or offsets, but must all be same length - int size; // *has* to be an int; passed into OPERATION_FIXED_PAGE_ALLOC + pageid_t size; // *has* to be smaller than a page; passed into TinitializeFixedPage() pageid_t maxOffset; -} TarrayListParameters; +} array_list_parameter_t; -static TarrayListParameters pageToTLP(int xid, Page * p); -static int getBlockContainingOffset(TarrayListParameters tlp, int offset, pageid_t * firstSlotInBlock); +static array_list_parameter_t array_list_read_parameter(int xid, Page * p) { -#define MAX_OFFSET_POSITION 3 -#define FIRST_DATA_PAGE_OFFSET 4 + array_list_parameter_t alp; + alp.firstPage = p->id; + /* tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); */ + recordid rid = { p->id, 0, sizeof(pageid_t) }; + alp.initialSize = *(pageid_t*)stasis_record_read_begin(xid, p, rid); + rid.slot = 1; + alp.multiplier = *(pageid_t*)stasis_record_read_begin(xid, p, rid); + rid.slot = 2; + alp.size = *(pageid_t*)stasis_record_read_begin(xid, p, rid); + rid.slot = 3; + alp.maxOffset = *(pageid_t*)stasis_record_read_begin(xid, p, rid); -/*----------------------------------------------------------------------------*/ - -compensated_function recordid TarrayListAlloc(int xid, pageid_t count, int multiplier, int size) { - - pageid_t firstPage; - try_ret(NULLRID) { - firstPage = TpageAllocMany(xid, count+1); - } end_ret(NULLRID); - TarrayListParameters tlp; - - tlp.firstPage = firstPage; - tlp.initialSize = count; - tlp.multiplier = multiplier; - tlp.size = size; - tlp.maxOffset = 0; - - recordid rid; - - rid.page = firstPage; - rid.size = size; - rid.slot = 0; - try_ret(NULLRID) { - Tupdate(xid, firstPage, &tlp, sizeof(tlp), OPERATION_ARRAY_LIST_HEADER_INIT); - } end_ret(NULLRID); - - return rid; + return alp; } +static int array_list_get_block_containing_offset(array_list_parameter_t alp, int offset, pageid_t * firstSlotInBlock) { + int rec_per_page = stasis_fixed_records_per_page((size_t)alp.size); + long thisHigh = rec_per_page * alp.initialSize; + int lastHigh = 0; + int pageRidSlot = 0; + int currentPageLength = alp.initialSize; -static int op_array_list_header_init(const LogEntry* e, Page* p) { + while(((pageid_t)offset) >= thisHigh) { + pageRidSlot ++; + lastHigh = thisHigh; + currentPageLength *= alp.multiplier; + thisHigh += rec_per_page * currentPageLength; + } + if(firstSlotInBlock) { + *firstSlotInBlock = lastHigh; + } + return pageRidSlot; +} - assert(e->update.arg_size == sizeof(TarrayListParameters)); +static int array_list_op_init_header(const LogEntry* e, Page* p) { - const TarrayListParameters * tlp - = (const TarrayListParameters*)getUpdateArgs(e); + assert(e->update.arg_size == sizeof(array_list_parameter_t)); - pageid_t firstPage = tlp->firstPage; - pageid_t count = tlp->initialSize; - pageid_t multiplier = tlp->multiplier; - int size = tlp->size; + const array_list_parameter_t * alp + = (const array_list_parameter_t*)getUpdateArgs(e); stasis_fixed_initialize_page(p, sizeof(pageid_t), stasis_fixed_records_per_page(sizeof(pageid_t))); - recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid; - countRid.page - = multiplierRid.page = slotSizeRid.page - = maxOffset.page = firstDataPageRid.page = p->id; + recordid initialSizeRid, multiplierRid, slotSizeRid, maxOffsetRid, firstDataPageRid; + initialSizeRid.page + = multiplierRid.page = slotSizeRid.page + = maxOffsetRid.page = firstDataPageRid.page = p->id; - countRid.size + initialSizeRid.size = multiplierRid.size = slotSizeRid.size - = maxOffset.size = firstDataPageRid.size = sizeof(pageid_t); + = maxOffsetRid.size = firstDataPageRid.size = sizeof(pageid_t); - countRid.slot = 0; + initialSizeRid.slot = 0; multiplierRid.slot = 1; slotSizeRid.slot = 2; - maxOffset.slot = 3; + maxOffsetRid.slot = 3; + // Note that firstDataPageRid is not part of the header page's header.. + // Instead, it is the value stored on the first slot of the header page. firstDataPageRid.slot = 4; - pageid_t firstDataPage = firstPage + 1; - (*(pageid_t*)stasis_record_write_begin(e->xid, p, countRid)) - = count; - (*(pageid_t*)stasis_record_write_begin(e->xid, p, multiplierRid)) - = multiplier; - (*(pageid_t*)stasis_record_write_begin(e->xid, p, firstDataPageRid)) - = firstDataPage; - (*(pageid_t*)stasis_record_write_begin(e->xid, p, slotSizeRid)) - = size; - (*(pageid_t*)stasis_record_write_begin(e->xid, p, maxOffset)) - = -1; + // Write header. + stasis_record_write(e->xid, p, initialSizeRid, (const byte*)&(alp->initialSize)); + stasis_record_write(e->xid, p, multiplierRid, (const byte*)&(alp->multiplier)); + stasis_record_write(e->xid, p, slotSizeRid, (const byte*)&(alp->size)); + stasis_record_write(e->xid, p, maxOffsetRid, (const byte*)&(alp->maxOffset)); + + // Write first slot. The page after this one stores data + pageid_t firstDataPage = alp->firstPage + 1; + stasis_record_write(e->xid, p, firstDataPageRid, (const byte*)&firstDataPage); + *stasis_page_type_ptr(p) = ARRAY_LIST_PAGE; - recordid ret; - ret.page = firstPage; - ret.slot = 0; /* slot = # of slots in array... */ - ret.size = size; - return 0; } +/*----------------------------------------------------------------------------*/ + stasis_operation_impl stasis_op_impl_array_list_header_init() { stasis_operation_impl o = { OPERATION_ARRAY_LIST_HEADER_INIT, OPERATION_ARRAY_LIST_HEADER_INIT, /* Do not need to roll back this page, since it will be deallocated. */ OPERATION_NOOP, - &op_array_list_header_init + &array_list_op_init_header }; return o; } -/*----------------------------------------------------------------------------*/ - -/** @todo locking for arrayList... this isn't pressing since currently - the only thing that calls arraylist (the hashtable - implementations) serialize bucket list operations anyway... - - @todo this function calls pow(), which is horribly inefficient. -*/ - -compensated_function int TarrayListExtend(int xid, recordid rid, int slots) { - Page * p; - try_ret(compensation_error()) { - p = loadPage(xid, rid.page); - } end_ret(compensation_error()); - readlock(p->rwlatch, 0); - TarrayListParameters tlp = pageToTLP(xid, p); - unlock(p->rwlatch);; - releasePage(p); - p = NULL; - - int lastCurrentBlock; // just a slot on a page - if(tlp.maxOffset == -1) { - lastCurrentBlock = -1; - } else{ - lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL); - } - int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL); - - DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock); - - recordid tmp; /* recordid of slot in base page that holds new block. */ - tmp.page = rid.page; - tmp.size = sizeof(pageid_t); - - recordid tmp2; /* recordid of newly created pages. */ - tmp2.slot = 0; - tmp2.size = tlp.size; - /* Iterate over the (small number) of indirection blocks that need to be updated */ - try_ret(compensation_error()) { - for(pageid_t i = lastCurrentBlock+1; i <= lastNewBlock; i++) { - /* Alloc block i */ - pageid_t blockSize = tlp.initialSize * pow(tlp.multiplier, i); - pageid_t newFirstPage = TpageAllocMany(xid, blockSize); - DEBUG("block %d\n", i); - tmp.slot = i + FIRST_DATA_PAGE_OFFSET; - /* Iterate over the (large number) of new blocks, clearing their contents */ - /* @todo XXX arraylist generates N log entries initing pages. - It should generate 1 entry. (Need better LSN handling first.)*/ - { - for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) { - TinitializeFixedPage(xid, i, tlp.size); - } - } - TsetRaw(xid,tmp,&newFirstPage); - - DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); - } - - tmp.slot = MAX_OFFSET_POSITION; - - pageid_t newMaxOffset = tlp.maxOffset+slots; - TsetRaw(xid, tmp, &newMaxOffset); - } end_ret(compensation_error()); - return 0; - -} - -compensated_function int TarrayListLength(int xid, recordid rid) { - Page * p = loadPage(xid, rid.page); - readlock(p->rwlatch, 0); - TarrayListParameters tlp = pageToTLP(xid, p); - unlock(p->rwlatch); - releasePage(p); - return tlp.maxOffset+1; -} - -/*----------------------------------------------------------------------------*/ recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) { readlock(p->rwlatch,0); - TarrayListParameters tlp = pageToTLP(xid, p); + array_list_parameter_t tlp = array_list_read_parameter(xid, p); int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size); pageid_t lastHigh = 0; int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */ assert(tlp.maxOffset >= offset); - - pageRidSlot = getBlockContainingOffset(tlp, offset, &lastHigh); - + + pageRidSlot = array_list_get_block_containing_offset(tlp, offset, &lastHigh); + int dataSlot = offset - lastHigh; /* The offset in the block of interest of the slot we want. */ pageid_t blockPage = dataSlot / rec_per_page; /* The page in the block of interest that contains the slot we want */ int blockSlot = dataSlot - blockPage * rec_per_page; @@ -225,38 +138,107 @@ recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) { return rid; } -static int getBlockContainingOffset(TarrayListParameters tlp, int offset, pageid_t * firstSlotInBlock) { - int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size); - long thisHigh = rec_per_page * tlp.initialSize; - int lastHigh = 0; - int pageRidSlot = 0; - int currentPageLength = tlp.initialSize; - while(((pageid_t)offset) >= thisHigh) { - pageRidSlot ++; - lastHigh = thisHigh; - currentPageLength *= tlp.multiplier; - thisHigh += rec_per_page * currentPageLength; - } - if(firstSlotInBlock) { - *firstSlotInBlock = lastHigh; - } - return pageRidSlot; +/*----------------------------------------------------------------------------*/ + +compensated_function recordid TarrayListAlloc(int xid, pageid_t count, int multiplier, int recordSize) { + + pageid_t firstPage; + try_ret(NULLRID) { + firstPage = TpageAllocMany(xid, count+1); + } end_ret(NULLRID); + array_list_parameter_t alp; + + alp.firstPage = firstPage; + alp.initialSize = count; + alp.multiplier = multiplier; + alp.size = recordSize; + alp.maxOffset = -1; + + recordid rid; + + rid.page = firstPage; + rid.slot = 0; /* number of slots in array (maxOffset + 1) */ + rid.size = recordSize; + try_ret(NULLRID) { + Tupdate(xid, firstPage, &alp, sizeof(alp), OPERATION_ARRAY_LIST_HEADER_INIT); + } end_ret(NULLRID); + + return rid; } -static TarrayListParameters pageToTLP(int xid, Page * p) { +/** @todo locking for arrayList... this isn't pressing since currently + the only thing that calls arraylist (the hashtable + implementations) serialize bucket list operations anyway... - TarrayListParameters tlp; - tlp.firstPage = p->id; - /* tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); */ - recordid rid = { p->id, 0, sizeof(pageid_t) }; - tlp.initialSize = *(pageid_t*)stasis_record_read_begin(xid, p, rid); - rid.slot = 1; - tlp.multiplier = *(pageid_t*)stasis_record_read_begin(xid, p, rid); - rid.slot = 2; - tlp.size = *(pageid_t*)stasis_record_read_begin(xid, p, rid); - rid.slot = 3; - tlp.maxOffset = *(pageid_t*)stasis_record_read_begin(xid, p, rid); + @todo this function calls pow(), which is horribly inefficient. +*/ + +compensated_function int TarrayListExtend(int xid, recordid rid, int slots) { + Page * p; + try_ret(compensation_error()) { + p = loadPage(xid, rid.page); + } end_ret(compensation_error()); + readlock(p->rwlatch, 0); + array_list_parameter_t alp + = array_list_read_parameter(xid, p); + unlock(p->rwlatch);; + releasePage(p); + p = NULL; + + int lastCurrentBlock; // just a slot on a page + if(alp.maxOffset == -1) { + lastCurrentBlock = -1; + } else{ + lastCurrentBlock = array_list_get_block_containing_offset(alp, alp.maxOffset, NULL); + } + int lastNewBlock = array_list_get_block_containing_offset(alp, alp.maxOffset+slots, NULL); + + DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock); + + recordid tmp; /* recordid of slot in base page that holds new block. */ + tmp.page = rid.page; + tmp.size = sizeof(pageid_t); + + recordid tmp2; /* recordid of newly created pages. */ + tmp2.slot = 0; + tmp2.size = alp.size; + /* Iterate over the (small number) of indirection blocks that need to be updated */ + try_ret(compensation_error()) { + for(pageid_t i = lastCurrentBlock+1; i <= lastNewBlock; i++) { + /* Alloc block i */ + pageid_t blockSize = alp.initialSize * powl(alp.multiplier, i); + pageid_t newFirstPage = TpageAllocMany(xid, blockSize); + DEBUG("block %d\n", i); + tmp.slot = i + FIRST_DATA_PAGE_OFFSET; + /* Iterate over the (large number) of new blocks, clearing their contents */ + /* @todo XXX arraylist generates N log entries initing pages. + It should generate 1 entry. (Need better LSN handling first.)*/ + { + for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) { + TinitializeFixedPage(xid, i, alp.size); + } + } + TsetRaw(xid,tmp,&newFirstPage); + + DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); + } + + tmp.slot = MAX_OFFSET_POSITION; + + pageid_t newMaxOffset = alp.maxOffset+slots; + TsetRaw(xid, tmp, &newMaxOffset); + } end_ret(compensation_error()); + return 0; - return tlp; +} + +compensated_function int TarrayListLength(int xid, recordid rid) { + Page * p = loadPage(xid, rid.page); + readlock(p->rwlatch, 0); + array_list_parameter_t alp + = array_list_read_parameter(xid, p); + unlock(p->rwlatch); + releasePage(p); + return alp.maxOffset+1; } diff --git a/src/stasis/operations/decrement.c b/src/stasis/operations/decrement.c index c365eb8..2c0a0c4 100644 --- a/src/stasis/operations/decrement.c +++ b/src/stasis/operations/decrement.c @@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of California, and other parties. The following terms apply to all files associated with the software unless explicitly disclaimed in individual files. - + The authors hereby grant permission to use, copy, modify, distribute, and license this software and its documentation for any purpose, provided that existing copyright notices are retained in all copies @@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by their authors and need not follow the licensing terms described here, provided that the new terms are clearly indicated on the first page of each file where they apply. - + IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - + THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - + GOVERNMENT USE: If you are acquiring this software on behalf of the U.S. government, the Government shall have only "Restricted Rights" in the software and related documentation as defined in the Federal @@ -41,7 +41,7 @@ terms specified in this license. ---*/ /********************************************** * $Id$ - * + * * Decrements the given reference by one *********************************************/ @@ -55,7 +55,7 @@ static int op_decrement(const LogEntry* e, Page* p) { stasis_record_read(e->xid, p, r, (byte*)&i); i--; - stasis_record_write(e->xid, p, e->LSN, r, (byte*)&i); + stasis_record_write(e->xid, p, r, (byte*)&i); return 0; } diff --git a/src/stasis/operations/increment.c b/src/stasis/operations/increment.c index 87ab1ab..f0177d5 100644 --- a/src/stasis/operations/increment.c +++ b/src/stasis/operations/increment.c @@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of California, and other parties. The following terms apply to all files associated with the software unless explicitly disclaimed in individual files. - + The authors hereby grant permission to use, copy, modify, distribute, and license this software and its documentation for any purpose, provided that existing copyright notices are retained in all copies @@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by their authors and need not follow the licensing terms described here, provided that the new terms are clearly indicated on the first page of each file where they apply. - + IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - + THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - + GOVERNMENT USE: If you are acquiring this software on behalf of the U.S. government, the Government shall have only "Restricted Rights" in the software and related documentation as defined in the Federal @@ -41,7 +41,7 @@ terms specified in this license. ---*/ /********************************************** * $Id$ - * + * * Increments the given reference by one **********************************************/ @@ -56,7 +56,7 @@ static int op_decrement(const LogEntry* e, Page* p) { stasis_record_read(e->xid, p, r, (byte*)&i); i++; - stasis_record_write(e->xid, p, e->LSN, r, (byte*)&i); + stasis_record_write(e->xid, p, r, (byte*)&i); return 0; } diff --git a/src/stasis/operations/set.c b/src/stasis/operations/set.c index 755ea7c..0a6b133 100644 --- a/src/stasis/operations/set.c +++ b/src/stasis/operations/set.c @@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of California, and other parties. The following terms apply to all files associated with the software unless explicitly disclaimed in individual files. - + The authors hereby grant permission to use, copy, modify, distribute, and license this software and its documentation for any purpose, provided that existing copyright notices are retained in all copies @@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by their authors and need not follow the licensing terms described here, provided that the new terms are clearly indicated on the first page of each file where they apply. - + IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - + THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - + GOVERNMENT USE: If you are acquiring this software on behalf of the U.S. government, the Government shall have only "Restricted Rights" in the software and related documentation as defined in the Federal @@ -41,7 +41,7 @@ terms specified in this license. ---*/ /********************************************** * $Id$ - * + * * sets the given reference to dat **********************************************/ @@ -63,7 +63,7 @@ static int op_set(const LogEntry *e, Page *p) { assert(stasis_record_type_to_size(rid.size) == rid.size); assert(stasis_record_length_read(e->xid,p,rid) == rid.size); - stasis_record_write(e->xid, p, e->LSN, rid, b); + stasis_record_write(e->xid, p, rid, b); return 0; } @@ -79,7 +79,7 @@ static int op_set_inverse(const LogEntry *e, Page *p) { assert(e->update.arg_size == sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size); assert(stasis_record_type_to_size(rid.size) == rid.size); - stasis_record_write(e->xid, p, e->LSN, rid, b+rid.size); + stasis_record_write(e->xid, p, rid, b+rid.size); return 0; } @@ -156,7 +156,7 @@ static int op_set_range(const LogEntry* e, Page* p) { stasis_record_read(e->xid, p, rid, tmp); memcpy(tmp+range->offset, data, diffLength); - stasis_record_write(e->xid, p, e->LSN, rid, tmp); + stasis_record_write(e->xid, p, rid, tmp); free(tmp); return 0; @@ -177,7 +177,7 @@ static int op_set_range_inverse(const LogEntry* e, Page* p) { stasis_record_read(e->xid, p, rid, tmp); memcpy(tmp+range->offset, data, diffLength); - stasis_record_write(e->xid, p, e->LSN, rid, tmp); + stasis_record_write(e->xid, p, rid, tmp); free(tmp); return 0; @@ -198,8 +198,8 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt // Copy new value into log structure memcpy(range + 1, dat, length); - // No further locking is necessary here; readRecord protects the - // page layout, but attempts at concurrent modification have undefined + // No further locking is necessary here; readRecord protects the + // page layout, but attempts at concurrent modification have undefined // results. (See page.c) readlock(p->rwlatch,0); @@ -216,7 +216,7 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt releasePage(p); } -stasis_operation_impl stasis_op_impl_set() { +stasis_operation_impl stasis_op_impl_set() { stasis_operation_impl o = { OPERATION_SET, OPERATION_SET, @@ -226,7 +226,7 @@ stasis_operation_impl stasis_op_impl_set() { return o; } -stasis_operation_impl stasis_op_impl_set_inverse() { +stasis_operation_impl stasis_op_impl_set_inverse() { stasis_operation_impl o = { OPERATION_SET_INVERSE, OPERATION_SET_INVERSE, diff --git a/src/stasis/page.c b/src/stasis/page.c index 2b8ac92..cc1d0da 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -153,7 +153,7 @@ page_impl * stasis_page_impl_get(int id) { assert(page_impls[id].page_type == id); return & page_impls[id]; } -void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) { +void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) { assertlocked(p->rwlatch); assert( (p->id == rid.page) && (p->memAddr != NULL) ); assert(rid.size <= BLOB_THRESHOLD_SIZE); diff --git a/stasis/page.h b/stasis/page.h index 2d6b4fd..33e2b42 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -517,12 +517,8 @@ stasis_record_type_to_size(ssize_t type) { * @param rid recordid where you want to write * @param dat the new value of the record. * - * @todo this updates the LSN of the page that points to blob, even if - * the page is otherwise untouched!! This is slow and breaks - * recovery. - * */ -void stasis_record_write(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat); +void stasis_record_write(int xid, Page * page, recordid rid, const byte *dat); /** * Read a record. This call will be dispatched to the proper page implementation. * diff --git a/test/stasis/check_bTree.c b/test/stasis/check_bTree.c index a84d4b0..b81d8b5 100644 --- a/test/stasis/check_bTree.c +++ b/test/stasis/check_bTree.c @@ -183,8 +183,8 @@ void initializeNewBTreeNode(int xid, Page* p){ byte * countBuff = (byte *) & countInt; // write the count out - stasis_record_write(xid, p, 1, rid, countBuff); - + stasis_record_write(xid, p, rid, countBuff); + stasis_page_lsn_write(xid, p, 1); } void testFunctions(){ printf("testing functions"); @@ -265,7 +265,8 @@ int SimpleExample(){ // @todo This is a messy way to do this... - stasis_record_write(xid, p1, 1, rid1, b1); + stasis_record_write(xid, p1, rid1, b1); + stasis_page_lsn_write(xid, p1, 1); stasis_record_read(xid, p1, rid1, b2); if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));} diff --git a/test/stasis/check_bufferManager.c b/test/stasis/check_bufferManager.c index 3f17358..1d88da9 100644 --- a/test/stasis/check_bufferManager.c +++ b/test/stasis/check_bufferManager.c @@ -136,7 +136,7 @@ void * workerThreadWriting(void * q) { lsn_t lsn = stasis_page_lsn_read(p); assert(lsn); p->LSN --; // XXX HACK -- Sooner or later this will break... - stasis_record_write(1, p, lsn, rids[i], (byte*)&val); + stasis_record_write(1, p, rids[i], (byte*)&val); stasis_page_lsn_write(1,p,lsn); unlock(p->rwlatch); diff --git a/test/stasis/check_linearHashNTA.c b/test/stasis/check_linearHashNTA.c index b92380d..0d40ae3 100644 --- a/test/stasis/check_linearHashNTA.c +++ b/test/stasis/check_linearHashNTA.c @@ -116,7 +116,7 @@ START_TEST(linearHashNTAtest) found = ThashRemove(xid, hashHeader, (byte*)&i, sizeof(int)); assert(!found); } - printf("\nabort()\n"); fflush(stdout); + printf("\n"); fflush(stdout); Tabort(xid); xid = Tbegin(); for(i = 0; i < NUM_ENTRIES; i++) { @@ -132,12 +132,15 @@ START_TEST(linearHashNTAtest) } Tcommit(xid); Tdeinit(); + printf("\n"); fflush(stdout); } END_TEST /** @test */ START_TEST(linearHashNTAVariableSizetest) { + fflush(stdout); printf("\n"); + Tinit(); int xid = Tbegin(); @@ -187,7 +190,7 @@ START_TEST(linearHashNTAVariableSizetest) found = ThashRemove(xid, hashHeader, (byte*)&i, sizeof(int)); assert(!found); } - printf("\nabort()\n"); fflush(stdout); + printf("\n"); fflush(stdout); Tabort(xid); xid = Tbegin(); for(i = 0; i < NUM_ENTRIES; i++) { @@ -201,6 +204,7 @@ START_TEST(linearHashNTAVariableSizetest) assert(val2->size == val2->slot * NUM_ENTRIES); free(val2); } + printf("\n"); Tcommit(xid); Tdeinit(); } END_TEST @@ -487,7 +491,6 @@ START_TEST(linearHashNTAVariableLengthIteratortest) { } END_TEST START_TEST(emptyHashIterator) { - printf("\n"); Tinit(); int xid = Tbegin(); diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index e207642..1b3d7e4 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -100,7 +100,8 @@ START_TEST(operation_physical_do_undo) { p = loadPage(xid, rid.page); writelock(p->rwlatch,0); // manually fill in UNDO field - stasis_record_write(xid, p, lsn, rid, (byte*)&buf); + stasis_record_write(xid, p, rid, (byte*)&buf); + stasis_page_lsn_write(xid, p, lsn); unlock(p->rwlatch); releasePage(p); setToTwo->LSN = 10; @@ -164,7 +165,8 @@ START_TEST(operation_physical_do_undo) { p = loadPage(xid, rid.page); writelock(p->rwlatch,0); - stasis_record_write(xid, p, lsn, rid, (byte*)&buf); + stasis_record_write(xid, p, rid, (byte*)&buf); + stasis_page_lsn_write(xid, p, lsn); unlock(p->rwlatch); releasePage(p); /* Trace of test: @@ -211,8 +213,8 @@ START_TEST(operation_physical_do_undo) { assert(buf == 1); - stasis_record_write(xid, p, 0, rid, (byte*)&buf); /* reset the page's LSN. */ - + stasis_record_write(xid, p, rid, (byte*)&buf); /* reset the page's LSN. */ + stasis_page_lsn_write(xid, p, 0); DEBUG("I redo set to 2\n"); stasis_operation_redo(setToTwo,p); /* Succeeds */ @@ -553,7 +555,8 @@ START_TEST(operation_lsn_free) { for(int i = 0; i < 100; i++) { rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int)); stasis_record_alloc_done(xid, p, rid[i]); - stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo); + stasis_record_write(xid, p, rid[i], (const byte*)&fortyTwo); + stasis_page_lsn_write(xid, p, -1); } byte * new = malloc(PAGE_SIZE); memcpy(new, p->memAddr, PAGE_SIZE); @@ -618,7 +621,8 @@ START_TEST(operation_reorderable) { for(int i = 0; i < 100; i++) { rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int)); stasis_record_alloc_done(xid, p, rid[i]); - stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo); + stasis_record_write(xid, p, rid[i], (const byte*)&fortyTwo); + stasis_page_lsn_write(xid, p, -1); } byte * new = malloc(PAGE_SIZE); memcpy(new, p->memAddr, PAGE_SIZE);