Converted page.h to follow new naming convention; documented new page api; begun stasis documentation cleanup.

This commit is contained in:
Sears Russell 2007-10-02 00:18:33 +00:00
parent 4c06c172e0
commit c9bbf3af94
41 changed files with 859 additions and 660 deletions

View file

@ -55,7 +55,7 @@ SHOW_USED_FILES = YES
#---------------------------------------------------------------------------
# configuration options related to warning and progress messages
#---------------------------------------------------------------------------
QUIET = NO
QUIET = YES
WARNINGS = YES
WARN_IF_UNDOCUMENTED = YES
WARN_IF_DOC_ERROR = YES

View file

@ -1,5 +1,4 @@
# Doxyfile 1.3.6-20040222
#---------------------------------------------------------------------------
# Project related configuration options
#---------------------------------------------------------------------------
@ -30,8 +29,8 @@ SUBGROUPING = YES
# Build related configuration options
#---------------------------------------------------------------------------
EXTRACT_ALL = YES
EXTRACT_PRIVATE = NO
EXTRACT_STATIC = NO
EXTRACT_PRIVATE = YES
EXTRACT_STATIC = YES
EXTRACT_LOCAL_CLASSES = YES
HIDE_UNDOC_MEMBERS = NO
HIDE_UNDOC_CLASSES = NO
@ -55,7 +54,7 @@ SHOW_USED_FILES = YES
#---------------------------------------------------------------------------
# configuration options related to warning and progress messages
#---------------------------------------------------------------------------
QUIET = NO
QUIET = YES
WARNINGS = YES
WARN_IF_UNDOCUMENTED = YES
WARN_IF_DOC_ERROR = YES
@ -95,7 +94,7 @@ FILTER_SOURCE_FILES = NO
# configuration options related to source browsing
#---------------------------------------------------------------------------
SOURCE_BROWSER = YES
INLINE_SOURCES = YES
INLINE_SOURCES = NO # YES
STRIP_CODE_COMMENTS = YES
REFERENCED_BY_RELATION = YES
REFERENCES_RELATION = YES
@ -129,16 +128,16 @@ TREEVIEW_WIDTH = 250
#---------------------------------------------------------------------------
# configuration options related to the LaTeX output
#---------------------------------------------------------------------------
GENERATE_LATEX = NO
GENERATE_LATEX = YES
LATEX_OUTPUT = latex
LATEX_CMD_NAME = latex
MAKEINDEX_CMD_NAME = makeindex
COMPACT_LATEX = NO
PAPER_TYPE = a4wide
PAPER_TYPE = letter
EXTRA_PACKAGES =
LATEX_HEADER =
PDF_HYPERLINKS = NO
USE_PDFLATEX = NO
PDF_HYPERLINKS = YES
USE_PDFLATEX = YES
LATEX_BATCHMODE = NO
LATEX_HIDE_INDICES = NO
#---------------------------------------------------------------------------
@ -209,7 +208,7 @@ PERL_PATH = /usr/bin/perl
#---------------------------------------------------------------------------
CLASS_DIAGRAMS = YES
HIDE_UNDOC_RELATIONS = YES
HAVE_DOT = YES
HAVE_DOT = NO # YES
CLASS_GRAPH = YES
COLLABORATION_GRAPH = YES
UML_LOOK = NO

View file

@ -24,7 +24,7 @@ void readBlob(int xid, Page * p2, recordid rid, byte * buf) {
rawRid.size = BLOB_SLOT;
byte * pbuf = alloca(PAGE_SIZE);
blob_record_t rec;
recordRead(xid, p2, rawRid, (byte*)&rec);
stasis_record_read(xid, p2, rawRid, (byte*)&rec);
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
TpageGet(xid, rec.offset+chunk, pbuf);
@ -43,7 +43,7 @@ void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
rawRid.size = BLOB_SLOT;
byte * pbuf = alloca(PAGE_SIZE);
blob_record_t rec;
recordRead(xid, p2, rawRid, (byte*)&rec);
stasis_record_read(xid, p2, rawRid, (byte*)&rec);
Page tmp;
tmp.memAddr=pbuf;
@ -51,7 +51,7 @@ void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
TpageGet(xid, rec.offset+chunk, pbuf);
*page_type_ptr(&tmp) = BLOB_PAGE;
*stasis_page_type_ptr(&tmp) = BLOB_PAGE;
memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), USABLE_SIZE_OF_PAGE);
TpageSet(xid, rec.offset+chunk, pbuf);
}
@ -64,10 +64,10 @@ void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
static int notSupported(int xid, Page * p) { return 0; }
static void blobLoaded(Page *p) {
p->LSN = *lsn_ptr(p);
p->LSN = *stasis_page_lsn_ptr(p);
}
static void blobFlushed(Page *p) {
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
}
static void blobCleanup(Page *p) { }

View file

@ -109,7 +109,7 @@ inline static Page * writeBackOnePage() {
// printf("Write(%ld)\n", (long)victim->id);
pageWrite(victim); /// XXX pageCleanup and pageFlushed might be heavyweight.
pageCleanup(victim);
stasis_page_cleanup(victim);
// Make sure that no one mistakenly thinks this is still a live copy.
victim->id = -1;
@ -278,7 +278,7 @@ static void bhBufDeinit() {
LH_ENTRY(openlist)(cachedPages, &iter);
while((next = LH_ENTRY(readlist)(&iter))) {
pageWrite((next->value));
pageCleanup((next->value)); // normally called by writeBackOnePage()
stasis_page_cleanup((next->value)); // normally called by writeBackOnePage()
}
LH_ENTRY(closelist)(&iter);
LH_ENTRY(destroy)(cachedPages);
@ -300,8 +300,8 @@ static void bhSimulateBufferManagerCrash() {
while((next = LH_ENTRY(readlist)(&iter))) {
Page * p = next->value;
writelock(p->rwlatch,0);
pageFlushed(p); // normally, pageWrite() would call this...
pageCleanup(p); // normally called by writeBackOnePage()
stasis_page_flushed(p); // normally, pageWrite() would call this...
stasis_page_cleanup(p); // normally called by writeBackOnePage()
unlock(p->rwlatch);
}
LH_ENTRY(closelist)(&iter);

View file

@ -314,7 +314,7 @@ static LogEntry * LogAction(TransactionLog * l, Page * p, recordid rid, int oper
DEBUG("Creating %ld byte physical pre-image.\n", physical_slot_length(rid.size));
preImage = malloc(physical_slot_length(rid.size));
recordRead(l->xid, p, rid, preImage);
stasis_record_read(l->xid, p, rid, preImage);
} else if (undoType == NO_INVERSE_WHOLE_PAGE) {
DEBUG("Logging entire page\n");

View file

@ -77,7 +77,7 @@ void redoUpdate(const LogEntry * e) {
pageLSN = 0;
} else {
p = loadPage(e->xid, rid.page);
pageLSN = pageReadLSN(p);
pageLSN = stasis_page_lsn_read(p);
}
} end;
@ -110,7 +110,7 @@ void redoUpdate(const LogEntry * e) {
} else {
try {
p = loadPage(e->xid, rid.page);
pageLSN = pageReadLSN(p);
pageLSN = stasis_page_lsn_read(p);
} end;
}
}
@ -149,7 +149,7 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
#endif
lsn_t page_lsn = -1;
if(p) {
page_lsn = pageReadLSN(p);
page_lsn = stasis_page_lsn_read(p);
}
if(e->LSN <= page_lsn || !p) {
// Actually execute the undo
@ -159,7 +159,7 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
e->update.rid.page, e->contents.rid.slot, e->update.rid.size);
assert(p);
recordWrite(e->xid, p, clr_lsn, e->update.rid, getUpdatePreImage(e));
stasis_record_write(e->xid, p, clr_lsn, e->update.rid, getUpdatePreImage(e));
} else if(undo == NO_INVERSE_WHOLE_PAGE) {
@ -168,7 +168,7 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
assert(p);
memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
pageWriteLSN(e->xid, p, clr_lsn);
stasis_page_lsn_write(e->xid, p, clr_lsn);
} else {

View file

@ -78,13 +78,13 @@
static int operate_helper(int xid, Page * p, recordid rid, const void * dat) {
if(recordGetTypeNew(xid, p, rid) == INVALID_SLOT) {
recordPostAlloc(xid, p, rid);
if(stasis_record_type_read(xid, p, rid) == INVALID_SLOT) {
stasis_record_alloc_done(xid, p, rid);
}
assert(recordGetLength(xid, p, rid) == physical_slot_length(rid.size));
assert(stasis_record_length_read(xid, p, rid) == physical_slot_length(rid.size));
if(rid.size < 0) {
assert(recordGetTypeNew(xid,p,rid) == rid.size);
assert(stasis_record_type_read(xid,p,rid) == rid.size);
}
return 0;
}
@ -92,27 +92,27 @@ static int operate_helper(int xid, Page * p, recordid rid, const void * dat) {
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch, 0);
int ret = operate_helper(xid,p,rid,dat);
pageWriteLSN(xid,p,lsn);
stasis_page_lsn_write(xid,p,lsn);
unlock(p->rwlatch);
return ret;
}
static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch,0);
recordFree(xid, p, rid);
pageWriteLSN(xid,p,lsn);
assert(recordGetTypeNew(xid, p, rid) == INVALID_SLOT);
stasis_record_free(xid, p, rid);
stasis_page_lsn_write(xid,p,lsn);
assert(stasis_record_type_read(xid, p, rid) == INVALID_SLOT);
unlock(p->rwlatch);
return 0;
}
static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch,0);
assert(recordGetTypeNew(xid, p, rid) == INVALID_SLOT);
assert(stasis_record_type_read(xid, p, rid) == INVALID_SLOT);
int ret = operate_helper(xid, p, rid, dat);
byte * buf = recordWriteNew(xid,p,rid);
memcpy(buf, dat, recordGetLength(xid,p,rid));
pageWriteLSN(xid,p,lsn);
byte * buf = stasis_record_write_begin(xid,p,rid);
memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
stasis_page_lsn_write(xid,p,lsn);
unlock(p->rwlatch);
return ret;
@ -179,7 +179,7 @@ static void reserveNewRegion(int xid) {
if(initialFreespace == -1) {
Page * p = loadPage(xid, firstPage);
readlock(p->rwlatch,0);
initialFreespace = pageFreespace(xid, p);
initialFreespace = stasis_record_freespace(xid, p);
unlock(p->rwlatch);
releasePage(p);
}
@ -220,9 +220,9 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
p = loadPage(xid, lastFreepage);
writelock(p->rwlatch, 0);
while(pageFreespace(xid, p) < physical_slot_length(type)) {
pageCompact(p);
int newFreespace = pageFreespace(xid, p);
while(stasis_record_freespace(xid, p) < physical_slot_length(type)) {
stasis_record_compact(p);
int newFreespace = stasis_record_freespace(xid, p);
if(newFreespace >= physical_slot_length(type)) {
break;
@ -246,12 +246,12 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
writelock(p->rwlatch, 0);
}
rid = recordPreAlloc(xid, p, type);
rid = stasis_record_alloc_begin(xid, p, type);
assert(rid.size != INVALID_SLOT);
recordPostAlloc(xid, p, rid);
int newFreespace = pageFreespace(xid, p);
stasis_record_alloc_done(xid, p, rid);
int newFreespace = stasis_record_freespace(xid, p);
allocationPolicyUpdateFreespaceLockedPage(allocPolicy, xid, ap, newFreespace);
unlock(p->rwlatch);
@ -290,10 +290,10 @@ compensated_function recordid TallocFromPage(int xid, long page, unsigned long t
pthread_mutex_lock(&talloc_mutex);
Page * p = loadPage(xid, page);
writelock(p->rwlatch,0);
recordid rid = recordPreAlloc(xid, p, type);
recordid rid = stasis_record_alloc_begin(xid, p, type);
if(rid.size != INVALID_SLOT) {
recordPostAlloc(xid,p,rid);
stasis_record_alloc_done(xid,p,rid);
allocationPolicyAllocedFromPage(allocPolicy, xid, page);
unlock(p->rwlatch);
@ -327,11 +327,11 @@ compensated_function void Tdealloc(int xid, recordid rid) {
} end;
recordid newrid = recordDereference(xid, p, rid);
recordid newrid = stasis_record_dereference(xid, p, rid);
allocationPolicyLockPage(allocPolicy, xid, newrid.page);
begin_action(releasePage, p) {
recordRead(xid, p, rid, preimage);
stasis_record_read(xid, p, rid, preimage);
/** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */
Tupdate(xid, rid, preimage, OPERATION_DEALLOC);
} compensate;
@ -346,7 +346,7 @@ compensated_function int TrecordType(int xid, recordid rid) {
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
int ret;
ret = recordGetTypeNew(xid, p, rid);
ret = stasis_record_type_read(xid, p, rid);
unlock(p->rwlatch);
releasePage(p);
return ret;
@ -357,7 +357,7 @@ compensated_function int TrecordSize(int xid, recordid rid) {
Page * p;
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
ret = recordGetLength(xid, p, rid);
ret = stasis_record_length_read(xid, p, rid);
unlock(p->rwlatch);
releasePage(p);
return ret;
@ -376,15 +376,15 @@ static int operate_initialize_page(int xid, Page *p, lsn_t lsn, recordid rid, co
writelock(p->rwlatch, 0);
switch(rid.slot) {
case SLOTTED_PAGE:
slottedPageInitialize(p);
stasis_slotted_initialize_page(p);
break;
case FIXED_PAGE:
fixedPageInitialize(p, rid.size, fixedRecordsPerPage(rid.size));
stasis_fixed_initialize_page(p, rid.size, stasis_fixed_records_per_page(rid.size));
break;
default:
abort();
}
pageWriteLSN(xid, p, lsn);
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
return 0;
}

View file

@ -81,7 +81,7 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
/* Allocing this page -> implicit lock, but latch to conformt to
fixedPage's interface. */
writelock(p->rwlatch, 0);
fixedPageInitialize(p, sizeof(int), fixedRecordsPerPage(sizeof(int)));
stasis_fixed_initialize_page(p, sizeof(int), stasis_fixed_records_per_page(sizeof(int)));
recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid;
countRid.page = multiplierRid.page = slotSizeRid.page = maxOffset.page = firstDataPageRid.page = p->id;
@ -94,15 +94,15 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
firstDataPageRid.slot = 4;
int firstDataPage = firstPage + 1;
(*(int*)recordWriteNew(xid, p, countRid))= count;
(*(int*)recordWriteNew(xid, p, multiplierRid))= multiplier;
(*(int*)recordWriteNew(xid, p, firstDataPageRid))= firstDataPage;
(*(int*)recordWriteNew(xid, p, slotSizeRid))= size;
(*(int*)recordWriteNew(xid, p, maxOffset))= -1;
(*(int*)stasis_record_write_begin(xid, p, countRid))= count;
(*(int*)stasis_record_write_begin(xid, p, multiplierRid))= multiplier;
(*(int*)stasis_record_write_begin(xid, p, firstDataPageRid))= firstDataPage;
(*(int*)stasis_record_write_begin(xid, p, slotSizeRid))= size;
(*(int*)stasis_record_write_begin(xid, p, maxOffset))= -1;
*page_type_ptr(p) = ARRAY_LIST_PAGE;
*stasis_page_type_ptr(p) = ARRAY_LIST_PAGE;
pageWriteLSN(xid, p, lsn);
stasis_page_lsn_write(xid, p, lsn);
recordid ret;
ret.page = firstPage;
@ -206,7 +206,7 @@ recordid dereferenceArrayListRid(int xid, Page * p, int offset) {
readlock(p->rwlatch,0);
TarrayListParameters tlp = pageToTLP(xid, p);
int rec_per_page = fixedRecordsPerPage((size_t)tlp.size);
int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size);
int lastHigh = 0;
int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */
@ -221,7 +221,7 @@ recordid dereferenceArrayListRid(int xid, Page * p, int offset) {
int thePage;
recordid rid = { p->id, pageRidSlot + FIRST_DATA_PAGE_OFFSET, sizeof(int) };
thePage = *(int*)recordReadNew(xid,p,rid);
thePage = *(int*)stasis_record_read_begin(xid,p,rid);
unlock(p->rwlatch);
rid.page = thePage + blockPage;
@ -232,7 +232,7 @@ recordid dereferenceArrayListRid(int xid, Page * p, int offset) {
}
static int getBlockContainingOffset(TarrayListParameters tlp, int offset, int * firstSlotInBlock) {
int rec_per_page = fixedRecordsPerPage((size_t)tlp.size);
int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size);
long thisHigh = rec_per_page * tlp.initialSize;
int lastHigh = 0;
int pageRidSlot = 0;
@ -256,13 +256,13 @@ static TarrayListParameters pageToTLP(int xid, Page * p) {
tlp.firstPage = p->id;
/* tlp.maxOffset = *(int*)fixed_record_ptr(p, 3); */
recordid rid = { p->id, 0, sizeof(int) };
tlp.initialSize = *(int*)recordReadNew(xid, p, rid);
tlp.initialSize = *(int*)stasis_record_read_begin(xid, p, rid);
rid.slot = 1;
tlp.multiplier = *(int*)recordReadNew(xid, p, rid);
tlp.multiplier = *(int*)stasis_record_read_begin(xid, p, rid);
rid.slot = 2;
tlp.size = *(int*)recordReadNew(xid, p, rid);
tlp.size = *(int*)stasis_record_read_begin(xid, p, rid);
rid.slot = 3;
tlp.maxOffset = *(int*)recordReadNew(xid, p, rid);
tlp.maxOffset = *(int*)stasis_record_read_begin(xid, p, rid);
return tlp;
}

View file

@ -51,9 +51,9 @@ terms specified in this license.
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i;
recordRead(xid, p, r, (byte*)&i);
stasis_record_read(xid, p, r, (byte*)&i);
i--;
recordWrite(xid, p, lsn, r, (byte*)&i);
stasis_record_write(xid, p, lsn, r, (byte*)&i);
return 0;
}

View file

@ -51,9 +51,9 @@ terms specified in this license.
static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
int i;
recordRead(xid, p, r, (byte*)&i);
stasis_record_read(xid, p, r, (byte*)&i);
i++;
recordWrite(xid, p, lsn, r, (byte*)&i);
stasis_record_write(xid, p, lsn, r, (byte*)&i);
return 0;
}

View file

@ -49,7 +49,7 @@ terms specified in this license.
#include "../page.h"
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
recordWrite(xid, p, lsn, rid, dat);
stasis_record_write(xid, p, lsn, rid, dat);
return 0;
}

View file

@ -308,7 +308,7 @@ void instant_update_hash_header(int xid, recordid hash, int i, int next_split) {
logical consistency to the buckets' linked lists. Then, call instant_rehash
to complete the split.
@XXX Need to test recover_split, and figure out where to call it!
@todo XXX Need to test recover_split, and figure out where to call it!
*/
static void recover_split(int xid, recordid hashRid, int i, int next_split, int keySize, int valSize) __attribute__((unused));

View file

@ -100,11 +100,11 @@ typedef struct lsmTreeState {
* header in the first two lsmTreeNodeRecords on the page.
*/
static void initializeNodePage(int xid, Page *p, size_t keylen) {
fixedPageInitialize(p, sizeof(lsmTreeNodeRecord)+keylen, 0);
recordid reserved1 = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
recordPostAlloc(xid, p, reserved1);
recordid reserved2 = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
recordPostAlloc(xid, p, reserved2);
stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord)+keylen, 0);
recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
stasis_record_alloc_done(xid, p, reserved1);
recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
stasis_record_alloc_done(xid, p, reserved2);
}
/**
@ -132,12 +132,12 @@ static void initializeNodePage(int xid, Page *p, size_t keylen) {
*/
static inline size_t getKeySizeFixed(int xid, Page const *p) {
return (*recordsize_ptr(p)) - sizeof(lsmTreeNodeRecord);
return (*recordsize_cptr(p)) - sizeof(lsmTreeNodeRecord);
}
static inline size_t getKeySizeVirtualMethods(int xid, Page *p) {
recordid rid = { p->id, 0, 0 };
return recordGetLength(xid, p, rid) - sizeof(lsmTreeNodeRecord);
return stasis_record_length_read(xid, p, rid) - sizeof(lsmTreeNodeRecord);
}
/**
* Read a record from the page node, assuming the nodes are fixed pages.
@ -158,9 +158,9 @@ lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p,
recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)};
const lsmTreeNodeRecord *nr
= (const lsmTreeNodeRecord*)recordReadNew(xid,p,rid);
= (const lsmTreeNodeRecord*)stasis_record_read_begin(xid,p,rid);
memcpy(ret, nr, sizeof(lsmTreeNodeRecord) + keylen);
recordReadDone(xid,p,rid,(const byte*)nr);
stasis_record_read_done(xid,p,rid,(const byte*)nr);
DEBUG("reading {%lld, %d, %d} = %d, %lld\n",
p->id, slot, sizeof(lsmTreeNodeRecord), ret.key, ret.ptr);
@ -177,7 +177,7 @@ void writeNodeRecordFixed(int xid, Page *p, int slot,
lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)fixed_record_ptr(p,slot);
nr->ptr = ptr;
memcpy(nr+1, key, keylen);
pageWriteLSN(xid, p, 0); // XXX need real LSN?
stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
}
/**
@ -188,13 +188,13 @@ void writeNodeRecordVirtualMethods(int xid, Page *p, int slot,
const byte *key, size_t keylen,
pageid_t ptr) {
recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)};
lsmTreeNodeRecord *target = (lsmTreeNodeRecord*)recordWriteNew(xid,p,rid);
lsmTreeNodeRecord *target = (lsmTreeNodeRecord*)stasis_record_write_begin(xid,p,rid);
target->ptr = ptr;
memcpy(target+1,key,keylen);
DEBUG("Writing to record {%d %d %lld}\n", rid.page, rid.slot, rid.size);
recordWriteDone(xid,p,rid,(byte*)target);
pageWriteLSN(xid, p, 0); // XXX need real LSN?
stasis_record_write_done(xid,p,rid,(byte*)target);
stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
}
recordid TlsmCreate(int xid, int comparator, int keySize) {
@ -209,8 +209,8 @@ recordid TlsmCreate(int xid, int comparator, int keySize) {
Page *p = loadPage(xid, ret.page);
writelock(p->rwlatch,0);
fixedPageInitialize(p, sizeof(lsmTreeNodeRecord) + keySize, 0);
*page_type_ptr(p) = LSM_ROOT_PAGE;
stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0);
*stasis_page_type_ptr(p) = LSM_ROOT_PAGE;
lsmTreeState *state = malloc(sizeof(lsmTreeState));
state->lastLeaf = -1; /// XXX define something in constants.h?
@ -218,15 +218,15 @@ recordid TlsmCreate(int xid, int comparator, int keySize) {
p->impl = state;
recordid tmp
= recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
recordPostAlloc(xid,p,tmp);
= stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
stasis_record_alloc_done(xid,p,tmp);
assert(tmp.page == ret.page
&& tmp.slot == DEPTH
&& tmp.size == sizeof(lsmTreeNodeRecord) + keySize);
tmp = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
recordPostAlloc(xid,p,tmp);
tmp = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
stasis_record_alloc_done(xid,p,tmp);
assert(tmp.page == ret.page
&& tmp.slot == COMPARATOR
@ -261,9 +261,9 @@ static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
if(depth-1) {
// recurse: the page we just allocated is not a leaf.
recordid child_rec = recordPreAlloc(xid, child_p, sizeof(lsmTreeNodeRecord)+key_len);
recordid child_rec = stasis_record_alloc_begin(xid, child_p, sizeof(lsmTreeNodeRecord)+key_len);
assert(child_rec.size != INVALID_SLOT);
recordPostAlloc(xid, child_p, child_rec);
stasis_record_alloc_done(xid, child_p, child_rec);
ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
val_page,lastLeaf);
@ -281,12 +281,12 @@ static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
// forward link (initialize to -1)
writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1);
recordid leaf_rec = recordPreAlloc(xid, child_p,
recordid leaf_rec = stasis_record_alloc_begin(xid, child_p,
sizeof(lsmTreeNodeRecord)+key_len);
assert(leaf_rec.slot == FIRST_SLOT);
recordPostAlloc(xid, child_p, leaf_rec);
stasis_record_alloc_done(xid, child_p, leaf_rec);
writeNodeRecord(xid,child_p,leaf_rec.slot,key,key_len,val_page);
ret = leaf_rec;
@ -329,13 +329,13 @@ static recordid appendInternalNode(int xid, Page *p,
int depth,
const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf) {
assert(*page_type_ptr(p) == LSM_ROOT_PAGE ||
*page_type_ptr(p) == FIXED_PAGE);
assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE ||
*stasis_page_type_ptr(p) == FIXED_PAGE);
if(!depth) {
// leaf node.
recordid ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
recordid ret = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
if(ret.size != INVALID_SLOT) {
recordPostAlloc(xid, p, ret);
stasis_record_alloc_done(xid, p, ret);
writeNodeRecord(xid,p,ret.slot,key,key_len,val_page);
}
return ret;
@ -356,9 +356,9 @@ static recordid appendInternalNode(int xid, Page *p,
releasePage(child_page);
}
if(ret.size == INVALID_SLOT) { // subtree is full; split
ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
ret = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
if(ret.size != INVALID_SLOT) {
recordPostAlloc(xid, p, ret);
stasis_record_alloc_done(xid, p, ret);
ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page,
lastLeaf);
@ -445,7 +445,7 @@ recordid TlsmAppendPage(int xid, recordid tree,
lastLeaf = p;
}
recordid ret = recordPreAlloc(xid, lastLeaf,
recordid ret = stasis_record_alloc_begin(xid, lastLeaf,
sizeof(lsmTreeNodeRecord)+keySize);
if(ret.size == INVALID_SLOT) {
@ -474,13 +474,13 @@ recordid TlsmAppendPage(int xid, recordid tree,
for(int i = FIRST_SLOT; i < *recordcount_ptr(p); i++) {
recordid cnext = recordPreAlloc(xid, lc,
recordid cnext = stasis_record_alloc_begin(xid, lc,
sizeof(lsmTreeNodeRecord)+keySize);
assert(i == cnext.slot);
assert(cnext.size != INVALID_SLOT);
recordPostAlloc(xid, lc, cnext);
stasis_record_alloc_done(xid, lc, cnext);
const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,i,keySize);
writeNodeRecord(xid,lc,i,(byte*)(nr+1),keySize,nr->ptr);
@ -495,12 +495,12 @@ recordid TlsmAppendPage(int xid, recordid tree,
*recordcount_ptr(p) = FIRST_SLOT+1;
lsmTreeNodeRecord *nr
= (lsmTreeNodeRecord*)recordWriteNew(xid, p, pFirstSlot);
= (lsmTreeNodeRecord*)stasis_record_write_begin(xid, p, pFirstSlot);
// don't overwrite key...
nr->ptr = child;
recordWriteDone(xid,p,pFirstSlot,(byte*)nr);
pageWriteLSN(xid, p, 0); // XXX need real LSN?
stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr);
stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
byte *dummy = calloc(1,keySize);
if(!depth) {
@ -534,7 +534,7 @@ recordid TlsmAppendPage(int xid, recordid tree,
// write the new value to an existing page
DEBUG("Writing %d to existing page# %lld\n", *(int*)key, lastLeaf->id);
recordPostAlloc(xid, lastLeaf, ret);
stasis_record_alloc_done(xid, lastLeaf, ret);
writeNodeRecord(xid, lastLeaf, ret.slot, key, keySize, val_page);

View file

@ -52,7 +52,7 @@ int noop(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
/* If p is null, then this is a logical no-op that spans pages, so do nothing.
Otherwise, write the LSN to the appropriate page (to keep recovery happy)
and return */
if(p) pageWriteLSN(xid, p, lsn);
if(p) stasis_page_lsn_write(xid, p, lsn);
return 0;
}

View file

@ -11,7 +11,7 @@ static pthread_mutex_t pageAllocMutex;
int __pageSet(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
memcpy(p->memAddr, d, PAGE_SIZE);
pageWriteLSN(xid, p, lsn);
stasis_page_lsn_write(xid, p, lsn);
return 0;
}
@ -70,8 +70,8 @@ compensated_function int TpageAlloc(int xid /*, int type */) {
int __fixedPageAlloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
writelock(p->rwlatch,0);
fixedPageInitialize(p, r.size, fixedRecordsPerPage(r.size));
pageWriteLSN(xid, p, lsn);
stasis_fixed_initialize_page(p, r.size, stasis_fixed_records_per_page(r.size));
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
return 0;
}
@ -84,7 +84,7 @@ int __fixedPageAlloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
*/
recordid TfixedPageAlloc(int xid, int size) {
int page = TpageAlloc(xid);
recordid rid = {page, fixedRecordsPerPage(size), size};
recordid rid = {page, stasis_fixed_records_per_page(size), size};
Tupdate(xid, rid, 0, OPERATION_FIXED_PAGE_ALLOC);
return rid;
}
@ -105,7 +105,7 @@ compensated_function int TpageAllocMany(int xid, int count /*, int type*/) {
int TpageGetType(int xid, int pageid) {
Page * p = loadPage(xid, pageid);
int ret = *page_type_ptr(p);
int ret = *stasis_page_type_ptr(p);
releasePage(p);
return ret;
}

View file

@ -27,12 +27,12 @@ static void TdeallocBoundaryTag(int xid, unsigned int page);
*/
static int operate_alloc_boundary_tag(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
writelock(p->rwlatch, 0);
slottedPageInitialize(p);
*page_type_ptr(p) = BOUNDARY_TAG_PAGE;
recordPostAlloc(xid, p, rid);
pageWriteLSN(xid, p, lsn);
byte * buf = recordWriteNew(xid, p, rid);
memcpy(buf, dat, recordGetLength(xid, p, rid));
stasis_slotted_initialize_page(p);
*stasis_page_type_ptr(p) = BOUNDARY_TAG_PAGE;
stasis_record_alloc_done(xid, p, rid);
stasis_page_lsn_write(xid, p, lsn);
byte * buf = stasis_record_write_begin(xid, p, rid);
memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
unlock(p->rwlatch);
return 0;
}
@ -125,7 +125,7 @@ static void TdeallocBoundaryTag(int xid, unsigned int page) {
void regionsInit() {
Page * p = loadPage(-1, 0);
int pageType = *page_type_ptr(p);
int pageType = *stasis_page_type_ptr(p);
holding_mutex = pthread_self();
if(pageType != BOUNDARY_TAG_PAGE) {
@ -263,7 +263,7 @@ static void consolidateRegions(int xid, unsigned int * firstPage, boundary_tag
boundary_tag succ_tag;
TreadBoundaryTag(xid, succ_page, &succ_tag);
// TODO: Check page_type_ptr()...
// TODO: Check stasis_page_type_ptr()...
if(succ_tag.size == UINT32_MAX) {
t->size = UINT32_MAX;

View file

@ -50,7 +50,7 @@ terms specified in this license.
#include <string.h>
#include <assert.h>
static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat) {
recordWrite(xid, p, lsn, rid, dat);
stasis_record_write(xid, p, lsn, rid, dat);
return 0;
}
typedef struct {
@ -68,9 +68,9 @@ static int operateRange(int xid, Page * p, lsn_t lsn, recordid rid, const void *
byte * data = (byte*)(range + 1);
byte * tmp = malloc(rid.size);
recordRead(xid, p, rid, tmp);
stasis_record_read(xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
recordWrite(xid, p, lsn, rid, tmp);
stasis_record_write(xid, p, lsn, rid, tmp);
free(tmp);
return 0;
@ -88,9 +88,9 @@ static int deOperateRange(int xid, Page * p, lsn_t lsn, recordid rid, const void
data += diffLength;
byte * tmp = malloc(rid.size);
recordRead(xid, p, rid, tmp);
stasis_record_read(xid, p, rid, tmp);
memcpy(tmp+range->offset, data, diffLength);
recordWrite(xid, p, lsn, rid, tmp);
stasis_record_write(xid, p, lsn, rid, tmp);
free(tmp);
return 0;
@ -114,7 +114,7 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
// No further locking is necessary here; readRecord protects the
// page layout, but attempts at concurrent modification have undefined
// results. (See page.c)
recordRead(xid, p, rid, record);
stasis_record_read(xid, p, rid, record);
// Copy old value into log structure
memcpy((byte*)(range + 1) + length, record+offset, length);

View file

@ -97,7 +97,7 @@ static page_impl page_impls[MAX_PAGE_TYPE];
/**
XXX latching for pageWriteLSN...
*/
void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
// These asserts belong here, but would cause some hacked up unit tests to fail...
// if(!page->dirty) {
// assert(page->LSN < lsn);
@ -113,30 +113,30 @@ void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
/**
XXX latching for pageReadLSN...
*/
lsn_t pageReadLSN(const Page * page) {
lsn_t stasis_page_lsn_read(const Page * page) {
return page->LSN;
}
/* ----- (de)initialization functions. Do not need to support multithreading. -----*/
/**
* pageInit() initializes all the important variables needed in
* initializes all the important variables needed in
* all the functions dealing with pages.
*/
void pageInit() {
void stasis_page_init() {
slottedPageInit();
fixedPageInit();
registerPageType(slottedImpl());
registerPageType(fixedImpl());
registerPageType(boundaryTagImpl());
registerPageType(arrayListImpl());
registerPageType(blobImpl());
registerPageType(indirectImpl());
registerPageType(lsmRootImpl());
stasis_page_impl_register(slottedImpl());
stasis_page_impl_register(fixedImpl());
stasis_page_impl_register(boundaryTagImpl());
stasis_page_impl_register(arrayListImpl());
stasis_page_impl_register(blobImpl());
stasis_page_impl_register(indirectImpl());
stasis_page_impl_register(lsmRootImpl());
}
void pageDeinit() {
void stasis_page_deinit() {
for(int i = 0; i < MAX_PAGE_TYPE; i++) {
page_impl p = { 0 };
@ -147,7 +147,7 @@ void pageDeinit() {
slottedPageDeinit();
}
int registerPageType(page_impl p) {
int stasis_page_impl_register(page_impl p) {
assert(page_impls[p.page_type].page_type == 0);
page_impls[p.page_type] = p;
return 0;
@ -155,25 +155,25 @@ int registerPageType(page_impl p) {
/**
@todo this updates the LSN of the page that points to blob, even if the page is otherwise untouched!! This is slow and breaks recovery.
*/
void recordWrite(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) {
void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) );
readlock(p->rwlatch, 225);
if(rid.size > BLOB_THRESHOLD_SIZE) {
// XXX Kludge This is done so that recovery sees the LSN update. Otherwise, it gets upset... Of course, doing it will break blob recovery unless we set blob writes to do "logical" redo...
pageWriteLSN(xid, p, lsn);
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
writeBlob(xid, p, lsn, rid, dat);
} else {
byte * buf = recordWriteNew(xid, p, rid);
pageWriteLSN(xid, p, lsn);
memcpy(buf, dat, recordGetLength(xid, p, rid));
byte * buf = stasis_record_write_begin(xid, p, rid);
stasis_page_lsn_write(xid, p, lsn);
memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
unlock(p->rwlatch);
}
assert( (p->id == rid.page) && (p->memAddr != NULL) );
}
int recordRead(int xid, Page * p, recordid rid, byte *buf) {
int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
assert(rid.page == p->id);
if(rid.size > BLOB_THRESHOLD_SIZE) {
@ -182,23 +182,21 @@ int recordRead(int xid, Page * p, recordid rid, byte *buf) {
return 0;
} else {
readlock(p->rwlatch, 0);
const byte * dat = recordReadNew(xid,p,rid);
memcpy(buf, dat, recordGetLength(xid,p,rid));
const byte * dat = stasis_record_read_begin(xid,p,rid);
memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
unlock(p->rwlatch);
return 0;
}
}
recordid recordDereference(int xid, Page * p, recordid rid) {
int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || (!page_type) || page_type == BOUNDARY_TAG_PAGE ) {
} else if(page_type == INDIRECT_PAGE) {
rid = dereferenceRID(xid, rid);
/**
@todo stasis_record_dereference should dispatch via page_impl...
*/
recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
int page_type = *stasis_page_type_ptr(p);
if(page_type == INDIRECT_PAGE) {
rid = dereferenceIndirectRID(xid, rid);
} else if(page_type == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(xid, p, rid.slot);
} else {
abort();
}
return rid;
}
@ -206,8 +204,8 @@ recordid recordDereference(int xid, Page * p, recordid rid) {
/// -------------- Dispatch functions
static int recordWarnedAboutPageTypeKludge = 0;
const byte * recordReadNew(int xid, Page * p, recordid rid) {
int page_type = *page_type_ptr(p);
const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
int page_type = *stasis_page_type_ptr(p);
if(!page_type) {
page_type = FIXED_PAGE;
if(!recordWarnedAboutPageTypeKludge) {
@ -217,8 +215,8 @@ const byte * recordReadNew(int xid, Page * p, recordid rid) {
}
return page_impls[page_type].recordRead(xid, p, rid);
}
byte * recordWriteNew(int xid, Page * p, recordid rid) {
int page_type = *page_type_ptr(p);
byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
int page_type = *stasis_page_type_ptr(p);
if(!page_type) {
page_type = FIXED_PAGE;
if(!recordWarnedAboutPageTypeKludge) {
@ -228,73 +226,73 @@ byte * recordWriteNew(int xid, Page * p, recordid rid) {
}
return page_impls[page_type].recordWrite(xid, p, rid);
}
void recordReadDone(int xid, Page *p, recordid rid, const byte *b) {
int page_type = *page_type_ptr(p);
void stasis_record_read_done(int xid, Page *p, recordid rid, const byte *b) {
int page_type = *stasis_page_type_ptr(p);
if(page_impls[page_type].recordReadDone) {
page_impls[page_type].recordReadDone(xid,p,rid,b);
}
}
void recordWriteDone(int xid, Page *p, recordid rid, byte *b) {
int page_type = *page_type_ptr(p);
void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) {
int page_type = *stasis_page_type_ptr(p);
if(page_impls[page_type].recordWriteDone) {
page_impls[page_type].recordWriteDone(xid,p,rid,b);
}
}
int recordGetTypeNew(int xid, Page *p, recordid rid) {
return page_impls[*page_type_ptr(p)]
int stasis_record_type_read(int xid, Page *p, recordid rid) {
return page_impls[*stasis_page_type_ptr(p)]
.recordGetType(xid, p, rid);
}
void recordSetTypeNew(int xid, Page *p, recordid rid, int type) {
page_impls[*page_type_ptr(p)]
void stasis_record_type_write(int xid, Page *p, recordid rid, int type) {
page_impls[*stasis_page_type_ptr(p)]
.recordSetType(xid, p, rid, type);
}
int recordGetLength(int xid, Page *p, recordid rid) {
return page_impls[*page_type_ptr(p)]
int stasis_record_length_read(int xid, Page *p, recordid rid) {
return page_impls[*stasis_page_type_ptr(p)]
.recordGetLength(xid,p,rid);
}
recordid recordFirst(int xid, Page * p){
return page_impls[*page_type_ptr(p)]
recordid stasis_record_first(int xid, Page * p){
return page_impls[*stasis_page_type_ptr(p)]
.recordFirst(xid,p);
}
recordid recordNext(int xid, Page * p, recordid prev){
return page_impls[*page_type_ptr(p)]
recordid stasis_record_next(int xid, Page * p, recordid prev){
return page_impls[*stasis_page_type_ptr(p)]
.recordNext(xid,p,prev);
}
recordid recordPreAlloc(int xid, Page * p, int size){
return page_impls[*page_type_ptr(p)]
recordid stasis_record_alloc_begin(int xid, Page * p, int size){
return page_impls[*stasis_page_type_ptr(p)]
.recordPreAlloc(xid,p,size);
}
void recordPostAlloc(int xid, Page * p, recordid rid){
page_impls[*page_type_ptr(p)]
void stasis_record_alloc_done(int xid, Page * p, recordid rid){
page_impls[*stasis_page_type_ptr(p)]
.recordPostAlloc(xid, p, rid);
}
void recordFree(int xid, Page * p, recordid rid){
page_impls[*page_type_ptr(p)]
void stasis_record_free(int xid, Page * p, recordid rid){
page_impls[*stasis_page_type_ptr(p)]
.recordFree(xid, p, rid);
}
int pageIsBlockSupported(int xid, Page * p){
return page_impls[*page_type_ptr(p)]
int stasis_block_supported(int xid, Page * p){
return page_impls[*stasis_page_type_ptr(p)]
.isBlockSupported(xid, p);
}
block_t * pageBlockFirst(int xid, Page * p){
int t = *page_type_ptr(p);
block_t * stasis_block_first(int xid, Page * p){
int t = *stasis_page_type_ptr(p);
return page_impls[t]
.blockFirst(xid, p);
}
block_t * pageBlockNext(int xid, Page * p, block_t * prev){
return page_impls[*page_type_ptr(p)]
block_t * stasis_block_next(int xid, Page * p, block_t * prev){
return page_impls[*stasis_page_type_ptr(p)]
.blockNext(xid, p,prev);
}
void pageBlockDone(int xid, Page * p, block_t * done){
page_impls[*page_type_ptr(p)]
void stasis_block_done(int xid, Page * p, block_t * done){
page_impls[*stasis_page_type_ptr(p)]
.blockDone(xid, p,done);
}
int pageFreespace(int xid, Page * p){
return page_impls[*page_type_ptr(p)]
int stasis_record_freespace(int xid, Page * p){
return page_impls[*stasis_page_type_ptr(p)]
.pageFreespace(xid, p);
}
void pageCompact(Page * p){
page_impls[*page_type_ptr(p)]
void stasis_record_compact(Page * p){
page_impls[*stasis_page_type_ptr(p)]
.pageCompact(p);
}
/** @todo How should the LSN of pages without a page_type be handled?
@ -303,27 +301,26 @@ void pageCompact(Page * p){
LSN-free pages, we'll need special "loadPageForAlloc(), and
loadPageOfType() methods (or something...)
*/
void pageLoaded(Page * p){
short type = *page_type_ptr(p);
void stasis_page_loaded(Page * p){
short type = *stasis_page_type_ptr(p);
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type].pageLoaded(p);
} else {
p->LSN = *lsn_ptr(p); // XXX kluge - shouldn't special-case UNINITIALIZED_PAGE
p->LSN = *stasis_page_lsn_ptr(p); // XXX kluge - shouldn't special-case UNINITIALIZED_PAGE
}
}
void pageFlushed(Page * p){
short type = *page_type_ptr(p);
void stasis_page_flushed(Page * p){
short type = *stasis_page_type_ptr(p);
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type]
.pageFlushed(p);
page_impls[type].pageFlushed(p);
} else {
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
}
}
void pageCleanup(Page * p) {
short type = *page_type_ptr(p);
void stasis_page_cleanup(Page * p) {
short type = *stasis_page_type_ptr(p);
if(type) {
assert(page_impls[type].page_type == type);
page_impls[type].pageCleanup(p);
@ -345,20 +342,20 @@ typedef struct genericBlockImpl {
*/
static const byte * blkFirst(block_t * b) {
genericBlockImpl * impl = b->impl;
impl->pos = recordFirst(-1, impl->p);
impl->pos = stasis_record_first(-1, impl->p);
if(! memcmp(&(impl->pos), &(NULLRID), sizeof(recordid))) {
return 0;
} else {
return recordReadNew(-1, impl->p, impl->pos);
return stasis_record_read_begin(-1, impl->p, impl->pos);
}
}
static const byte * blkNext(block_t * b) {
genericBlockImpl * impl = b->impl;
impl->pos = recordNext(-1, impl->p, impl->pos);
impl->pos = stasis_record_next(-1, impl->p, impl->pos);
if(! memcmp(&(impl->pos), &NULLRID, sizeof(recordid))) {
return 0;
} else {
return recordReadNew(-1, impl->p, impl->pos);
return stasis_record_read_begin(-1, impl->p, impl->pos);
}
}
static int blkSize(block_t * b) {
@ -387,7 +384,7 @@ block_t genericBlock = {
0
};
block_t* pageGenericBlockFirst(int xid, Page * p) {
block_t* stasis_block_first_default_impl(int xid, Page * p) {
block_t* ret = malloc(sizeof(block_t));
*ret = genericBlock;
genericBlockImpl impl = { p, NULLRID };
@ -395,11 +392,11 @@ block_t* pageGenericBlockFirst(int xid, Page * p) {
*(genericBlockImpl*)(ret->impl) = impl;
return ret;
}
block_t* pageGenericBlockNext(int xid, Page *p, block_t *prev) {
pageGenericBlockDone(xid, p, prev);
block_t* stasis_block_next_default_impl(int xid, Page *p, block_t *prev) {
stasis_block_done_default_impl(xid, p, prev);
return 0; // definitely done.
}
void pageGenericBlockDone(int xid, Page *p, block_t *b) {
void stasis_block_done_default_impl(int xid, Page *p, block_t *b) {
free(b->impl);
free(b);
}

View file

@ -43,38 +43,61 @@ terms specified in this license.
/**
* @file
*
* interface for dealing with generic, lsn based pages
* interface for dealing with generic, LSN based pages
*
* This file provides a re-entrant interface for pages that are labeled
* with an LSN and a page type.
*
* @ingroup LLADD_CORE
* $Id$
* @ingroup LLADD_CORE pageFormats
*
STRUCTURE OF A GENERIC PAGE
<pre>
+----------------------------------------------------------------------+
| |
| USABLE SPACE |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| +-----------+-----+
| | page type | LSN |
+----------------------------------------------------+-----------+-----+
</pre>
* $Id$
*/
/**
@defgroup pageFormats Page format implementations
Stasis allows developers to define their own on-disk page formats.
Currently, each page format must end with a hard-coded header
containing an LSN and a page type. (This restriction will be
removed in the future.)
This section explains how new page formats can be implemented in
Stasis, and documents the currently available page types.
A number of callbacks are invoked on existing pages as they are read
from disk, flushed back, and ultimately, evicted from cache:
-# stasis_page_loaded() is invoked when the page is read from disk. It
should set the Page::LSN field appropriately, and
perhaps allocate any data structures that will be stored in the
Page::impl field.
-# stasis_page_flushed() is invoked when a dirty page is written back to
disk. It should make sure that all updates are applied to the
physical representation of the page. (Implementations of this
callback usually copy the Page::LSN field into the page header.)
-# stasis_page_cleanup() is invoked before a page is evicted from cache.
It should free any memory associated with the page, such as
that allocated by stasis_page_loaded(), or pointed to by Page::impl.
When an uninitialized page is read from disk, Stasis has no way of
knowing which stasis_page_loaded() callback should be invoked. Therefore,
when a new page is initialized the page initialization code should
perform the work that would normally be performed by stasis_page_loaded().
Similarly, before a page is freed (and therefore, will be treated as
uninitialized data) stasis_page_cleanup() should be called.
Page implementations are free to define their own access methods
and APIs. However, Stasis's record oriented page interface
provides a default set of methods for page access.
@see pageRecordInterface
@todo Page deallocators should call stasis_page_cleanup()
@todo Create variant of loadPage() that takes a page type
@todo Add support for LSN free pages.
*/
/*@{*/
#ifndef __PAGE_H__
#define __PAGE_H__
@ -86,11 +109,10 @@ BEGIN_C_DECLS
/**
The page type contains in-memory information about pages. This
information is used by LLADD to track the page while it is in
information is used by Stasis to track the page while it is in
memory, and is never written to disk.
In particular, our current page replacement policy requires two doubly
linked lists,
@todo Page_s shouldn't hardcode doubly linked lists.
@todo The Page struct should be tuned for better memory utilization.
@ -141,12 +163,12 @@ struct Page_s {
also defined in this struct. Therefore, it cannot be read from
or written to disk. Furthermore, since we do not impose an
order on operations, the holder of a readlock may not use the
lsn field to determine whether a particular operation has
LSN field to determine whether a particular operation has
completed on the page.
The write lock is used to block all writers (other than the one
holding the page), and to ensure that all updates with lsn less
than or equal to the page's lsn have been applied. Therefore,
holding the page), and to ensure that all updates with LSN less
than or equal to the page's LSN have been applied. Therefore,
threads that write the page to disk must hold this lock. Since
it precludes access by all other threads, a write lock also
allows the holder to evict the current page, and replace it.
@ -165,38 +187,65 @@ struct Page_s {
*/
rwl * loadlatch;
/**
Page type implementatioms may store whatever they'd like in this
pointer. It persists from each call to pageLoaded() to the
subsequent call to pageFlushed().
Page type implementations may store whatever they'd like in this
pointer. It persists from each call to stasis_page_loaded() to the
subsequent call to stasis_page_flushed().
*/
void * impl;
};
#define lsn_ptr(page) (((lsn_t*)(&((page)->memAddr[PAGE_SIZE])))-1)
#define page_type_ptr(page) (((int*)lsn_ptr((page)))-1)
#define end_of_usable_space_ptr(page) page_type_ptr((page))
#define shorts_from_end(page, count) (((short*)end_of_usable_space_ptr((page)))-(count))
#define bytes_from_start(page, count) (((byte*)((page)->memAddr))+(count))
#define shorts_from_start(page, count) (((short*)((page)->memAddr))+(count))
#define ints_from_start(page, count) (((int*)((page)->memAddr))+(count))
#define ints_from_end(page, count) (((int*)end_of_usable_space_ptr((page)))-(count))
#define decode_size(size) (((size) >= SLOT_TYPE_BASE) ? SLOT_TYPE_LENGTHS[(size)-SLOT_TYPE_BASE] : (size))
#define USABLE_SIZE_OF_PAGE (PAGE_SIZE - sizeof(lsn_t) - sizeof(int))
#define physical_slot_length(size) ((size) >= 0 ? (size) : SLOT_TYPE_LENGTHS[-1*size])
/*@}*/
/**
* initializes all the global variables needed by the functions
* dealing with pages.
@defgroup pageLSNHeaderGeneric LSN and Page Types
@ingroup pageFormats
Most Stasis pages contain an LSN and a page type. These are used
by recovery to determine whether or not to perform redo. At
run time, the page type is used to decide which page implementation
should manipulate the page.
STRUCTURE OF A GENERIC PAGE
<pre>
+----------------------------------------------------------------------+
| |
| USABLE SPACE |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| +-----------+-----+
| | page type | LSN |
+----------------------------------------------------+-----------+-----+
</pre>
*/
void pageInit();
/*@{*/
static inline lsn_t* stasis_page_lsn_ptr(Page *p) {
return ((lsn_t*)(&(p->memAddr[PAGE_SIZE])))-1;
}
/**
* releases all resources held by the page sub-system.
Returns a pointer to the page's type. This information is stored with the LSN.
Stasis uses it to determine which page implementation should handle each
page.
@param p Any page that contains an LSN header.
@see stasis_page_impl_register
@todo Need to typedef page_type_t
*/
void pageDeinit();
static inline int* stasis_page_type_ptr(Page *p) {
return ((int*)stasis_page_lsn_ptr(p))-1;
}
static inline const int* stasis_page_type_cptr(const Page *p) {
return (const int*)stasis_page_type_ptr((Page*)p);
}
/**
* assumes that the page is already loaded in memory. It takes as a
@ -206,7 +255,7 @@ void pageDeinit();
* dirtyPages table is needed for log truncation. (If the page->id is
* null, this function assumes the page is not in the buffer pool, and
* does not update dirtyPages. Similarly, if the page is already
* dirty, there is no need to udpate dirtyPages.
* dirty, there is no need to update dirtyPages.
*
* @param xid The transaction that is writing to the page, or -1 if
* outside of a transaction.
@ -215,75 +264,187 @@ void pageDeinit();
* function.
*
* @param lsn The new lsn of the page. If the new lsn is less than
* the page's current lsn, then the page's lsn will not be changed.
* If the page is clean, the new lsn must be greater than the old lsn.
* the page's current LSN, then the page's LSN will not be changed.
* If the page is clean, the new LSN must be greater than the old LSN.
*/
void pageWriteLSN(int xid, Page * page, lsn_t lsn);
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn);
/**
* assumes that the page is already loaded in memory. It takes
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t pageReadLSN(const Page * page);
lsn_t stasis_page_lsn_read(const Page * page);
/*@}*/
/**
* @param xid transaction id @param lsn the lsn that the updated
* record will reflect. This is needed by recovery, and undo. (The
* lsn of a page must always increase. Undos are handled by passing
* in the LSN of the CLR that records the undo.)
@defgroup pageUtils Utility methods for page manipulation
@ingroup pageFormats
These methods make it easy to manipulate pages that use a standard
Stasis header (one with an LSN and page type).
Each one counts bytes from the beginning or end of the page's
usable space. Methods with "_cptr_" in their names return const
pointers (and can accept const Page pointers as arguments).
Methods with "_ptr_" in their names take non-const pages, and
return non-const pointers.
*/
/*@{*/
static inline byte*
stasis_page_byte_ptr_from_start(Page *p, int count) {
return ((byte*)(p->memAddr))+count;
}
static inline byte*
stasis_page_byte_ptr_from_end(Page *p, int count) {
return ((byte*)stasis_page_type_ptr(p))-count;
}
static inline int16_t*
stasis_page_int16_ptr_from_start(Page *p, int count) {
return ((int16_t*)(p->memAddr))+count;
}
static inline int16_t*
stasis_page_int16_ptr_from_end(Page *p, int count) {
return ((int16_t*)stasis_page_type_ptr(p))-count;
}
static inline int32_t*
stasis_page_int32_ptr_from_start(Page *p, int count) {
return ((int32_t*)(p->memAddr))+count;
}
static inline int32_t*
stasis_page_int32_ptr_from_end(Page *p, int count) {
return ((int32_t*)stasis_page_type_ptr(p))-count;
}
// Const methods
static inline const byte*
stasis_page_byte_cptr_from_start(const Page *p, int count) {
return (const byte*)stasis_page_byte_ptr_from_start((Page*)p, count);
}
static inline const byte*
stasis_page_byte_cptr_from_end(const Page *p, int count) {
return (const byte*)stasis_page_byte_ptr_from_end((Page*)p, count);
}
static inline const int16_t*
stasis_page_int16_cptr_from_start(const Page *p, int count) {
return (const int16_t*)stasis_page_int16_ptr_from_start((Page*)p,count);
}
static inline const int16_t*
stasis_page_int16_cptr_from_end(const Page *p, int count) {
return (const int16_t*)stasis_page_int16_ptr_from_end((Page*)p,count);
}
static inline const int32_t*
stasis_page_int32_cptr_from_start(const Page *p, int count) {
return (const int32_t*)stasis_page_int32_ptr_from_start((Page*)p,count);
}
static inline const int32_t*
stasis_page_int32_cptr_from_end(const Page *p, int count) {
return (const int32_t*)stasis_page_int32_ptr_from_end((Page*)p,count);
}
/*@}*/
/**
* initializes all the global variables needed by the functions
* dealing with pages.
*
* @todo documentation group for page init and deinit?
*/
void stasis_page_init();
/**
* releases all resources held by the page sub-system.
*/
void stasis_page_deinit();
/**
@defgroup pageRecordInterface Record-oriented page interface
@ingroup pageFormats
Stasis provides a default record-oriented interface to page
implementations. By defining these methods, and registering
appropriate callbacks, page implementations allow callers to
access their data through standard Stasis methods such as Tread()
and Tset().
*/
/*@{*/
static const size_t USABLE_SIZE_OF_PAGE = (PAGE_SIZE - sizeof(lsn_t) - sizeof(int));
/**
Stasis records carry type information with them. The type either
encodes the physical size of opaque data (the common case), or
indicates that the record should be handled specially (as the
fixed-length header of a data structure, for example).
This function maps from record type to record size.
@param type If positive, a record type is equal to the record size,
in bytes. If negative, the type has special significance. Its
length is looked up in the SLOT_TYPE_LENGTHS array.
@return The length of the record in bytes.
*/
static inline size_t physical_slot_length(ssize_t type) {
return type >= 0 ? type : SLOT_TYPE_LENGTHS[0 - type];
}
/**
* @param xid transaction id
*
* @param page a pointer to an in-memory copy of the page as it
* currently exists. This copy will be updated by writeRecord.
*
* @param lsn the LSN that the updated record will reflect. This is
* needed by recovery, and undo. (The LSN of a page must always
* increase. Undos are handled by passing in the LSN of the CLR that
* records the undo.)
*
* @param rid recordid where you want to write
*
* @param dat the new value of the record.
*
* @return 0 on success, lladd error code on failure
*
* @deprecated Unnecessary memcpy()'s
*/
void recordWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat);
void stasis_record_write(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat);
/**
* @param xid transaction ID
* @param page a pointer to the pinned page that contains the record.
* @param rid the record to be written
* @param dat buffer for data
* @return 0 on success, lladd error code on failure
* @deprecated Unnecessary memcpy()'s
* @return 0 on success, Stasis error code on failure
*/
int recordRead(int xid, Page * page, recordid rid, byte *dat);
int stasis_record_read(int xid, Page * page, recordid rid, byte *dat);
const byte * recordReadNew(int xid, Page * p, recordid rid);
byte * recordWriteNew(int xid, Page * p, recordid rid);
void recordReadDone(int xid, Page *p, recordid rid, const byte* buf);
void recordWriteDone(int xid, Page *p, recordid rid, byte *buf);
int recordGetTypeNew(int xid, Page * p, recordid rid);
void recordSetTypeNew(int xid, Page * p, recordid rid, int type);
int recordGetLength(int xid, Page *p, recordid rid);
recordid recordFirst(int xid, Page * p);
recordid recordNext(int xid, Page * p, recordid prev);
recordid recordPreAlloc(int xid, Page * p, int size);
void recordPostAlloc(int xid, Page * p, recordid rid);
void recordFree(int xid, Page * p, recordid rid);
int pageIsBlockSupported(int xid, Page * p);
int pageFreespace(int xid, Page * p);
void pageCompact(Page * p);
void pageLoaded(Page * p);
void pageFlushed(Page * p);
void pageCleanup(Page * p);
const byte * stasis_record_read_begin(int xid, Page * p, recordid rid);
byte * stasis_record_write_begin(int xid, Page * p, recordid rid);
void stasis_record_read_done(int xid, Page *p, recordid rid, const byte* buf);
void stasis_record_write_done(int xid, Page *p, recordid rid, byte *buf);
int stasis_record_type_read(int xid, Page * p, recordid rid);
void stasis_record_type_write(int xid, Page * p, recordid rid, int type);
int stasis_record_length_read(int xid, Page *p, recordid rid);
recordid stasis_record_first(int xid, Page * p);
recordid stasis_record_next(int xid, Page * p, recordid prev);
recordid stasis_record_alloc_begin(int xid, Page * p, int size);
void stasis_record_alloc_done(int xid, Page * p, recordid rid);
void stasis_record_free(int xid, Page * p, recordid rid);
int stasis_block_supported(int xid, Page * p);
int stasis_record_freespace(int xid, Page * p);
void stasis_record_compact(Page * p);
void stasis_page_loaded(Page * p);
void stasis_page_flushed(Page * p);
void stasis_page_cleanup(Page * p);
/**
@return -1 if the field does not exist, the size of the field otherwise (the rid parameter's size field will be ignored).
*/
int recordSize(int xid, Page * p, recordid rid);
/**
@todo recordDereference doesn't dispatch to pages. Should it?
@todo XXX stasis_record_dereference should be dispatched via page_impl[]
*/
recordid recordDereference(int xid, Page *p, recordid rid);
recordid stasis_record_dereference(int xid, Page *p, recordid rid);
/*@}*/
/**
@param a block returned by blockFirst() or blockNext().
@param a block returned by stasis_block_first() or stasis_block_next().
is*() methods return zero for false, non-zero for true.
@ -293,7 +454,7 @@ recordid recordDereference(int xid, Page *p, recordid rid);
methods that take int * size as an argument return a record size by
setting the pointer (if it's not null).
@see Abadi, et. al, Intergrating Compression and Execution in Column-Oriented Database Systems, VLDB 2006.
@see Abadi, et. al, Integrating Compression and Execution in Column-Oriented Database Systems, VLDB 2006.
*/
typedef struct block_t {
@ -324,7 +485,7 @@ typedef struct block_t {
the memory; hopefully, this points into the buffer manager, and
this function call is O(1). If it would be expensive to return a
packed array of every record in the page, then only some of the
records might be returned. Calling recordNext on { page->id,
records might be returned. Calling stasis_record_next() on { page->id,
off+ count } will tell the caller if it's received the entire
page's contents.
@ -342,36 +503,36 @@ typedef struct block_t {
This function should work with any valid page implementation, but
it might be less efficient than a custom implementation.
This is a convenience function for page implementors. Other code
should call pageBlockFirst() instead.
This is a convenience function for page implementers. Other code
should call stasis_block_first() instead.
*/
block_t *pageGenericBlockFirst(int xid, Page *p);
block_t *stasis_block_first_default_impl(int xid, Page *p);
/**
This function should work with any valid page implementation, but
it might be less efficient than a custom implementation.
This is a convenience function for page implementors. Other code
should call pageBlockNext() instead.
This is a convenience function for page implementers. Other code
should call stasis_block_next() instead.
*/
block_t * pageGenericBlockNext(int xid, Page *p, block_t *prev);
block_t * stasis_block_next_default_impl(int xid, Page *p, block_t *prev);
/**
This function should work with any valid page implementation, but
it might be less efficient than a custom implementation.
This is a convenience function for page implementors. Other code
should call pageBlockDone() instead.
This is a convenience function for page implementers. Other code
should call stasis_block_done() instead.
*/
void pageGenericBlockDone(int xid, Page *p, block_t *b);
void stasis_block_done_default_impl(int xid, Page *p, block_t *b);
block_t * pageBlockFirst(int xid, Page * p);
block_t * pageBlockNext(int xid, Page * p, block_t * prev);
void pageBlockDone(int xid, Page * p, block_t * done);
block_t * stasis_block_first(int xid, Page * p);
block_t * stasis_block_next(int xid, Page * p, block_t * prev);
void stasis_block_done(int xid, Page * p, block_t * done);
/**
None of these functions obtain latches. Calling them without
holding rwlatch is an error. (Exception: dereferenceRid grabs the
latch for you...)
holding rwlatch is an error. (Exception: recordDereference grabs the
latch for you... XXX does it?)
The function pointer should be null if your page implementaion does
The function pointer should be null if your page implementation does
not support the method in question.
@todo Figure out what to do about readlock vs writelock...
@ -381,22 +542,22 @@ void pageBlockDone(int xid, Page * p, block_t * done);
pin
latch
recordPreAlloc
recordPostAlloc
stasis_record_alloc_begin
stasis_record_alloc_done
unlatch
(There is a race here; other transactions can see that the page
contains a new slot, but that the LSN hasn't been updated. This
seems to be benign. writeLSN refuses to decrement LSN's... If the
lock manager is using LSNs for versioning, it might get into a
situation where the page has changed (the slot was allocated), but
the version wasn't bumped. I can't imagine this causing trouble,
unless the application is using the presense or absence of an
uninitialized slot as some sort of side channel....)
seems to be benign. stasis_page_lsn_write() refuses to decrement
LSN's... If the lock manager is using LSNs for versioning, it
might get into a situation where the page has changed (the slot was
allocated), but the version wasn't bumped. I can't imagine this
causing trouble, unless the application is using the presence or
absence of an uninitialized slot as some sort of side channel....)
lsn = Tupdate(...)
latch
writeLSN
stasis_page_lsn_write
unlatch
unpin
@ -404,8 +565,8 @@ void pageBlockDone(int xid, Page * p, block_t * done);
pin
latch
recordPostAlloc
writeLSN
stasis_record_alloc_done
stasis_page_lsn_write
unlatch
unpin
*/
@ -433,9 +594,6 @@ typedef struct page_impl {
@return a pointer to the buffer manager's copy of the record.
*/
byte* (*recordWrite)(int xid, Page *p, recordid rid);
/**
@todo Most code doesn't call recordReadDone() and recordWriteDone() yet.
*/
void (*recordReadDone)(int xid, Page *p, recordid rid, const byte *b);
void (*recordWriteDone)(int xid, Page *p, recordid rid, byte *b);
/**
@ -503,7 +661,7 @@ typedef struct page_impl {
/**
Compact the page in place. This operation should not change the
slot id's allocated to the records on the page. Instead, it
should update the extimate returned by page_impl.freespace().
should update the estimate returned by page_impl.freespace().
Depending on the page implementation, this function may have
other side effects.
@ -512,7 +670,7 @@ typedef struct page_impl {
/**
Generate a new, appropriately sized recordid. This is the first
of two allocation phases, and does not actually modify the page.
The caller of this function must call recordPostAlloc() before
The caller of this function must call stasis_record_alloc_done() before
unlatching the page.
@see page_impl.recordPostAlloc()
@ -530,7 +688,7 @@ typedef struct page_impl {
@param xid The active transaction
@param p The page that will be allocated from
@param rid A new recordid that is (usually) from recordPreAlloc()
@param rid A new recordid that is (usually) from stasis_record_alloc_begin()
@see Talloc(), page_impl.recordPreAlloc()
*/
@ -551,7 +709,7 @@ typedef struct page_impl {
operation, or should it be implemented once and for all in
page.c?
indirect.c suggets this should be specfic to the page type;
indirect.c suggests this should be specific to the page type;
blobs suggest this should be specific to record type. Perhaps
two levels of dereferencing are in order... (First: page
specific; second record specific...)
@ -560,27 +718,33 @@ typedef struct page_impl {
// -------- Page maintenance
/** This is called (exactly once) right after the page is read from
disk.
/** This is called when the page is read from disk.
This function should set p->LSN to an appropriate value.
@todo In order to support "raw" pages, we need a new page read
method that lets the caller decide which page type should handle
the call to pageLoaded().
the call to stasis_page_loaded().
@todo pageLoaded() should set p->pageType.
@todo stasis_page_loaded() should set p->pageType.
@todo set *page_type_ptr() to UNINITIALIZED_PAGE when appropriate.
@todo set *stasis_page_type_ptr() to UNINITIALIZED_PAGE when appropriate.
*/
void (*pageLoaded)(Page * p);
/** This is called (exactly once) right before the page is written
back to disk.
/** This is called before the page is written back to disk.
This function should record p->LSN somewhere appropriate
(perhaps via stasis_page_lsn_ptr()), and should prepare the page
to be written back to disk.
*/
void (*pageFlushed)(Page * p);
/** This is called before the page is evicted from memory.
At this point the page has already been written back to disk
(if necessary). Any resources held on behalf of this page
should be released.
*/
void (*pageCleanup)(Page * p);
} page_impl;
@ -593,7 +757,7 @@ typedef struct page_impl {
page type is not unique.)
*/
int registerPageType(page_impl impl);
int stasis_page_impl_register(page_impl impl);
// -------------------- Page specific, general purpose methods
@ -601,16 +765,14 @@ int registerPageType(page_impl impl);
Initialize a new page
@param p The page that will be turned into a new slotted page.
Its contents will be overwitten. It was probably
Its contents will be overwritten. It was probably
returned by loadPage()
*/
void slottedPageInitialize(Page * p);
void fixedPageInitialize(Page * page, size_t size, int count);
void stasis_slotted_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height);
int fixedRecordsPerPage(size_t size);
void indirectInitialize(Page * p, int height);
compensated_function recordid dereferenceRID(int xid, recordid rid);
int stasis_fixed_records_per_page(size_t size);
END_C_DECLS

View file

@ -6,35 +6,35 @@
int fixedRecordsPerPage(size_t size) {
int stasis_fixed_records_per_page(size_t size) {
return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size;
}
/** @todo CORRECTNESS Locking for fixedPageInitialize? (should hold writelock)*/
void fixedPageInitialize(Page * page, size_t size, int count) {
/** @todo CORRECTNESS Locking for stasis_fixed_initialize_page? (should hold writelock)*/
void stasis_fixed_initialize_page(Page * page, size_t size, int count) {
assertlocked(page->rwlatch);
// XXX fixed page asserts it's been given an UNINITIALIZED_PAGE... Why doesn't that crash?
assert(*page_type_ptr(page) == UNINITIALIZED_PAGE);
*page_type_ptr(page) = FIXED_PAGE;
assert(*stasis_page_type_ptr(page) == UNINITIALIZED_PAGE);
*stasis_page_type_ptr(page) = FIXED_PAGE;
*recordsize_ptr(page) = size;
assert(count <= fixedRecordsPerPage(size));
assert(count <= stasis_fixed_records_per_page(size));
*recordcount_ptr(page)= count;
}
static int checkRidWarnedAboutUninitializedKludge = 0;
static void checkRid(Page * page, recordid rid) {
assertlocked(page->rwlatch);
if(! *page_type_ptr(page)) {
if(! *stasis_page_type_ptr(page)) {
if(!checkRidWarnedAboutUninitializedKludge) {
checkRidWarnedAboutUninitializedKludge = 1;
printf("KLUDGE detected in checkRid. Fix it ASAP\n");
fflush(stdout);
}
fixedPageInitialize(page, rid.size, fixedRecordsPerPage(rid.size));
stasis_fixed_initialize_page(page, rid.size, stasis_fixed_records_per_page(rid.size));
}
assert(page->id == rid.page);
assert(*recordsize_ptr(page) == rid.size);
assert(fixedRecordsPerPage(rid.size) > rid.slot);
assert(stasis_fixed_records_per_page(rid.size) > rid.slot);
}
//-------------- New API below this line
@ -75,7 +75,7 @@ static void fixedSetType(int xid, Page *p, recordid rid, int type) {
}
static int fixedGetLength(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
assert(*page_type_ptr(p));
assert(*stasis_page_type_ptr(p));
return rid.slot > *recordcount_ptr(p) ?
INVALID_SLOT : physical_slot_length(*recordsize_ptr(p));
}
@ -84,7 +84,7 @@ static int notSupported(int xid, Page * p) { return 0; }
static int fixedFreespace(int xid, Page * p) {
assertlocked(p->rwlatch);
if(fixedRecordsPerPage(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
// Return the size of a slot; that's the biggest record we can take.
return physical_slot_length(*recordsize_ptr(p));
} else {
@ -97,7 +97,7 @@ static void fixedCompact(Page * p) {
}
static recordid fixedPreAlloc(int xid, Page *p, int size) {
assertlocked(p->rwlatch);
if(fixedRecordsPerPage(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
recordid rid;
rid.page = p->id;
rid.slot = *recordcount_ptr(p);
@ -125,10 +125,10 @@ static void fixedFree(int xid, Page *p, recordid rid) {
// XXX dereferenceRID
void fixedLoaded(Page *p) {
p->LSN = *lsn_ptr(p);
p->LSN = *stasis_page_lsn_ptr(p);
}
void fixedFlushed(Page *p) {
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
}
void fixedCleanup(Page *p) { }
page_impl fixedImpl() {
@ -144,9 +144,9 @@ page_impl fixedImpl() {
fixedFirst,
fixedNext,
notSupported, // notSupported,
pageGenericBlockFirst,
pageGenericBlockNext,
pageGenericBlockDone,
stasis_block_first_default_impl,
stasis_block_next_default_impl,
stasis_block_done_default_impl,
fixedFreespace,
fixedCompact,
fixedPreAlloc,

View file

@ -2,10 +2,13 @@
#ifndef __FIXED_H
#define __FIXED_H
// @todo rename fixed.h macros to something more specific
#define recordsize_ptr(page) shorts_from_end((page), 1)
#define recordcount_ptr(page) shorts_from_end((page), 2)
#define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n))
/**
@todo rename fixed.h macros turn them into static inline functions.
*/
#define recordsize_ptr(page) stasis_page_int16_ptr_from_end((page), 1)
#define recordsize_cptr(page) stasis_page_int16_cptr_from_end((page), 1)
#define recordcount_ptr(page) stasis_page_int16_ptr_from_end((page), 2)
#define fixed_record_ptr(page, n) stasis_page_byte_ptr_from_start((page), *recordsize_ptr((page)) * (n))
static inline recordid fixedNext(int xid, Page *p, recordid rid) {
short n = *recordcount_ptr(p);

View file

@ -1,7 +1,10 @@
#include "../page.h"
#include "header.h"
#include <assert.h>
/**
@file header.c is dead code(?)
@todo Delete header.c
*/
int headerPageInitialize() {
Page * p;
try_ret(0) {

View file

@ -10,11 +10,11 @@
void indirectInitialize(Page * p, int height) {
*level_ptr(p) = height;
*page_type_ptr(p) = INDIRECT_PAGE;
*stasis_page_type_ptr(p) = INDIRECT_PAGE;
memset(p->memAddr, INVALID_SLOT, ((size_t)level_ptr(p)) - ((size_t)p->memAddr));
}
/** @todo Is locking for dereferenceRID really necessary? */
compensated_function recordid dereferenceRID(int xid, recordid rid) {
compensated_function recordid dereferenceIndirectRID(int xid, recordid rid) {
Page * page;
try_ret(NULLRID) {
page = loadPage(xid, rid.page);
@ -23,7 +23,7 @@ compensated_function recordid dereferenceRID(int xid, recordid rid) {
// printf("a"); fflush(stdout);
int offset = 0;
int max_slot;
while(*page_type_ptr(page) == INDIRECT_PAGE) {
while(*stasis_page_type_ptr(page) == INDIRECT_PAGE) {
int i = 0;
for(max_slot = *maxslot_ptr(page, i); ( max_slot + offset ) <= rid.slot; max_slot = *maxslot_ptr(page, i)) {
i++;
@ -171,14 +171,14 @@ compensated_function recordid __rallocMany(int xid, int parentPage, int recordSi
/* Initialize leaves. (As SLOTTED_PAGE's) */
writelock(p.rwlatch,0);
slottedPageInitialize(&p);
stasis_slotted_initialize_page(&p);
p.id = parentPage;
for(int i = 0; i < recordCount; i++) {
/* Normally, we would worry that the page id isn't set, but
we're discarding the recordid returned by page ralloc
anyway. */
recordid rid = recordPreAlloc(xid, &p, recordSize);
recordPostAlloc(xid, &p, rid);
recordid rid = stasis_record_alloc_begin(xid, &p, recordSize);
stasis_record_alloc_done(xid, &p, rid);
}
unlock(p.rwlatch);
}
@ -204,7 +204,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
readlock(p->rwlatch, 0);
int i = 0;
unsigned int ret;
if(*page_type_ptr(p) == INDIRECT_PAGE) {
if(*stasis_page_type_ptr(p) == INDIRECT_PAGE) {
while(*maxslot_ptr(p, i) > 0) {
i++;
@ -214,7 +214,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
} else {
ret = (*maxslot_ptr(p, i-1)) - 1;
}
} else if (*page_type_ptr(p) == SLOTTED_PAGE) {
} else if (*stasis_page_type_ptr(p) == SLOTTED_PAGE) {
int numslots = *numslots_ptr(p);
ret = 0;
@ -237,10 +237,10 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
static int notSupported(int xid, Page * p) { return 0; }
void indirectLoaded(Page *p) {
p->LSN = *lsn_ptr(p);
p->LSN = *stasis_page_lsn_ptr(p);
}
void indirectFlushed(Page *p) {
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
}
void indirectCleanup(Page *p) { }
static page_impl pi = {

View file

@ -34,10 +34,11 @@ implementation).
BEGIN_C_DECLS
#define level_ptr(page) shorts_from_end((page), 3)
#define level_ptr(page) stasis_page_int16_ptr_from_end((page), 3)
#define page_ptr(page, offset) ints_from_start((page), 2*(offset))
#define maxslot_ptr(page, offset) ints_from_start((page), 2*(offset)+1)
/** @todo indirect.h cannot handle 64 bit file offsets! */
#define page_ptr(page, offset) stasis_page_int32_ptr_from_start((page), 2*(offset))
#define maxslot_ptr(page, offset) stasis_page_int32_ptr_from_start((page), 2*(offset)+1)
#define INDIRECT_POINTERS_PER_PAGE (USABLE_SIZE_OF_PAGE / 16)
@ -45,7 +46,7 @@ BEGIN_C_DECLS
Translates a recordid that points to an indirect block into the
physical location of the record.
*/
compensated_function recordid dereferenceRID(int xid, recordid rid);
compensated_function recordid dereferenceIndirectRID(int xid, recordid rid);
void indirectInitialize(Page * p, int height);
compensated_function recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount);

View file

@ -15,10 +15,10 @@ static inline void slottedFsck(const Page const * page) {
dummy.id = -1;
dummy.memAddr = 0;
const short page_type = *page_type_ptr(page);
const short numslots = *numslots_ptr(page);
const short freespace = *freespace_ptr(page);
const short freelist = *freelist_ptr(page);
const short page_type = *stasis_page_type_cptr(page);
const short numslots = *numslots_cptr(page);
const short freespace = *freespace_cptr(page);
const short freelist = *freelist_cptr(page);
const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1);
assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
@ -193,9 +193,9 @@ void slottedPageDeinit() {
}
void slottedPageInitialize(Page * page) {
void stasis_slotted_initialize_page(Page * page) {
assertlocked(page->rwlatch);
*page_type_ptr(page) = SLOTTED_PAGE;
*stasis_page_type_ptr(page) = SLOTTED_PAGE;
*freespace_ptr(page) = 0;
*numslots_ptr(page) = 0;
*freelist_ptr(page) = INVALID_SLOT;
@ -500,11 +500,11 @@ static void slottedFree(int xid, Page * p, recordid rid) {
// XXX dereferenceRID
void slottedLoaded(Page *p) {
p->LSN = *lsn_ptr(p);
p->LSN = *stasis_page_lsn_ptr(p);
slottedFsck(p); // @todo In normal case, arrange for fsck to run on load/flush, but nowhere else.
}
void slottedFlushed(Page *p) {
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
slottedFsck(p);
}
void slottedCleanup(Page *p) { }
@ -522,9 +522,9 @@ static page_impl pi = {
slottedFirst,
slottedNext,
notSupported, // is block supported
pageGenericBlockFirst,
pageGenericBlockNext,
pageGenericBlockDone,
stasis_block_first_default_impl,
stasis_block_next_default_impl,
stasis_block_done_default_impl,
slottedFreespace,
slottedCompact,
slottedPreRalloc,

View file

@ -70,12 +70,24 @@ Slotted page layout:
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1
#endif
#define freespace_ptr(page) shorts_from_end((page), 1)
#define numslots_ptr(page) shorts_from_end((page), 2)
#define freelist_ptr(page) shorts_from_end((page), 3)
#define slot_ptr(page, n) shorts_from_end((page), (2*(n))+4)
#define slot_length_ptr(page, n) shorts_from_end((page), (2*(n))+5)
#define record_ptr(page, n) bytes_from_start((page), *slot_ptr((page), (n)))
/**
@todo rename and rewrite slotted.h macros as static inline functions.
*/
#define freespace_ptr(page) stasis_page_int16_ptr_from_end((page), 1)
#define numslots_ptr(page) stasis_page_int16_ptr_from_end((page), 2)
#define freelist_ptr(page) stasis_page_int16_ptr_from_end((page), 3)
#define slot_ptr(page, n) stasis_page_int16_ptr_from_end((page), (2*(n))+4)
#define slot_length_ptr(page, n) stasis_page_int16_ptr_from_end((page), (2*(n))+5)
#define record_ptr(page, n) stasis_page_byte_ptr_from_start((page), \
*slot_ptr((page), (n)))
#define freespace_cptr(page) stasis_page_int16_cptr_from_end((page), 1)
#define numslots_cptr(page) stasis_page_int16_cptr_from_end((page), 2)
#define freelist_cptr(page) stasis_page_int16_cptr_from_end((page), 3)
#define slot_cptr(page, n) stasis_page_int16_cptr_from_end((page), (2*(n))+4)
#define slot_length_cptr(page, n) stasis_page_int16_cptr_from_end((page), (2*(n))+5)
#define record_cptr(page, n) stasis_page_byte_cptr_from_start((page), \
*slot_cptr((page), (n)))
void slottedPageInit();
void slottedPageDeinit();

View file

@ -68,7 +68,7 @@ static void pfPageRead(Page *ret) {
}
ret->dirty = 0;
pageLoaded(ret);
stasis_page_loaded(ret);
pthread_mutex_unlock(&stable_mutex);
@ -86,11 +86,11 @@ static void pfPageWrite(Page * ret) {
pageid_t pageoffset = ret->id * PAGE_SIZE;
pageid_t offset ;
pageFlushed(ret);
stasis_page_flushed(ret);
// If necessary, force the log to disk so that ret's LSN will be stable.
assert(ret->LSN == pageReadLSN(ret));
assert(ret->LSN == stasis_page_lsn_read(ret));
LogForce(ret->LSN);
pthread_mutex_lock(&stable_mutex);

View file

@ -25,7 +25,7 @@ static void phWrite(Page * ret) {
// implicitly have exclusive access to the page before this function is called,
// or we'll deadlock.
writelock(ret->rwlatch,0);
pageFlushed(ret);
stasis_page_flushed(ret);
LogForce(ret->LSN);
int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
if(err) {
@ -50,7 +50,7 @@ static void phRead(Page * ret) {
}
}
ret->dirty = 0;
pageLoaded(ret);
stasis_page_loaded(ret);
unlock(ret->rwlatch);
}
static void phForce() {

View file

@ -227,7 +227,7 @@ static void Undo(int recovery) {
p = loadPage(thisXid, e->update.rid.page);
// If this fails, something is wrong with redo or normal operation.
this_lsn= pageReadLSN(p);
this_lsn = stasis_page_lsn_read(p);
assert(e->LSN <= this_lsn);
} else {

View file

@ -134,7 +134,7 @@ int Tinit() {
setupOperationsTable();
dirtyPagesInit();
LogInit(loggerType);
pageInit();
stasis_page_init();
switch(bufferManagerFileHandleType) {
case BUFFER_MANAGER_FILE_HANDLE_NON_BLOCKING: {
@ -280,16 +280,6 @@ static compensated_function void TactionHelper(int xid, recordid rid,
}
static recordid resolveForUpdate(int xid, Page * p, recordid rid) {
if(*page_type_ptr(p) == INDIRECT_PAGE) {
rid = dereferenceRID(xid, rid);
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(xid, p, rid.slot);
}
return rid;
}
compensated_function void TupdateRaw(int xid, recordid rid,
const void * dat, int op) {
assert(xid >= 0);
@ -306,7 +296,7 @@ compensated_function void TupdateStr(int xid, recordid rid,
compensated_function void Tupdate(int xid, recordid rid,
const void *dat, int op) {
Page * p = loadPage(xid, rid.page);
rid = resolveForUpdate(xid, p, rid);
rid = stasis_record_dereference(xid, p, rid);
if(p->id != rid.page) {
releasePage(p);
@ -321,7 +311,7 @@ compensated_function void Tdefer(int xid, recordid rid,
const void * dat, int op) {
Page * p = loadPage(xid, rid.page);
recordid newrid = resolveForUpdate(xid, p, rid);
recordid newrid = stasis_record_dereference(xid, p, rid);
// Caller cannot rely on late or early binding of rid.
assert(rid.page == newrid.page &&
rid.slot == newrid.slot &&
@ -340,12 +330,12 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
p = loadPage(xid, rid.page);
} end;
rid = recordDereference(xid, p, rid);
rid = stasis_record_dereference(xid, p, rid);
if(rid.page != p->id) {
releasePage(p);
p = loadPage(xid, rid.page);
}
recordRead(xid, p, rid, dat);
stasis_record_read(xid, p, rid, dat);
releasePage(p);
}
@ -418,7 +408,7 @@ int Tdeinit() {
slow_pfile = 0;
slow_close = 0;
}
pageDeinit();
stasis_page_deinit();
LogDeinit();
dirtyPagesDeinit();
return 0;
@ -433,7 +423,7 @@ int TuncleanShutdown() {
slow_pfile = 0;
slow_close = 0;
}
pageDeinit();
stasis_page_deinit();
LogDeinit();
numActiveXactions = 0;
dirtyPagesDeinit();

View file

@ -97,8 +97,13 @@ typedef struct Page_s Page_s;
typedef struct Page_s Page;
/**
* Obtain a pointer to a page from the buffer manager. The page will
* be pinned, and the pointer valid until releasePage is called.
*
* @param xid The transaction that is pinning the page (used by page-level locking implementations.)
*
* @param pageid ID of the page you want to load
*
* @return fully formed Page type
*/
Page * loadPage(int xid, int pageid);

View file

@ -35,7 +35,7 @@ extern int bufferManagerFileHandleType;
extern int bufferManagerNonBlockingSlowHandleType;
/**
If true, the buffer manager will use O_DIRECT. Set at compile time by
#defining BUFFER_MANAGER_O_DIRECT.
defining BUFFER_MANAGER_O_DIRECT.
*/
extern int bufferManagerO_DIRECT;
#endif

View file

@ -15,7 +15,9 @@
/**
@file Interface for I/O handle implementations.
@file
Interface for I/O handle implementations.
This interface is designed to provide some extra features needed by
the buffer manager and the log, and to hide the operating system's

View file

@ -29,9 +29,13 @@ void lsmTreeRegisterComparator(int id, lsm_comparator_t i);
/**
Initialize a new LSM tree.
@param comparator. The id of the comparator this tree should use.
@param xid The tranasction that is creating the tree.
@param comparator The id of the comparator this tree should use.
(It must have been registered with lsmTreeRegisterComparator
before TlsmCreate() is called.
@param keySize
*/
recordid TlsmCreate(int xid, int comparator, int keySize);
/**

View file

@ -46,17 +46,28 @@ terms specified in this license.
* The minimal subset of Stasis necessary to implement transactional consistency.
*
* This module includes the standard API (excluding operations), the
* logger, the buffer mananger, and recovery code.
* logger, the buffer manager, and recovery code.
*
* In theory, the other .h files that are installed in /usr/include
* aren't needed for application developers.
*
* @todo Move as much of the stuff in lladd/ to src/lladd/ as possible. Alternatively, move all headers to lladd/, and be done with it!
* @todo Move as much of the stuff in stasis/ to src/stasis/ as possible. Alternatively, move all headers to stasis/, and be done with it!
*
*/
/**
@mainpage Introduction to Stasis
This is the main section.
<ul>
<li>@ref gettingStarted</li>
<li>@ref pageFormats</li>
<li>@ref LLADD_CORE</li>
<li>@ref OPERATIONS</li>
</ul>
*/
/**
@page gettingStarted Getting Started
@section compiling Compiling and installation
Prerequisites:
@ -131,7 +142,7 @@ terms specified in this license.
self explanatory. If not, they are covered in detail elsewhere. Tinit() and
Tdeinit() initialize the library, and clean up when the program is finished.
Other partiularly useful functions are ThashCreate(), ThashDelete(),
Other particularly useful functions are ThashCreate(), ThashDelete(),
ThashInsert(), ThashRemove(), and ThashLookup() which provide a
re-entrant linear hash implementation. ThashIterator() and
ThashNext() provide an iterator over the hashtable's values.
@ -153,7 +164,7 @@ terms specified in this license.
@include examples/ex2.c
@see test.c for a complete, executable example of reopeneing an existing store.
@see test.c for a complete, executable example of reopening an existing store.
@todo Explain how to determine the correct value of rootEntry.size in the case
of a hashtable.
@ -175,7 +186,7 @@ terms specified in this license.
being written. This is less consistency than SQL's Level 0 (Dirty
Reads) provides. Some of Stasis' data structures do obtain short
read and write locks automatically. Refer to individual data
structues for more information.
structures for more information.
Stasis' allocation functions, such as Talloc(), do not reuse space
that was freed by an ongoing transaction. This means that you may
@ -190,7 +201,7 @@ terms specified in this license.
other. This means that transactions may observe the effects of
transactions that will eventually abort.
Finally, Stasis asumes that each thread has its own transaction;
Finally, Stasis assumes that each thread has its own transaction;
concurrent calls within the same transaction are not supported.
This restriction may be removed in the future.
@ -202,7 +213,7 @@ terms specified in this license.
Stasis. Running 'make check' in test/stasis runs all of the Stasis
tests without running the obsolete tests.
@section archictecture Stasis' structure
@section architecture Stasis' structure
This section is geared toward people that would like to extend
Stasis. The OSDI paper provides a higher level description and
@ -375,7 +386,7 @@ terms specified in this license.
portably is a bit tricky. Stasis has settled upon a compromise in
this matter. Its page file formats are compatible within a single
architecture, but not across systems with varying lengths of
primitive types, or that vary in endianess.
primitive types, or that vary in endianness.
Over time, types that vary in length such as "int", "long", etc
will be removed from Stasis, but their usage still exists in a few
@ -410,14 +421,16 @@ terms specified in this license.
involved. However, it lets the compiler deal with the underlying
multiplications, and often reduces the number of casts, leading to
slightly more readable code. Take this implementation of
page_type_ptr(), for example:
stasis_page_type_ptr(), for example:
@code
int * page_type_ptr(Page *p) { return ( (int*)lsn_ptr(Page *p) ) - 1; }
int * stasis_page_type_ptr(Page *p) {
return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1;
}
@endcode
Here, the page type is stored as an integer immediately before the
lsn_ptr. Using arithmetic over char*'s would require an extra
LSN pointer. Using arithmetic over char*'s would require an extra
cast to char*, and a multiplication by sizeof(int).
@par A note on storage allocation
@ -438,7 +451,7 @@ terms specified in this license.
then reused without being reset.
# Allocate a single page at a time using TallocPage(), and
TsetPage(). This is currently the msot attractive route, though
TsetPage(). This is currently the most attractive route, though
TsetPage() does not call pageLoaded() when it resets page types,
which can lead to trouble.
@ -562,7 +575,7 @@ int Tbegin();
/**
* Used when extending Stasis.
* Operation implementors should wrap around this function to provide more mnuemonic names.
* Operation implementers should wrap around this function to provide more mnemonic names.
*
* @param xid The current transaction.
* @param rid The record the operation pertains to. For some logical operations, this will be a dummy record.
@ -580,6 +593,8 @@ compensated_function void TupdateRaw(int xid, recordid rid,
compensated_function void TupdateDeferred(int xid, recordid rid,
const void *dat, int op);
/**
* Read the value of a record.
*
* @param xid transaction ID
* @param rid reference to page/slot
* @param dat buffer into which data goes
@ -646,7 +661,7 @@ void TsetXIDCount(int xid);
* Checks to see if a transaction is still active.
*
* @param xid The transaction id to be tested.
* @return true if the transacation is still running, false otherwise.
* @return true if the transaction is still running, false otherwise.
*/
int TisActiveTransaction(int xid);
@ -657,7 +672,7 @@ lsn_t transactions_minRecLSN();
/**
Report Stasis' current durablity guarantees.
Report Stasis' current durability guarantees.
@return VOLATILE if the data will be lost after Tdeinit(), or a
crash, PERSISTENT if the data will be written back to disk after

View file

@ -110,7 +110,7 @@ int insert(int xid, Page* p, recordid rid_caller, int valueIn){
if (DEBUGSTATEMENTS) {printf("\nDebug1\n");}
recordRead(xid, p, rid, countBuff); // read the value of count from slot 0
stasis_record_read(xid, p, rid, countBuff); // read the value of count from slot 0
if (DEBUGSTATEMENTS) {printf("\nDebug2\n");}
@ -122,7 +122,7 @@ int insert(int xid, Page* p, recordid rid_caller, int valueIn){
printf("\nrid2slot = %d\n", rid.slot);
// *recordcount_ptr(p) = last accessible index on the page.
int max_index = fixedRecordsPerPage(rid.size); // rcs made this change.
int max_index = stasis_fixed_records_per_page(rid.size); // rcs made this change.
// int max_index = *recordcount_ptr(p); // recordcount_ptr is the number of slots currently allocated on the page.
// but this code seems to do it's own allocation(?)
@ -189,7 +189,7 @@ void initializeNewBTreeNode(int xid, Page* p, recordid rid){
byte * countBuff = (byte *) & countInt;
// write the count out
recordWrite(xid, p, 1, rid, countBuff);
stasis_record_write(xid, p, 1, rid, countBuff);
}
void testFunctions(){
@ -247,7 +247,7 @@ int SimpleExample(){
/* check to make sure page is recorded as a FIXED_PAGE */
assert( *page_type_ptr(p1) == FIXED_PAGE);
assert( *stasis_page_type_ptr(p1) == FIXED_PAGE);
if (DEBUGP) { printf("\n%d\n", rid1.page); }
byte * b1 = (byte *) malloc (sizeof (int));
@ -279,8 +279,8 @@ int SimpleExample(){
// @todo This is a messy way to do this...
unlock(p1->rwlatch);
recordWrite(xid, p1, 1, rid2, b1);
recordRead(xid, p1, rid2, b2);
stasis_record_write(xid, p1, 1, rid2, b1);
stasis_record_read(xid, p1, rid2, b2);
if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));}
// initializeNewBTreeNode(p1, rid1);

View file

@ -55,11 +55,11 @@ void initializePages() {
assert(p->id != -1);
writelock(p->rwlatch,0);
slottedPageInitialize(p);
recordPostAlloc(-1, p, rid);
int * buf = (int*)recordWriteNew(-1, p, rid);
stasis_slotted_initialize_page(p);
stasis_record_alloc_done(-1, p, rid);
int * buf = (int*)stasis_record_write_begin(-1, p, rid);
*buf = i;
pageWriteLSN(-1, p, 0);
stasis_page_lsn_write(-1, p, 0);
unlock(p->rwlatch);
releasePage(p);
}
@ -87,7 +87,7 @@ void * workerThread(void * p) {
p = loadPage(-1, rid.page);
recordRead(1, p, rid, (byte*)&j);
stasis_record_read(1, p, rid, (byte*)&j);
releasePage(p);
@ -138,7 +138,7 @@ void * workerThreadWriting(void * q) {
}
/* sched_yield(); */
recordWrite(1, p, 0, rids[i], (byte*)&val);
stasis_record_write(1, p, 0, rids[i], (byte*)&val);
assert(p->id == rids[i].page);
releasePage(p);
@ -156,7 +156,7 @@ void * workerThreadWriting(void * q) {
p = loadPage(xid, rids[i].page);
recordRead(1, p, rids[i], (byte*)&val);
stasis_record_read(1, p, rids[i], (byte*)&val);
releasePage(p);

View file

@ -104,7 +104,7 @@ START_TEST(indirectAlloc) {
Page * p = loadPage(xid, page);
int page_type = *page_type_ptr(p);
int page_type = *stasis_page_type_ptr(p);
assert(page_type == SLOTTED_PAGE);
@ -125,7 +125,7 @@ START_TEST(indirectAlloc) {
p = loadPage(xid, page);
page_type = *page_type_ptr(p);
page_type = *stasis_page_type_ptr(p);
assert(page_type == INDIRECT_PAGE);
@ -149,7 +149,7 @@ START_TEST(indirectAlloc) {
p = loadPage(xid, page);
page_type = *page_type_ptr(p);
page_type = *stasis_page_type_ptr(p);
assert(page_type == INDIRECT_PAGE);
@ -180,7 +180,7 @@ START_TEST(indirectAccessDirect) {
Page * p = loadPage(xid, page);
int page_type = *page_type_ptr(p);
int page_type = *stasis_page_type_ptr(p);
assert(page_type == SLOTTED_PAGE);
@ -194,7 +194,7 @@ START_TEST(indirectAccessDirect) {
for(int i = 0; i < 500; i++) {
rid.slot = i;
Tset(xid, dereferenceRID(xid, rid), &i);
Tset(xid, dereferenceIndirectRID(xid, rid), &i);
}
Tcommit(xid);
@ -203,7 +203,7 @@ START_TEST(indirectAccessDirect) {
for(int i = 0; i < 500; i++) {
rid.slot = i;
int j;
Tread(xid, dereferenceRID(xid, rid), &j);
Tread(xid, dereferenceIndirectRID(xid, rid), &j);
assert(j == i);
}
@ -225,7 +225,7 @@ START_TEST(indirectAccessIndirect) {
Page * p = loadPage(xid, page);
int page_type = *page_type_ptr(p);
int page_type = *stasis_page_type_ptr(p);
assert(page_type == INDIRECT_PAGE);
@ -240,7 +240,7 @@ START_TEST(indirectAccessIndirect) {
for(int i = 0; i < 500000; i++) {
rid.slot = i;
// printf("i=%d", i); fflush(stdout);
recordid rid2 = dereferenceRID(xid, rid);
recordid rid2 = dereferenceIndirectRID(xid, rid);
// printf("."); fflush(stdout);
Tset(xid, rid2, &i);
}
@ -254,7 +254,7 @@ START_TEST(indirectAccessIndirect) {
for(int i = 0; i < 500000; i++) {
rid.slot = i;
int j;
Tread(xid, dereferenceRID(xid, rid), &j);
Tread(xid, dereferenceIndirectRID(xid, rid), &j);
assert(j == i);
}

View file

@ -74,9 +74,9 @@ START_TEST(operation_physical_do_undo) {
Page * p = loadPage(xid, pnum);
writelock(p->rwlatch, 0);
slottedPageInitialize(p);
rid = recordPreAlloc(xid, p, sizeof(int));
recordPostAlloc(xid, p, rid);
stasis_slotted_initialize_page(p);
rid = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid);
unlock(p->rwlatch);
releasePage(p);
@ -91,7 +91,7 @@ START_TEST(operation_physical_do_undo) {
DEBUG("B\n");
p = loadPage(xid, rid.page);
recordWrite(xid, p, lsn, rid, (byte*)&buf);
stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
releasePage(p);
setToTwo->LSN = 10;
@ -101,7 +101,7 @@ START_TEST(operation_physical_do_undo) {
releasePage(p);
p = loadPage(xid, rid.page);
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
releasePage(p);
fail_unless(buf == 2, NULL);
@ -111,7 +111,7 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
fail_unless(10 == pageReadLSN(p), "page lsn not set correctly.");
fail_unless(10 == stasis_page_lsn_read(p), "page lsn not set correctly.");
unlock(p->rwlatch);
setToTwo->LSN = 5;
@ -120,7 +120,7 @@ START_TEST(operation_physical_do_undo) {
releasePage(p);
p = loadPage(xid, rid.page);
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
releasePage(p);
fail_unless(buf == 1, NULL);
@ -130,7 +130,7 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
releasePage(p);
fail_unless(buf == 1, NULL);
@ -149,7 +149,7 @@ START_TEST(operation_physical_do_undo) {
buf = 1;
p = loadPage(xid, rid.page);
recordWrite(xid, p, lsn, rid, (byte*)&buf);
stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
releasePage(p);
/* Trace of test:
@ -177,14 +177,14 @@ START_TEST(operation_physical_do_undo) {
redoUpdate(setToTwo);
p = loadPage(xid, rid.page);
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
assert(buf == 2);
fail_unless(buf == 2, NULL);
DEBUG("G undo set to 2\n");
undoUpdate(setToTwo, p, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
fail_unless(buf == 1, NULL);
releasePage(p);
@ -194,18 +194,18 @@ START_TEST(operation_physical_do_undo) {
p = loadPage(xid, rid.page);
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
fail_unless(buf == 1, NULL);
recordWrite(xid, p, 0, rid, (byte*)&buf); /* reset the page's LSN. */
stasis_record_write(xid, p, 0, rid, (byte*)&buf); /* reset the page's LSN. */
DEBUG("I redo set to 2\n");
releasePage(p);
redoUpdate(setToTwo); /* Succeeds */
p = loadPage(xid, rid.page);
recordRead(xid, p, rid, (byte*)&buf);
stasis_record_read(xid, p, rid, (byte*)&buf);
fail_unless(buf == 2, NULL);
releasePage(p);

View file

@ -43,13 +43,14 @@ terms specified in this license.
/** @file
@todo check_page should judiciously avoid lsn_ptr()
@todo check_page should judiciously avoid stasis_page_lsn_ptr()
*/
#include <config.h>
#include <check.h>
#include "../../src/stasis/page.h"
#include "../../src/stasis/page/indirect.h"
#include "../../src/stasis/page/slotted.h"
#include "../../src/stasis/blobManager.h"
#include <stasis/bufferManager.h>
@ -82,17 +83,17 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
pthread_mutex_lock(&lsn_mutex);
lsn++;
this_lsn = lsn;
assert(pageReadLSN(p) < this_lsn);
assert(stasis_page_lsn_read(p) < this_lsn);
pthread_mutex_unlock(&lsn_mutex);
if(! first ) {
for(k = 0; k < 100; k++) {
recordRead(1, p, rid[k], (byte*)&j);
stasis_record_read(1, p, rid[k], (byte*)&j);
assert((j + 1) == i + k);
writelock(p->rwlatch,0);
recordFree(-1, p, rid[k]);
pageWriteLSN(-1, p, this_lsn);
stasis_record_free(-1, p, rid[k]);
stasis_page_lsn_write(-1, p, this_lsn);
unlock(p->rwlatch);
sched_yield();
}
@ -102,16 +103,16 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
for(k = 0; k < 100; k++) {
writelock(p->rwlatch,0);
rid[k] = recordPreAlloc(-1,p,sizeof(short));
rid[k] = stasis_record_alloc_begin(-1,p,sizeof(short));
if(rid[k].size == INVALID_SLOT) { // Is rid[k] == NULLRID?
pageCompact(p);
rid[k] = recordPreAlloc(-1,p,sizeof(short));
stasis_record_compact(p);
rid[k] = stasis_record_alloc_begin(-1,p,sizeof(short));
}
recordPostAlloc(-1,p,rid[k]);
int * buf = (int*)recordWriteNew(-1,p,rid[k]);
stasis_record_alloc_done(-1,p,rid[k]);
int * buf = (int*)stasis_record_write_begin(-1,p,rid[k]);
*buf = i+k;
pageWriteLSN(-1, p, this_lsn);
assert(pageReadLSN(p) >= this_lsn);
stasis_page_lsn_write(-1, p, this_lsn);
assert(stasis_page_lsn_read(p) >= this_lsn);
unlock(p->rwlatch);
sched_yield();
}
@ -138,15 +139,15 @@ static void* fixed_worker_thread(void * arg_ptr) {
writelock(p->rwlatch,0);
if(! first ) {
j = *(int*)recordReadNew(-1,p,rid);
j = *(int*)stasis_record_read_begin(-1,p,rid);
assert((j + 1) == i);
}
first = 0;
rid = recordPreAlloc(-1, p, sizeof(int));
recordPostAlloc(-1, p, rid);
(*(int*)recordWriteNew(-1,p,rid)) = i;
pageWriteLSN(-1, p,lsn);
assert(pageReadLSN( p) >= this_lsn);
rid = stasis_record_alloc_begin(-1, p, sizeof(int));
stasis_record_alloc_done(-1, p, rid);
(*(int*)stasis_record_write_begin(-1,p,rid)) = i;
stasis_page_lsn_write(-1, p,lsn);
assert(stasis_page_lsn_read(p) >= this_lsn);
unlock(p->rwlatch);
sched_yield();
}
@ -169,11 +170,11 @@ static void* worker_thread(void * arg_ptr) {
pthread_mutex_unlock(&lsn_mutex);
if(! first ) {
recordRead(1, p, rid, (byte*)&j);
stasis_record_read(1, p, rid, (byte*)&j);
assert((j + 1) == i);
writelock(p->rwlatch,0);
recordFree(-1, p, rid);
pageWriteLSN(-1, p, this_lsn);
stasis_record_free(-1, p, rid);
stasis_page_lsn_write(-1, p, this_lsn);
unlock(p->rwlatch);
sched_yield();
}
@ -182,15 +183,15 @@ static void* worker_thread(void * arg_ptr) {
// @todo In check_page, a condition variable would be more efficient...
writelock(p->rwlatch,0);
if(pageFreespace(-1, p) < sizeof(int)) {
if(stasis_record_freespace(-1, p) < sizeof(int)) {
first = 1;
} else {
rid = recordPreAlloc(-1, p, sizeof(int));
recordPostAlloc(-1, p, rid);
int * buf = (int*)recordWriteNew(-1, p, rid);
pageWriteLSN(-1,p,this_lsn);
rid = stasis_record_alloc_begin(-1, p, sizeof(int));
stasis_record_alloc_done(-1, p, rid);
int * buf = (int*)stasis_record_write_begin(-1, p, rid);
stasis_page_lsn_write(-1,p,this_lsn);
*buf = i;
assert(pageReadLSN(p) >= this_lsn);
assert(stasis_page_lsn_read(p) >= this_lsn);
}
unlock(p->rwlatch);
sched_yield();
@ -219,12 +220,12 @@ START_TEST(pageNoThreadTest)
Tinit();
p = loadPage(-1, 0);
writelock(p->rwlatch,0);
slottedPageInitialize(p);
stasis_slotted_initialize_page(p);
unlock(p->rwlatch);
worker_thread(p);
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
releasePage(p);
@ -248,8 +249,8 @@ START_TEST(pageCheckMacros) {
lsn_t lsn = 5;
*lsn_ptr(&p) = lsn;
*page_type_ptr(&p) = 10;
*stasis_page_lsn_ptr(&p) = lsn;
*stasis_page_type_ptr(&p) = 10;
*freespace_ptr(&p) = 15;
*numslots_ptr(&p) = 20;
*slot_ptr(&p, 0) = 30;
@ -259,15 +260,15 @@ START_TEST(pageCheckMacros) {
*slot_length_ptr(&p, 1) = 36;
*slot_length_ptr(&p, 40) = 41;
*bytes_from_start(&p, 0) = 50;
*bytes_from_start(&p, 1) = 51;
*bytes_from_start(&p, 2) = 52;
*bytes_from_start(&p, 3) = 53;
*bytes_from_start(&p, 4) = 54;
*stasis_page_byte_ptr_from_start(&p, 0) = 50;
*stasis_page_byte_ptr_from_start(&p, 1) = 51;
*stasis_page_byte_ptr_from_start(&p, 2) = 52;
*stasis_page_byte_ptr_from_start(&p, 3) = 53;
*stasis_page_byte_ptr_from_start(&p, 4) = 54;
assert(*lsn_ptr(&p) == lsn);
assert(*page_type_ptr(&p) == 10);
assert(end_of_usable_space_ptr(&p) == page_type_ptr(&p));
assert(*stasis_page_lsn_ptr(&p) == lsn);
assert(*stasis_page_type_ptr(&p) == 10);
//assert(end_of_usable_space_ptr(&p) == stasis_page_type_ptr(&p));
assert(*freespace_ptr(&p) == 15);
assert(*numslots_ptr(&p) == 20);
assert(*slot_ptr(&p, 0) == 30);
@ -277,30 +278,30 @@ START_TEST(pageCheckMacros) {
assert(*slot_length_ptr(&p, 1) == 36);
assert(*slot_length_ptr(&p, 40) == 41);
assert(*bytes_from_start(&p, 0) == 50);
assert(*bytes_from_start(&p, 1) == 51);
assert(*bytes_from_start(&p, 2) == 52);
assert(*bytes_from_start(&p, 3) == 53);
assert(*bytes_from_start(&p, 4) == 54);
assert(*stasis_page_byte_ptr_from_start(&p, 0) == 50);
assert(*stasis_page_byte_ptr_from_start(&p, 1) == 51);
assert(*stasis_page_byte_ptr_from_start(&p, 2) == 52);
assert(*stasis_page_byte_ptr_from_start(&p, 3) == 53);
assert(*stasis_page_byte_ptr_from_start(&p, 4) == 54);
} END_TEST
static void assertRecordCountSizeType(int xid, Page *p, int count, int size, int type) {
int foundRecords = 0;
recordid it = recordFirst(xid,p);
recordid it = stasis_record_first(xid,p);
assert(it.size != INVALID_SLOT);
do {
foundRecords++;
assert(recordGetLength(xid,p,it) == size);
assert(recordGetTypeNew(xid,p,it) == type);
assert(stasis_record_length_read(xid,p,it) == size);
assert(stasis_record_type_read(xid,p,it) == type);
it.size = 0;
assert(recordGetLength(xid,p,it) == size);
assert(recordGetTypeNew(xid,p,it) == type);
assert(stasis_record_length_read(xid,p,it) == size);
assert(stasis_record_type_read(xid,p,it) == type);
it.size = INVALID_SLOT;
assert(recordGetLength(xid,p,it) == size);
assert(recordGetTypeNew(xid,p,it) == type);
it = recordNext(xid,p,it);
assert(stasis_record_length_read(xid,p,it) == size);
assert(stasis_record_type_read(xid,p,it) == type);
it = stasis_record_next(xid,p,it);
} while(it.size != INVALID_SLOT);
assert(foundRecords == count);
@ -310,20 +311,20 @@ static void assertRecordCountSizeType(int xid, Page *p, int count, int size, int
}
static void checkPageIterators(int xid, Page *p,int record_count) {
recordid first = recordPreAlloc(xid, p, sizeof(int64_t));
recordPostAlloc(xid,p,first);
recordid first = stasis_record_alloc_begin(xid, p, sizeof(int64_t));
stasis_record_alloc_done(xid,p,first);
for(int i = 1; i < record_count; i++) {
recordPostAlloc(xid,p,recordPreAlloc(xid,p,sizeof(int64_t)));
stasis_record_alloc_done(xid,p,stasis_record_alloc_begin(xid,p,sizeof(int64_t)));
}
assertRecordCountSizeType(xid, p, record_count, sizeof(int64_t), NORMAL_SLOT);
if(*page_type_ptr(p) == SLOTTED_PAGE) {
if(*stasis_page_type_ptr(p) == SLOTTED_PAGE) {
recordid other = first;
other.slot = 3;
recordFree(xid,p,other);
stasis_record_free(xid,p,other);
assertRecordCountSizeType(xid, p, record_count-1, sizeof(int64_t), NORMAL_SLOT);
}
}
@ -341,7 +342,7 @@ START_TEST(pageRecordSizeTypeIteratorTest) {
Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0);
slottedPageInitialize(p);
stasis_slotted_initialize_page(p);
checkPageIterators(xid,p,10);
@ -351,7 +352,7 @@ START_TEST(pageRecordSizeTypeIteratorTest) {
p = loadPage(xid,pid);
writelock(p->rwlatch,0);
fixedPageInitialize(p,sizeof(int64_t),0);
stasis_fixed_initialize_page(p,sizeof(int64_t),0);
checkPageIterators(xid,p,10);
@ -378,17 +379,17 @@ START_TEST(pageNoThreadMultPageTest)
p = loadPage(-1, 1);
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
writelock(p->rwlatch,0);
slottedPageInitialize(p);
stasis_slotted_initialize_page(p);
unlock(p->rwlatch);
multiple_simultaneous_pages(p);
// Normally, you would call pageWriteLSN() to update the LSN. This
// is a hack, since Tdeinit() will crash if it detects page updates
// that are off the end of the log..
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
releasePage(p);
@ -420,10 +421,10 @@ START_TEST(pageThreadTest) {
Page * p = loadPage(-1, 2);
writelock(p->rwlatch,0);
slottedPageInitialize(p);
stasis_slotted_initialize_page(p);
unlock(p->rwlatch);
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
fail_unless(1, NULL);
@ -438,7 +439,7 @@ START_TEST(pageThreadTest) {
/* unlock(p->loadlatch); */
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
releasePage(p);
Tdeinit();
@ -460,10 +461,10 @@ START_TEST(fixedPageThreadTest) {
Tinit();
Page * p = loadPage(-1, 2);
writelock(p->rwlatch,0);
fixedPageInitialize(p, sizeof(int), 0);
stasis_fixed_initialize_page(p, sizeof(int), 0);
unlock(p->rwlatch);
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
for(i = 0; i < THREAD_COUNT; i++) {
@ -475,7 +476,7 @@ START_TEST(fixedPageThreadTest) {
}
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
releasePage(p);
Tdeinit();
pthread_mutex_destroy(&lsn_mutex);
@ -492,8 +493,8 @@ START_TEST(pageCheckSlotTypeTest) {
Page * p = loadPage(-1, slot.page);
readlock(p->rwlatch, 0);
assert(recordGetTypeNew(xid, p, slot) == NORMAL_SLOT);
assert(recordGetLength(xid, p, slot) == sizeof(int));
assert(stasis_record_type_read(xid, p, slot) == NORMAL_SLOT);
assert(stasis_record_length_read(xid, p, slot) == sizeof(int));
unlock(p->rwlatch);
releasePage(p);
@ -502,24 +503,26 @@ START_TEST(pageCheckSlotTypeTest) {
p = loadPage(-1, fixedRoot.page);
readlock(p->rwlatch, 0);
assert(recordGetTypeNew(xid, p, fixedRoot) == NORMAL_SLOT);
assert(stasis_record_type_read(xid, p, fixedRoot) == NORMAL_SLOT);
fixedRoot.slot = 1;
// Force it to use indirect implementation (we're checking an array list page...)
recordid fixedEntry = dereferenceIndirectRID(xid, fixedRoot);
fixedRoot.slot = 0;
unlock(p->rwlatch);
releasePage(p);
fixedRoot.slot = 1;
recordid fixedEntry = dereferenceRID(xid, fixedRoot);
fixedRoot.slot = 0;
p = loadPage(-1, fixedEntry.page);
readlock(p->rwlatch, 0);
assert(recordGetTypeNew(xid, p, fixedEntry) == NORMAL_SLOT);
assert(stasis_record_type_read(xid, p, fixedEntry) == NORMAL_SLOT);
unlock(p->rwlatch);
releasePage(p);
p = loadPage(-1, blob.page);
readlock(p->rwlatch, 0);
int type = recordGetTypeNew(xid, p, blob);
int type = stasis_record_type_read(xid, p, blob);
unlock(p->rwlatch);
assert(type == BLOB_SLOT);
releasePage(p);
@ -531,14 +534,14 @@ START_TEST(pageCheckSlotTypeTest) {
p = loadPage(xid, bad.page);
readlock(p->rwlatch, 0);
assert(recordGetTypeNew(xid, p, bad) == INVALID_SLOT);
assert(stasis_record_type_read(xid, p, bad) == INVALID_SLOT);
bad.size = 100000;
assert(recordGetTypeNew(xid, p, bad) == INVALID_SLOT);
assert(stasis_record_type_read(xid, p, bad) == INVALID_SLOT);
/** recordGetType now ignores the size field, so this (correctly) returns SLOTTED_RECORD */
bad.slot = slot.slot;
assert(recordGetTypeNew(xid, p, bad) == NORMAL_SLOT);
assert(stasis_record_type_read(xid, p, bad) == NORMAL_SLOT);
p->LSN = 0;
*lsn_ptr(p) = p->LSN;
*stasis_page_lsn_ptr(p) = p->LSN;
unlock(p->rwlatch);
releasePage(p);
@ -561,7 +564,10 @@ START_TEST(pageTrecordTypeTest) {
assert(TrecordType(xid, fixedRoot) == NORMAL_SLOT);
fixedRoot.slot = 1;
recordid fixedEntry = dereferenceRID(xid, fixedRoot);
// This is an array list page, but we want to check the state
// of the internal node.
recordid fixedEntry = dereferenceIndirectRID(xid, fixedRoot);
fixedRoot.slot = 0;
assert(TrecordType(xid, fixedEntry) == NORMAL_SLOT);

View file

@ -76,12 +76,12 @@ START_TEST(pageOpCheckRecovery) {
memset(p.memAddr, 1, PAGE_SIZE);
// Reset the page type after overwriting it with memset. Otherwise, Stasis
// will try to interpret it when it flushes the page to disk.
*page_type_ptr(&p) = 0;
*stasis_page_type_ptr(&p) = 0;
TpageSet(xid, pageid1, p.memAddr);
memset(p.memAddr, 2, PAGE_SIZE);
*page_type_ptr(&p) = 0;
*stasis_page_type_ptr(&p) = 0;
TpageSet(xid, pageid2, p.memAddr);
@ -100,24 +100,24 @@ START_TEST(pageOpCheckRecovery) {
int pageid3 = TpageAlloc(xid);
memset(p.memAddr, 3, PAGE_SIZE);
*page_type_ptr(&p) = 0;
*stasis_page_type_ptr(&p) = 0;
TpageSet(xid, pageid3, p.memAddr);
byte newAddr[PAGE_SIZE];
memset(p.memAddr, 1, PAGE_SIZE);
*page_type_ptr(&p) = 0;
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid1, newAddr);
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
memset(p.memAddr, 2, PAGE_SIZE);
*page_type_ptr(&p) = 0;
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid2, newAddr);
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
memset(p.memAddr, 3, PAGE_SIZE);
*page_type_ptr(&p) = 0;
*stasis_page_type_ptr(&p) = 0;
TpageGet(xid, pageid3, newAddr);
assert(!memcmp(p.memAddr, newAddr, PAGE_SIZE-sizeof(lsn_t)));
Tcommit(xid);