cleaned up arrayList. stasis_record_write no longer takes an LSN (it was ignoring it anyway). Added some missing calls to stasis_page_lsn_write (each time a page is written to, stasis_page_lsn_write should be called; this happens automatically for operations called via tupdate

This commit is contained in:
Sears Russell 2009-06-02 18:25:35 +00:00
parent 3cc48c4fc4
commit 120665ae53
13 changed files with 217 additions and 229 deletions

View file

@ -124,7 +124,8 @@ int main(int argc, char ** argv) {
for(long long i = 0; i < num_rids; i++) {
Page *p = loadPage(-1, rids[i].page);
writelock(p->rwlatch,0);
stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val);
stasis_record_write(-1, p, rids[i], (byte*)&cache[i].val);
stasis_page_lsn_write(-1, p, last_lsn);
unlock(p->rwlatch);
releasePage(p);
releasePage(p);

View file

@ -48,7 +48,8 @@ int main (int argc, char ** argv) {
for(long long i = 0; i < num_rids; i++) {
Page *p = loadPage(-1, rids[i].page);
writelock(p->rwlatch,0);
stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val);
stasis_record_write(-1, p, rids[i], (byte*)&cache[i].val);
stasis_page_lsn_write(-1, p, last_lsn);
unlock(p->rwlatch);
releasePage(p);
releasePage(p);

View file

@ -110,7 +110,7 @@ static int op_alloc(const LogEntry* e, Page* p) {
if(e->update.arg_size == sizeof(alloc_arg) + size) {
// if we're aborting a dealloc we better have a sane preimage to apply
rid.size = size;
stasis_record_write(e->xid,p,e->LSN,rid,(const byte*)(arg+1));
stasis_record_write(e->xid,p,rid,(const byte*)(arg+1));
rid.size = arg->type;
} else {
// otherwise, no preimage

View file

@ -1,213 +1,126 @@
#include <config.h>
#include <stasis/common.h>
#include <stasis/page.h>
#include <stasis/operations/pageOperations.h>
#include <stasis/operations/arrayList.h>
#include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <assert.h>
#define _XOPEN_SOURCE 600
#include <math.h>
#define MAX_OFFSET_POSITION 3
#define FIRST_DATA_PAGE_OFFSET 4
typedef struct {
pageid_t firstPage;
pageid_t initialSize;
pageid_t multiplier;//XXX rest are not page numbers or offsets, but must all be same length
int size; // *has* to be an int; passed into OPERATION_FIXED_PAGE_ALLOC
pageid_t size; // *has* to be smaller than a page; passed into TinitializeFixedPage()
pageid_t maxOffset;
} TarrayListParameters;
} array_list_parameter_t;
static TarrayListParameters pageToTLP(int xid, Page * p);
static int getBlockContainingOffset(TarrayListParameters tlp, int offset, pageid_t * firstSlotInBlock);
static array_list_parameter_t array_list_read_parameter(int xid, Page * p) {
#define MAX_OFFSET_POSITION 3
#define FIRST_DATA_PAGE_OFFSET 4
array_list_parameter_t alp;
alp.firstPage = p->id;
/* tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); */
recordid rid = { p->id, 0, sizeof(pageid_t) };
alp.initialSize = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
rid.slot = 1;
alp.multiplier = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
rid.slot = 2;
alp.size = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
rid.slot = 3;
alp.maxOffset = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
/*----------------------------------------------------------------------------*/
compensated_function recordid TarrayListAlloc(int xid, pageid_t count, int multiplier, int size) {
pageid_t firstPage;
try_ret(NULLRID) {
firstPage = TpageAllocMany(xid, count+1);
} end_ret(NULLRID);
TarrayListParameters tlp;
tlp.firstPage = firstPage;
tlp.initialSize = count;
tlp.multiplier = multiplier;
tlp.size = size;
tlp.maxOffset = 0;
recordid rid;
rid.page = firstPage;
rid.size = size;
rid.slot = 0;
try_ret(NULLRID) {
Tupdate(xid, firstPage, &tlp, sizeof(tlp), OPERATION_ARRAY_LIST_HEADER_INIT);
} end_ret(NULLRID);
return rid;
return alp;
}
static int array_list_get_block_containing_offset(array_list_parameter_t alp, int offset, pageid_t * firstSlotInBlock) {
int rec_per_page = stasis_fixed_records_per_page((size_t)alp.size);
long thisHigh = rec_per_page * alp.initialSize;
int lastHigh = 0;
int pageRidSlot = 0;
int currentPageLength = alp.initialSize;
static int op_array_list_header_init(const LogEntry* e, Page* p) {
while(((pageid_t)offset) >= thisHigh) {
pageRidSlot ++;
lastHigh = thisHigh;
currentPageLength *= alp.multiplier;
thisHigh += rec_per_page * currentPageLength;
}
if(firstSlotInBlock) {
*firstSlotInBlock = lastHigh;
}
return pageRidSlot;
}
assert(e->update.arg_size == sizeof(TarrayListParameters));
static int array_list_op_init_header(const LogEntry* e, Page* p) {
const TarrayListParameters * tlp
= (const TarrayListParameters*)getUpdateArgs(e);
assert(e->update.arg_size == sizeof(array_list_parameter_t));
pageid_t firstPage = tlp->firstPage;
pageid_t count = tlp->initialSize;
pageid_t multiplier = tlp->multiplier;
int size = tlp->size;
const array_list_parameter_t * alp
= (const array_list_parameter_t*)getUpdateArgs(e);
stasis_fixed_initialize_page(p, sizeof(pageid_t),
stasis_fixed_records_per_page(sizeof(pageid_t)));
recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid;
countRid.page
= multiplierRid.page = slotSizeRid.page
= maxOffset.page = firstDataPageRid.page = p->id;
recordid initialSizeRid, multiplierRid, slotSizeRid, maxOffsetRid, firstDataPageRid;
initialSizeRid.page
= multiplierRid.page = slotSizeRid.page
= maxOffsetRid.page = firstDataPageRid.page = p->id;
countRid.size
initialSizeRid.size
= multiplierRid.size = slotSizeRid.size
= maxOffset.size = firstDataPageRid.size = sizeof(pageid_t);
= maxOffsetRid.size = firstDataPageRid.size = sizeof(pageid_t);
countRid.slot = 0;
initialSizeRid.slot = 0;
multiplierRid.slot = 1;
slotSizeRid.slot = 2;
maxOffset.slot = 3;
maxOffsetRid.slot = 3;
// Note that firstDataPageRid is not part of the header page's header..
// Instead, it is the value stored on the first slot of the header page.
firstDataPageRid.slot = 4;
pageid_t firstDataPage = firstPage + 1;
(*(pageid_t*)stasis_record_write_begin(e->xid, p, countRid))
= count;
(*(pageid_t*)stasis_record_write_begin(e->xid, p, multiplierRid))
= multiplier;
(*(pageid_t*)stasis_record_write_begin(e->xid, p, firstDataPageRid))
= firstDataPage;
(*(pageid_t*)stasis_record_write_begin(e->xid, p, slotSizeRid))
= size;
(*(pageid_t*)stasis_record_write_begin(e->xid, p, maxOffset))
= -1;
// Write header.
stasis_record_write(e->xid, p, initialSizeRid, (const byte*)&(alp->initialSize));
stasis_record_write(e->xid, p, multiplierRid, (const byte*)&(alp->multiplier));
stasis_record_write(e->xid, p, slotSizeRid, (const byte*)&(alp->size));
stasis_record_write(e->xid, p, maxOffsetRid, (const byte*)&(alp->maxOffset));
// Write first slot. The page after this one stores data
pageid_t firstDataPage = alp->firstPage + 1;
stasis_record_write(e->xid, p, firstDataPageRid, (const byte*)&firstDataPage);
*stasis_page_type_ptr(p) = ARRAY_LIST_PAGE;
recordid ret;
ret.page = firstPage;
ret.slot = 0; /* slot = # of slots in array... */
ret.size = size;
return 0;
}
/*----------------------------------------------------------------------------*/
stasis_operation_impl stasis_op_impl_array_list_header_init() {
stasis_operation_impl o = {
OPERATION_ARRAY_LIST_HEADER_INIT,
OPERATION_ARRAY_LIST_HEADER_INIT,
/* Do not need to roll back this page, since it will be deallocated. */
OPERATION_NOOP,
&op_array_list_header_init
&array_list_op_init_header
};
return o;
}
/*----------------------------------------------------------------------------*/
/** @todo locking for arrayList... this isn't pressing since currently
the only thing that calls arraylist (the hashtable
implementations) serialize bucket list operations anyway...
@todo this function calls pow(), which is horribly inefficient.
*/
compensated_function int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p;
try_ret(compensation_error()) {
p = loadPage(xid, rid.page);
} end_ret(compensation_error());
readlock(p->rwlatch, 0);
TarrayListParameters tlp = pageToTLP(xid, p);
unlock(p->rwlatch);;
releasePage(p);
p = NULL;
int lastCurrentBlock; // just a slot on a page
if(tlp.maxOffset == -1) {
lastCurrentBlock = -1;
} else{
lastCurrentBlock = getBlockContainingOffset(tlp, tlp.maxOffset, NULL);
}
int lastNewBlock = getBlockContainingOffset(tlp, tlp.maxOffset+slots, NULL);
DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock);
recordid tmp; /* recordid of slot in base page that holds new block. */
tmp.page = rid.page;
tmp.size = sizeof(pageid_t);
recordid tmp2; /* recordid of newly created pages. */
tmp2.slot = 0;
tmp2.size = tlp.size;
/* Iterate over the (small number) of indirection blocks that need to be updated */
try_ret(compensation_error()) {
for(pageid_t i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
pageid_t blockSize = tlp.initialSize * pow(tlp.multiplier, i);
pageid_t newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %d\n", i);
tmp.slot = i + FIRST_DATA_PAGE_OFFSET;
/* Iterate over the (large number) of new blocks, clearing their contents */
/* @todo XXX arraylist generates N log entries initing pages.
It should generate 1 entry. (Need better LSN handling first.)*/
{
for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) {
TinitializeFixedPage(xid, i, tlp.size);
}
}
TsetRaw(xid,tmp,&newFirstPage);
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
tmp.slot = MAX_OFFSET_POSITION;
pageid_t newMaxOffset = tlp.maxOffset+slots;
TsetRaw(xid, tmp, &newMaxOffset);
} end_ret(compensation_error());
return 0;
}
compensated_function int TarrayListLength(int xid, recordid rid) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch, 0);
TarrayListParameters tlp = pageToTLP(xid, p);
unlock(p->rwlatch);
releasePage(p);
return tlp.maxOffset+1;
}
/*----------------------------------------------------------------------------*/
recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) {
readlock(p->rwlatch,0);
TarrayListParameters tlp = pageToTLP(xid, p);
array_list_parameter_t tlp = array_list_read_parameter(xid, p);
int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size);
pageid_t lastHigh = 0;
int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */
assert(tlp.maxOffset >= offset);
pageRidSlot = getBlockContainingOffset(tlp, offset, &lastHigh);
pageRidSlot = array_list_get_block_containing_offset(tlp, offset, &lastHigh);
int dataSlot = offset - lastHigh; /* The offset in the block of interest of the slot we want. */
pageid_t blockPage = dataSlot / rec_per_page; /* The page in the block of interest that contains the slot we want */
int blockSlot = dataSlot - blockPage * rec_per_page;
@ -225,38 +138,107 @@ recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) {
return rid;
}
static int getBlockContainingOffset(TarrayListParameters tlp, int offset, pageid_t * firstSlotInBlock) {
int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size);
long thisHigh = rec_per_page * tlp.initialSize;
int lastHigh = 0;
int pageRidSlot = 0;
int currentPageLength = tlp.initialSize;
while(((pageid_t)offset) >= thisHigh) {
pageRidSlot ++;
lastHigh = thisHigh;
currentPageLength *= tlp.multiplier;
thisHigh += rec_per_page * currentPageLength;
}
if(firstSlotInBlock) {
*firstSlotInBlock = lastHigh;
}
return pageRidSlot;
/*----------------------------------------------------------------------------*/
compensated_function recordid TarrayListAlloc(int xid, pageid_t count, int multiplier, int recordSize) {
pageid_t firstPage;
try_ret(NULLRID) {
firstPage = TpageAllocMany(xid, count+1);
} end_ret(NULLRID);
array_list_parameter_t alp;
alp.firstPage = firstPage;
alp.initialSize = count;
alp.multiplier = multiplier;
alp.size = recordSize;
alp.maxOffset = -1;
recordid rid;
rid.page = firstPage;
rid.slot = 0; /* number of slots in array (maxOffset + 1) */
rid.size = recordSize;
try_ret(NULLRID) {
Tupdate(xid, firstPage, &alp, sizeof(alp), OPERATION_ARRAY_LIST_HEADER_INIT);
} end_ret(NULLRID);
return rid;
}
static TarrayListParameters pageToTLP(int xid, Page * p) {
/** @todo locking for arrayList... this isn't pressing since currently
the only thing that calls arraylist (the hashtable
implementations) serialize bucket list operations anyway...
TarrayListParameters tlp;
tlp.firstPage = p->id;
/* tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); */
recordid rid = { p->id, 0, sizeof(pageid_t) };
tlp.initialSize = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
rid.slot = 1;
tlp.multiplier = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
rid.slot = 2;
tlp.size = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
rid.slot = 3;
tlp.maxOffset = *(pageid_t*)stasis_record_read_begin(xid, p, rid);
@todo this function calls pow(), which is horribly inefficient.
*/
compensated_function int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p;
try_ret(compensation_error()) {
p = loadPage(xid, rid.page);
} end_ret(compensation_error());
readlock(p->rwlatch, 0);
array_list_parameter_t alp
= array_list_read_parameter(xid, p);
unlock(p->rwlatch);;
releasePage(p);
p = NULL;
int lastCurrentBlock; // just a slot on a page
if(alp.maxOffset == -1) {
lastCurrentBlock = -1;
} else{
lastCurrentBlock = array_list_get_block_containing_offset(alp, alp.maxOffset, NULL);
}
int lastNewBlock = array_list_get_block_containing_offset(alp, alp.maxOffset+slots, NULL);
DEBUG("lastCurrentBlock = %d, lastNewBlock = %d\n", lastCurrentBlock, lastNewBlock);
recordid tmp; /* recordid of slot in base page that holds new block. */
tmp.page = rid.page;
tmp.size = sizeof(pageid_t);
recordid tmp2; /* recordid of newly created pages. */
tmp2.slot = 0;
tmp2.size = alp.size;
/* Iterate over the (small number) of indirection blocks that need to be updated */
try_ret(compensation_error()) {
for(pageid_t i = lastCurrentBlock+1; i <= lastNewBlock; i++) {
/* Alloc block i */
pageid_t blockSize = alp.initialSize * powl(alp.multiplier, i);
pageid_t newFirstPage = TpageAllocMany(xid, blockSize);
DEBUG("block %d\n", i);
tmp.slot = i + FIRST_DATA_PAGE_OFFSET;
/* Iterate over the (large number) of new blocks, clearing their contents */
/* @todo XXX arraylist generates N log entries initing pages.
It should generate 1 entry. (Need better LSN handling first.)*/
{
for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) {
TinitializeFixedPage(xid, i, alp.size);
}
}
TsetRaw(xid,tmp,&newFirstPage);
DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage);
}
tmp.slot = MAX_OFFSET_POSITION;
pageid_t newMaxOffset = alp.maxOffset+slots;
TsetRaw(xid, tmp, &newMaxOffset);
} end_ret(compensation_error());
return 0;
return tlp;
}
compensated_function int TarrayListLength(int xid, recordid rid) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch, 0);
array_list_parameter_t alp
= array_list_read_parameter(xid, p);
unlock(p->rwlatch);
releasePage(p);
return alp.maxOffset+1;
}

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -41,7 +41,7 @@ terms specified in this license.
---*/
/**********************************************
* $Id$
*
*
* Decrements the given reference by one
*********************************************/
@ -55,7 +55,7 @@ static int op_decrement(const LogEntry* e, Page* p) {
stasis_record_read(e->xid, p, r, (byte*)&i);
i--;
stasis_record_write(e->xid, p, e->LSN, r, (byte*)&i);
stasis_record_write(e->xid, p, r, (byte*)&i);
return 0;
}

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -41,7 +41,7 @@ terms specified in this license.
---*/
/**********************************************
* $Id$
*
*
* Increments the given reference by one
**********************************************/
@ -56,7 +56,7 @@ static int op_decrement(const LogEntry* e, Page* p) {
stasis_record_read(e->xid, p, r, (byte*)&i);
i++;
stasis_record_write(e->xid, p, e->LSN, r, (byte*)&i);
stasis_record_write(e->xid, p, r, (byte*)&i);
return 0;
}

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -41,7 +41,7 @@ terms specified in this license.
---*/
/**********************************************
* $Id$
*
*
* sets the given reference to dat
**********************************************/
@ -63,7 +63,7 @@ static int op_set(const LogEntry *e, Page *p) {
assert(stasis_record_type_to_size(rid.size) == rid.size);
assert(stasis_record_length_read(e->xid,p,rid) == rid.size);
stasis_record_write(e->xid, p, e->LSN, rid, b);
stasis_record_write(e->xid, p, rid, b);
return 0;
}
@ -79,7 +79,7 @@ static int op_set_inverse(const LogEntry *e, Page *p) {
assert(e->update.arg_size == sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size);
assert(stasis_record_type_to_size(rid.size) == rid.size);
stasis_record_write(e->xid, p, e->LSN, rid, b+rid.size);
stasis_record_write(e->xid, p, rid, b+rid.size);
return 0;
}
@ -156,7 +156,7 @@ static int op_set_range(const LogEntry* e, Page* p) {
stasis_record_read(e->xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
stasis_record_write(e->xid, p, e->LSN, rid, tmp);
stasis_record_write(e->xid, p, rid, tmp);
free(tmp);
return 0;
@ -177,7 +177,7 @@ static int op_set_range_inverse(const LogEntry* e, Page* p) {
stasis_record_read(e->xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
stasis_record_write(e->xid, p, e->LSN, rid, tmp);
stasis_record_write(e->xid, p, rid, tmp);
free(tmp);
return 0;
@ -198,8 +198,8 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
// Copy new value into log structure
memcpy(range + 1, dat, length);
// No further locking is necessary here; readRecord protects the
// page layout, but attempts at concurrent modification have undefined
// No further locking is necessary here; readRecord protects the
// page layout, but attempts at concurrent modification have undefined
// results. (See page.c)
readlock(p->rwlatch,0);
@ -216,7 +216,7 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
releasePage(p);
}
stasis_operation_impl stasis_op_impl_set() {
stasis_operation_impl stasis_op_impl_set() {
stasis_operation_impl o = {
OPERATION_SET,
OPERATION_SET,
@ -226,7 +226,7 @@ stasis_operation_impl stasis_op_impl_set() {
return o;
}
stasis_operation_impl stasis_op_impl_set_inverse() {
stasis_operation_impl stasis_op_impl_set_inverse() {
stasis_operation_impl o = {
OPERATION_SET_INVERSE,
OPERATION_SET_INVERSE,

View file

@ -153,7 +153,7 @@ page_impl * stasis_page_impl_get(int id) {
assert(page_impls[id].page_type == id);
return & page_impls[id];
}
void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) {
void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) {
assertlocked(p->rwlatch);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
assert(rid.size <= BLOB_THRESHOLD_SIZE);

View file

@ -517,12 +517,8 @@ stasis_record_type_to_size(ssize_t type) {
* @param rid recordid where you want to write
* @param dat the new value of the record.
*
* @todo this updates the LSN of the page that points to blob, even if
* the page is otherwise untouched!! This is slow and breaks
* recovery.
*
*/
void stasis_record_write(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat);
void stasis_record_write(int xid, Page * page, recordid rid, const byte *dat);
/**
* Read a record. This call will be dispatched to the proper page implementation.
*

View file

@ -183,8 +183,8 @@ void initializeNewBTreeNode(int xid, Page* p){
byte * countBuff = (byte *) & countInt;
// write the count out
stasis_record_write(xid, p, 1, rid, countBuff);
stasis_record_write(xid, p, rid, countBuff);
stasis_page_lsn_write(xid, p, 1);
}
void testFunctions(){
printf("testing functions");
@ -265,7 +265,8 @@ int SimpleExample(){
// @todo This is a messy way to do this...
stasis_record_write(xid, p1, 1, rid1, b1);
stasis_record_write(xid, p1, rid1, b1);
stasis_page_lsn_write(xid, p1, 1);
stasis_record_read(xid, p1, rid1, b2);
if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));}

View file

@ -136,7 +136,7 @@ void * workerThreadWriting(void * q) {
lsn_t lsn = stasis_page_lsn_read(p);
assert(lsn);
p->LSN --; // XXX HACK -- Sooner or later this will break...
stasis_record_write(1, p, lsn, rids[i], (byte*)&val);
stasis_record_write(1, p, rids[i], (byte*)&val);
stasis_page_lsn_write(1,p,lsn);
unlock(p->rwlatch);

View file

@ -116,7 +116,7 @@ START_TEST(linearHashNTAtest)
found = ThashRemove(xid, hashHeader, (byte*)&i, sizeof(int));
assert(!found);
}
printf("\nabort()\n"); fflush(stdout);
printf("\n"); fflush(stdout);
Tabort(xid);
xid = Tbegin();
for(i = 0; i < NUM_ENTRIES; i++) {
@ -132,12 +132,15 @@ START_TEST(linearHashNTAtest)
}
Tcommit(xid);
Tdeinit();
printf("\n"); fflush(stdout);
} END_TEST
/** @test
*/
START_TEST(linearHashNTAVariableSizetest)
{
fflush(stdout); printf("\n");
Tinit();
int xid = Tbegin();
@ -187,7 +190,7 @@ START_TEST(linearHashNTAVariableSizetest)
found = ThashRemove(xid, hashHeader, (byte*)&i, sizeof(int));
assert(!found);
}
printf("\nabort()\n"); fflush(stdout);
printf("\n"); fflush(stdout);
Tabort(xid);
xid = Tbegin();
for(i = 0; i < NUM_ENTRIES; i++) {
@ -201,6 +204,7 @@ START_TEST(linearHashNTAVariableSizetest)
assert(val2->size == val2->slot * NUM_ENTRIES);
free(val2);
}
printf("\n");
Tcommit(xid);
Tdeinit();
} END_TEST
@ -487,7 +491,6 @@ START_TEST(linearHashNTAVariableLengthIteratortest) {
} END_TEST
START_TEST(emptyHashIterator) {
printf("\n");
Tinit();
int xid = Tbegin();

View file

@ -100,7 +100,8 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
// manually fill in UNDO field
stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
stasis_record_write(xid, p, rid, (byte*)&buf);
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
releasePage(p);
setToTwo->LSN = 10;
@ -164,7 +165,8 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
stasis_record_write(xid, p, rid, (byte*)&buf);
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
releasePage(p);
/* Trace of test:
@ -211,8 +213,8 @@ START_TEST(operation_physical_do_undo) {
assert(buf == 1);
stasis_record_write(xid, p, 0, rid, (byte*)&buf); /* reset the page's LSN. */
stasis_record_write(xid, p, rid, (byte*)&buf); /* reset the page's LSN. */
stasis_page_lsn_write(xid, p, 0);
DEBUG("I redo set to 2\n");
stasis_operation_redo(setToTwo,p); /* Succeeds */
@ -553,7 +555,8 @@ START_TEST(operation_lsn_free) {
for(int i = 0; i < 100; i++) {
rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid[i]);
stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo);
stasis_record_write(xid, p, rid[i], (const byte*)&fortyTwo);
stasis_page_lsn_write(xid, p, -1);
}
byte * new = malloc(PAGE_SIZE);
memcpy(new, p->memAddr, PAGE_SIZE);
@ -618,7 +621,8 @@ START_TEST(operation_reorderable) {
for(int i = 0; i < 100; i++) {
rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid[i]);
stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo);
stasis_record_write(xid, p, rid[i], (const byte*)&fortyTwo);
stasis_page_lsn_write(xid, p, -1);
}
byte * new = malloc(PAGE_SIZE);
memcpy(new, p->memAddr, PAGE_SIZE);