Updated page API to handle page types that need to grab resources on record read/write, or that perform callbacks when read/written to disk.

This commit is contained in:
Sears Russell 2007-07-18 20:09:14 +00:00
parent 356387eafe
commit 9fd91cffb8
16 changed files with 353 additions and 117 deletions

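The shape of the new API, as a rough sketch rather than code from this commit: each page_impl now carries pageLoaded()/pageFlushed() callbacks that translate between the LSN stored in the page image and the in-memory Page struct, plus optional recordReadDone()/recordWriteDone() hooks for page types that pin resources while a record is read or written. A hypothetical page type would wire them up like this (myPageLoaded/myPageFlushed and myImpl are illustrative names only):

static void myPageLoaded(Page *p)  { p->LSN = *lsn_ptr(p); }   // runs after the page is read from disk
static void myPageFlushed(Page *p) { *lsn_ptr(p) = p->LSN; }   // runs before the page is written to disk

These two functions go in the new pageLoaded/pageFlushed slots of a page_impl struct (see the page.h hunk below), and the implementation is registered once during pageInit(), e.g. registerPageType(myImpl());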

@@ -44,11 +44,14 @@ void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
   byte * pbuf = alloca(PAGE_SIZE);
   blob_record_t rec;
   recordRead(xid, p2, rawRid, (byte*)&rec);
+  Page tmp;
+  tmp.memAddr=pbuf;
   assert(rec.offset);
   for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
     TpageGet(xid, rec.offset+chunk, pbuf);
+    *page_type_ptr(&tmp) = BLOB_PAGE;
     memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), USABLE_SIZE_OF_PAGE);
     TpageSet(xid, rec.offset+chunk, pbuf);
   }
@@ -56,6 +59,41 @@ void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) {
   memcpy(pbuf, buf + (chunk * USABLE_SIZE_OF_PAGE), rid.size % USABLE_SIZE_OF_PAGE);
   TpageSet(xid, rec.offset+chunk, pbuf);
   // printf("Write Chunk = %d (%d)\n", chunk, rec.offset+chunk);
+}
+static int notSupported(int xid, Page * p) { return 0; }
+void blobLoaded(Page *p) {
+  p->LSN = *lsn_ptr(p);
+}
+void blobFlushed(Page *p) {
+  *lsn_ptr(p) = p->LSN;
+}
+static page_impl pi = {
+  BLOB_PAGE,
+  0, //read,
+  0, //write,
+  0, //readDone
+  0, //writeDone
+  0, //getType,
+  0, //setType,
+  0, //getLength,
+  0, //recordFirst,
+  0, //recordNext,
+  notSupported, // is block supported
+  0, //pageGenericBlockFirst,
+  0, //pageGenericBlockNext,
+  0, //pageGenericBlockDone,
+  0, //freespace,
+  0, //compact,
+  0, //preRalloc,
+  0, //postRalloc,
+  0, //Free,
+  0, //XXX page_impl_dereference_identity,
+  blobLoaded,
+  blobFlushed,
+};
+page_impl blobImpl() {
+  return pi;
 }


@@ -2,6 +2,7 @@
 #define __BLOB_MANAGER_H
 #include <stasis/common.h>
+#include "page.h"
 BEGIN_C_DECLS
 /**
@@ -40,6 +41,8 @@ BEGIN_C_DECLS
    @todo Set range??
+   @todo Update blobManager to (partially) provide a page api
    @ingroup LLADD_CORE
 */
@@ -64,6 +67,8 @@ compensated_function recordid preAllocBlobFromPage(int xid, long page, long blob
 void allocBlob(int xid, recordid rid);
+page_impl blobImpl();
 END_C_DECLS
 #endif


@@ -1,4 +1,3 @@
-#include <config.h>
 #include <stasis/common.h>
 #include <pthread.h>
 #include <stasis/stats.h>


@@ -106,7 +106,6 @@ void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
   if(page->LSN < lsn) {
     page->LSN = lsn;
-    *lsn_ptr(page) = page->LSN;
   }
   dirtyPages_add(page);
   return;
@@ -137,6 +136,8 @@ void pageInit() {
   registerPageType(fixedImpl());
   registerPageType(boundaryTagImpl());
   registerPageType(arrayListImpl());
+  registerPageType(blobImpl());
+  registerPageType(indirectImpl());
 }
 void pageDeinit() {
@@ -157,23 +158,12 @@ void recordWrite(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) {
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
   readlock(p->rwlatch, 225);
-  // page_impl p_impl;
   if(rid.size > BLOB_THRESHOLD_SIZE) {
     // XXX Kludge This is done so that recovery sees the LSN update. Otherwise, it gets upset... Of course, doing it will break blob recovery unless we set blob writes to do "logical" redo...
     pageWriteLSN(xid, p, lsn);
     unlock(p->rwlatch);
     writeBlob(xid, p, lsn, rid, dat);
   } else {
-    /* p_impl = page_impls[*page_type_ptr(p)];
-    if(!*page_type_ptr(p)) {
-      // XXX kludge!!!!
-      p_impl = page_impls[FIXED_PAGE];
-    }
-    assert(physical_slot_length(rid.size) == p_impl.recordGetLength(xid, p, rid));
-    byte * buf = p_impl.recordWrite(xid, p, rid);
-    pageWriteLSN(xid, p, lsn);
-    memcpy(buf, dat, physical_slot_length(p_impl.recordGetLength(xid, p, rid))); */
     byte * buf = recordWriteNew(xid, p, rid);
     pageWriteLSN(xid, p, lsn);
     memcpy(buf, dat, recordGetLength(xid, p, rid));
@@ -190,23 +180,6 @@ int recordRead(int xid, Page * p, recordid rid, byte *buf) {
     return 0;
   } else {
     readlock(p->rwlatch, 0);
-    /* page_impl p_impl;
-    int page_type = *page_type_ptr(p);
-    if(!page_type) {
-      if (! recordReadWarnedAboutPageTypeKludge) {
-        recordReadWarnedAboutPageTypeKludge = 1;
-        printf("page.c: MAKING USE OF TERRIBLE KLUDGE AND IGNORING ASSERT FAILURE! FIX ARRAY LIST ASAP!!!\n");
-      }
-      p_impl = page_impls[FIXED_PAGE];
-    } else {
-      p_impl = page_impls[page_type];
-    }
-    assert(physical_slot_length(rid.size) == p_impl.recordGetLength(xid, p, rid));
-    const byte * dat = p_impl.recordRead(xid, p, rid);
-    memcpy(buf, dat, physical_slot_length(p_impl.recordGetLength(xid, p, rid))); */
     const byte * dat = recordReadNew(xid,p,rid);
     memcpy(buf, dat, recordGetLength(xid,p,rid));
     unlock(p->rwlatch);
@@ -253,6 +226,18 @@ byte * recordWriteNew(int xid, Page * p, recordid rid) {
   }
   return page_impls[page_type].recordWrite(xid, p, rid);
 }
+void recordReadDone(int xid, Page *p, recordid rid, const byte *b) {
+  int page_type = *page_type_ptr(p);
+  if(page_impls[page_type].recordReadDone) {
+    page_impls[page_type].recordReadDone(xid,p,rid,b);
+  }
+}
+void recordWriteDone(int xid, Page *p, recordid rid, byte *b) {
+  int page_type = *page_type_ptr(p);
+  if(page_impls[page_type].recordWriteDone) {
+    page_impls[page_type].recordWriteDone(xid,p,rid,b);
+  }
+}
 int recordGetTypeNew(int xid, Page *p, recordid rid) {
   return page_impls[*page_type_ptr(p)]
     .recordGetType(xid, p, rid);
@@ -289,6 +274,19 @@ int pageIsBlockSupported(int xid, Page * p){
   return page_impls[*page_type_ptr(p)]
     .isBlockSupported(xid, p);
 }
+block_t * pageBlockFirst(int xid, Page * p){
+  int t = *page_type_ptr(p);
+  return page_impls[t]
+    .blockFirst(xid, p);
+}
+block_t * pageBlockNext(int xid, Page * p, block_t * prev){
+  return page_impls[*page_type_ptr(p)]
+    .blockNext(xid, p,prev);
+}
+void pageBlockDone(int xid, Page * p, block_t * done){
+  page_impls[*page_type_ptr(p)]
+    .blockDone(xid, p,done);
+}
 int pageFreespace(int xid, Page * p){
   return page_impls[*page_type_ptr(p)]
     .pageFreespace(xid, p);
@@ -297,11 +295,100 @@ void pageCompact(Page * p){
   page_impls[*page_type_ptr(p)]
     .pageCompact(p);
 }
+/** @todo How should the LSN of pages without a page_type be handled?
+    This only works because we don't have LSN-free pages yet. With
+    LSN-free pages, we'll need special "loadPageForAlloc(), and
+    loadPageOfType() methods (or something...)
+*/
 void pageLoaded(Page * p){
-  page_impls[*page_type_ptr(p)]
-    .pageLoaded(p);
+  short type = *page_type_ptr(p);
+  if(type) {
+    page_impls[type].pageLoaded(p);
+  } else {
+    p->LSN = *lsn_ptr(p); // XXX kluge - shouldn't special-case UNINITIALIZED_PAGE
+  }
 }
 void pageFlushed(Page * p){
-  page_impls[*page_type_ptr(p)]
+  short type = *page_type_ptr(p);
+  if(type) {
+    page_impls[type]
     .pageFlushed(p);
+  } else {
+    *lsn_ptr(p) = p->LSN;
+  }
+}
+/// Generic block implementations
+static int blkTrue(block_t *b) { return 1; }
+static int blkFalse(block_t *b) { return 0; }
+typedef struct genericBlockImpl {
+  Page * p;
+  recordid pos;
+} genericBlockImpl;
+/**
+   @todo The block API should pass around xids.
+*/
+static const byte * blkFirst(block_t * b) {
+  genericBlockImpl * impl = b->impl;
+  impl->pos = recordFirst(-1, impl->p);
+  if(! memcmp(&(impl->pos), &(NULLRID), sizeof(recordid))) {
+    return 0;
+  } else {
+    return recordReadNew(-1, impl->p, impl->pos);
+  }
+}
+static const byte * blkNext(block_t * b) {
+  genericBlockImpl * impl = b->impl;
+  impl->pos = recordNext(-1, impl->p, impl->pos);
+  if(! memcmp(&(impl->pos), &NULLRID, sizeof(recordid))) {
+    return 0;
+  } else {
+    return recordReadNew(-1, impl->p, impl->pos);
+  }
+}
+static int blkSize(block_t * b) {
+  genericBlockImpl * impl = b->impl;
+  return physical_slot_length(impl->pos.size);
+}
+static void blkRelease(block_t * b) {
+  free(b->impl);
+  free(b);
+}
+block_t genericBlock = {
+  blkTrue, // isValid
+  blkFalse, //isOneValue
+  blkFalse, //isValueSorted
+  blkFalse, //isPosContig
+  blkFirst,
+  blkNext,
+  blkSize,
+  0, //recordCount
+  0, //ptrArray can't do pointer array efficiently...
+  0, //sizePtrArray
+  blkFalse, //recordFixedLen
+  0, //packedArray
+  blkRelease,
+  0
+};
+block_t* pageGenericBlockFirst(int xid, Page * p) {
+  block_t* ret = malloc(sizeof(block_t));
+  *ret = genericBlock;
+  genericBlockImpl impl = { p, NULLRID };
+  ret->impl = malloc(sizeof(genericBlockImpl));
+  *(genericBlockImpl*)(ret->impl) = impl;
+  return ret;
+}
+block_t* pageGenericBlockNext(int xid, Page *p, block_t *prev) {
+  pageGenericBlockDone(xid, p, prev);
+  return 0; // definitely done.
+}
+void pageGenericBlockDone(int xid, Page *p, block_t *b) {
+  free(b->impl);
+  free(b);
 }

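For a sense of how the block API added above is meant to be used, here is a hypothetical caller (a sketch, not code from this commit) that walks every record on a page through the generic iterator; it assumes the caller already holds the page's read latch, as the page API requires:

static int totalRecordBytes(int xid, Page *p) {
  // pageBlockFirst()/pageBlockNext() dispatch to the page type's blockFirst/blockNext;
  // for the generic implementation above, pageBlockNext() frees the block and returns 0.
  int total = 0;
  block_t *b = pageBlockFirst(xid, p);
  while(b) {
    for(const byte *rec = b->recordFirst(b); rec; rec = b->recordNext(b)) {
      total += b->recordSize(b);
    }
    b = pageBlockNext(xid, p, b);
  }
  return total;
}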

@@ -173,6 +173,7 @@ struct Page_s {
 #define shorts_from_end(page, count) (((short*)end_of_usable_space_ptr((page)))-(count))
 #define bytes_from_start(page, count) (((byte*)((page)->memAddr))+(count))
+#define shorts_from_start(page, count) (((short*)((page)->memAddr))+(count))
 #define ints_from_start(page, count) (((int*)((page)->memAddr))+(count))
 #define ints_from_end(page, count) (((int*)end_of_usable_space_ptr((page)))-(count))
@@ -251,6 +252,8 @@ int recordRead(int xid, Page * page, recordid rid, byte *dat);
 const byte * recordReadNew(int xid, Page * p, recordid rid);
 byte * recordWriteNew(int xid, Page * p, recordid rid);
+void recordReadDone(int xid, Page *p, recordid rid, const byte* buf);
+void recordWriteDone(int xid, Page *p, recordid rid, byte *buf);
 int recordGetTypeNew(int xid, Page * p, recordid rid);
 void recordSetTypeNew(int xid, Page * p, recordid rid, int type);
 int recordGetLength(int xid, Page *p, recordid rid);
@@ -293,10 +296,11 @@ typedef struct block_t {
   int (*isOneValue) (struct block_t *b);
   int (*isValueSorted) (struct block_t *b);
   int (*isPosContig) (struct block_t *b);
-  byte * (*recordFirst) (struct block_t *b, int *size);
-  byte * (*recordNext) (struct block_t *b, int *size);
+  const byte * (*recordFirst) (struct block_t *b);
+  const byte * (*recordNext) (struct block_t *b);
+  int (*recordSize) (struct block_t *b);
   int (*recordCount) (struct block_t *b);
-  byte * (*recordPtrArray) (struct block_t *b);
+  const byte **(*recordPtrArray) (struct block_t *b);
   int * (*recordSizeArray) (struct block_t *b);
   // These two are not in paper
   int (*recordFixedLen) (struct block_t *b);
@@ -329,9 +333,34 @@ typedef struct block_t {
   void * impl;
 } block_t;
-block_t pageBlockFirst(int xid, Page * p);
-block_t pageBlockNext(int xid, Page * p, block_t prev);
+/**
+   This function should work with any valid page implementation, but
+   it might be less efficient than a custom implementation.
+   This is a convenience function for page implementors. Other code
+   should call pageBlockFirst() instead.
+*/
+block_t *pageGenericBlockFirst(int xid, Page *p);
+/**
+   This function should work with any valid page implementation, but
+   it might be less efficient than a custom implementation.
+   This is a convenience function for page implementors. Other code
+   should call pageBlockNext() instead.
+*/
+block_t * pageGenericBlockNext(int xid, Page *p, block_t *prev);
+/**
+   This function should work with any valid page implementation, but
+   it might be less efficient than a custom implementation.
+   This is a convenience function for page implementors. Other code
+   should call pageBlockDone() instead.
+*/
+void pageGenericBlockDone(int xid, Page *p, block_t *b);
+block_t * pageBlockFirst(int xid, Page * p);
+block_t * pageBlockNext(int xid, Page * p, block_t * prev);
+void pageBlockDone(int xid, Page * p, block_t * done);
 /**
    None of these functions obtain latches. Calling them without
    holding rwlatch is an error. (Exception: dereferenceRid grabs the
@@ -388,7 +417,8 @@ typedef struct page_impl {
      @return pointer to read region. The pointer will be guaranteed
      valid while the page is read latched by this caller, or while
      the page is write latched, and no other method has been called on
-     this page.
+     this page. Return null on error. (XXX current implementations
+     abort/crash)
   */
   const byte* (*recordRead)(int xid, Page *p, recordid rid);
   /**
@@ -398,6 +428,11 @@ typedef struct page_impl {
      @return a pointer to the buffer manager's copy of the record.
   */
   byte* (*recordWrite)(int xid, Page *p, recordid rid);
+  /**
+     @todo Most code doesn't call recordReadDone() and recordWriteDone() yet.
+  */
+  void (*recordReadDone)(int xid, Page *p, recordid rid, const byte *b);
+  void (*recordWriteDone)(int xid, Page *p, recordid rid, byte *b);
   /**
      Check to see if a slot is a normal slot, or something else, such
      as a blob. This is stored in the size field in the slotted page
@@ -443,9 +478,9 @@ typedef struct page_impl {
      more sophisticated blocks.
   */
-  block_t (*blockFirst)(int xid, Page *p);
-  block_t (*blockNext)(int xid, Page * p, block_t prev);
+  block_t* (*blockFirst)(int xid, Page *p);
+  block_t* (*blockNext)(int xid, Page *p, block_t *prev);
+  void (*blockDone)(int xid, Page *p, block_t *done);
   // -------- Allocation methods.
@@ -525,7 +560,13 @@ typedef struct page_impl {
      This function should set p->LSN to an appropriate value.
-     @todo Arrange to call page_impl.loaded() and page_impl.flushed().
+     @todo In order to support "raw" pages, we need a new page read
+     method that lets the caller decide which page type should handle
+     the call to pageLoaded().
+     @todo pageLoaded() should set p->pageType.
+     @todo set *page_type_ptr() to UNINITIALIZED_PAGE when appropriate.
   */
   void (*pageLoaded)(Page * p);

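A usage note on the new hooks declared above (a sketch of the calling convention, not code from this commit): an in-page read pairs recordReadNew() with recordReadDone(), so that page types which grab resources on read get a chance to release them; the latch discipline mirrors recordRead() in page.c. The helper name below is hypothetical.

static void readRecordWithHooks(int xid, Page *p, recordid rid, byte *buf) {
  readlock(p->rwlatch, 0);
  const byte *dat = recordReadNew(xid, p, rid);
  memcpy(buf, dat, recordGetLength(xid, p, rid));
  recordReadDone(xid, p, rid, dat);  // no-op for page types that don't set the hook
  unlock(p->rwlatch);
}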

@@ -80,13 +80,24 @@ static int fixedGetLength(int xid, Page *p, recordid rid) {
   checkRid(p, rid); // <-- XXX KLUDGE checkRid init's the page if necessary...
   return rid.slot > *recordcount_ptr(p) ?
     INVALID_SLOT : physical_slot_length(*recordsize_ptr(p));
-}
-/* XXXstatic recordid fixedFirst(int xid, Page *p, recordid rid) {
 }
 static recordid fixedNext(int xid, Page *p, recordid rid) {
-} */
+  short n = *recordcount_ptr(p);
+  rid.slot++;
+  if(rid.slot >= n) {
+    return NULLRID;
+  } else {
+    return rid;
+  }
+}
+static recordid fixedFirst(int xid, Page *p) {
+  recordid rid = { p->id, -1, 0 };
+  rid.size = *recordsize_ptr(p);
+  return fixedNext(xid, p, rid);
+}
+static int notSupported(int xid, Page * p) { return 0; }
 static int fixedFreespace(int xid, Page * p) {
   assertlocked(p->rwlatch);
   if(fixedRecordsPerPage(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
@@ -126,36 +137,39 @@ static void fixedFree(int xid, Page *p, recordid rid) {
     // leak space; there's no way to track it with this page format.
   }
 }
-//// XXX missing some functions w/ murky futures.
-/* static lsn_t fixedReadLSN(int xid, Page * p) {
-  return p->LSN;
+// XXX dereferenceRID
+void fixedLoaded(Page *p) {
+  p->LSN = *lsn_ptr(p);
+}
+void fixedFlushed(Page *p) {
+  *lsn_ptr(p) = p->LSN;
 }
-static void fixedWriteLSN(int xid, Page * p, lsn_t lsn) {
-  p->LSN = lsn;
-  *lsn_ptr(p) = lsn;
-  dirtyPages_add(p);
-} */
 page_impl fixedImpl() {
   static page_impl pi = {
     FIXED_PAGE,
     fixedRead,
     fixedWrite,
+    0,// readDone
+    0,// writeDone
     fixedGetType,
    fixedSetType,
    fixedGetLength,
-    0, // fixedFirst,
-    0, // fixedNext,
-    0, // notSupported,
-    0, // block first
-    0, // block next
+    fixedFirst,
+    fixedNext,
+    notSupported, // notSupported,
+    pageGenericBlockFirst,
+    pageGenericBlockNext,
+    pageGenericBlockDone,
    fixedFreespace,
    fixedCompact,
    fixedPreAlloc,
    fixedPostAlloc,
    fixedFree,
    0, // XXX dereference
-    0, // loaded
-    0, // flushed
+    fixedLoaded, // loaded
+    fixedFlushed, // flushed
  };
  return pi;
 }


@@ -233,3 +233,45 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
   releasePage(p);
   return ret;
 }
+static int notSupported(int xid, Page * p) { return 0; }
+void indirectLoaded(Page *p) {
+  p->LSN = *lsn_ptr(p);
+}
+void indirectFlushed(Page *p) {
+  *lsn_ptr(p) = p->LSN;
+}
+static page_impl pi = {
+  INDIRECT_PAGE,
+  0, //read,
+  0, //write,
+  0, //readDone
+  0, //writeDone
+  0, //getType,
+  0, //setType,
+  0, //getLength,
+  0, //recordFirst,
+  0, //recordNext,
+  notSupported, // is block supported
+  0, //pageGenericBlockFirst,
+  0, //pageGenericBlockNext,
+  0, //pageGenericBlockDone,
+  0, //freespace,
+  0, //compact,
+  0, //preRalloc,
+  0, //postRalloc,
+  0, //Free,
+  0, //XXX page_impl_dereference_identity,
+  indirectLoaded,
+  indirectFlushed,
+};
+/**
+   @todo Flesh out INDIRECT_PAGE's implementation of new PAGE_API, or
+   remove INDIRECT_PAGE from Stasis.
+*/
+page_impl indirectImpl() {
+  return pi;
+}


@@ -51,6 +51,8 @@ void indirectInitialize(Page * p, int height);
 compensated_function recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount);
 compensated_function int indirectPageRecordCount(int xid, recordid rid);
+page_impl indirectImpl();
 END_C_DECLS
 #endif /*__LLADD_PAGE_INDIRECT_H*/


@@ -4,6 +4,9 @@
 /**
    @todo Should rawPageInferMetadata set a page type in the Page
    struct?
+   XXX rawPageInferMetadata is wrong; setting lsn to LogFlushedLSN() breaks
+   recovery.
 */
 void rawPageInferMetadata(Page * p) {
   p->LSN = LogFlushedLSN();


@@ -425,7 +425,12 @@ static recordid slottedNext(int xid, Page *p, recordid rid) {
   short n = *numslots_ptr(p);
   rid.slot ++;
   while(rid.slot < n && !isValidSlot(p, rid.slot)) { rid.slot++; }
-  return isValidSlot(p, rid.slot) ? rid : NULLRID;
+  if(isValidSlot(p, rid.slot)) {
+    rid.slot = *slot_length_ptr(p, rid.slot);
+    return rid;
+  } else {
+    return NULLRID;
+  }
 }
 static recordid slottedFirst(int xid, Page *p) {
@@ -493,41 +498,39 @@ static void slottedFree(int xid, Page * p, recordid rid) {
-/*static lsn_t slottedReadLSN(int xid, Page * p) {
-  return p->LSN;
+// XXX dereferenceRID
+void slottedLoaded(Page *p) {
+  p->LSN = *lsn_ptr(p);
+  slottedFsck(p); // @todo In normal case, arrange for fsck to run on load/flush, but nowhere else.
+}
+void slottedFlushed(Page *p) {
+  *lsn_ptr(p) = p->LSN;
+  slottedFsck(p);
 }
-static void slottedWriteLSN(int xid, Page * p, lsn_t lsn) {
-  p->LSN = lsn;
-  *lsn_ptr(p) = lsn;
-  dirtyPages_add(p);
-  //pageWriteLSN(xid, p, lsn);
-} */
-// XXX loaded
-// XXX flushed
 page_impl slottedImpl() {
   static page_impl pi = {
     SLOTTED_PAGE,
     slottedRead,
     slottedWrite,
+    0,// readDone
+    0,// writeDone
     slottedGetType,
     slottedSetType,
     slottedGetLength,
     slottedFirst,
     slottedNext,
     notSupported, // is block supported
-    0, //block first
-    0, //block next
+    pageGenericBlockFirst,
+    pageGenericBlockNext,
+    pageGenericBlockDone,
     slottedFreespace,
     slottedCompact,
     slottedPreRalloc,
     slottedPostRalloc,
     slottedFree,
     0, //XXX page_impl_dereference_identity,
-    0, //loaded
-    0, //flushed
+    slottedLoaded,
+    slottedFlushed,
   };
   return pi;
 }

View file

@@ -68,7 +68,7 @@ static void pfPageRead(Page *ret) {
   }
   ret->dirty = 0;
-  ret->LSN = *lsn_ptr(ret);
+  pageLoaded(ret);
   pthread_mutex_unlock(&stable_mutex);
@@ -86,7 +86,7 @@ static void pfPageWrite(Page * ret) {
   pageid_t pageoffset = ret->id * PAGE_SIZE;
   pageid_t offset ;
-  /* assert(ret->pending == 0); */
+  pageFlushed(ret);
   // If necessary, force the log to disk so that ret's LSN will be stable.


@@ -21,6 +21,11 @@ static stasis_handle_t * h;
 */
 static void phWrite(Page * ret) {
   if(!ret->dirty) { return; }
+  // This lock is only held to make the page implementation happy. We should
+  // implicitly have exclusive access to the page before this function is called,
+  // or we'll deadlock.
+  writelock(ret->rwlatch,0);
+  pageFlushed(ret);
   LogForce(ret->LSN);
   int err = h->write(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
   if(err) {
@@ -29,9 +34,10 @@ static void phWrite(Page * ret) {
     abort();
   }
   dirtyPages_remove(ret);
+  unlock(ret->rwlatch);
 }
 static void phRead(Page * ret) {
+  writelock(ret->rwlatch,0);
   int err = h->read(h, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
   if(err) {
     if(err == EDOM) {
@@ -44,7 +50,8 @@ static void phRead(Page * ret) {
     }
   }
   ret->dirty = 0;
-  ret->LSN = *lsn_ptr(ret);
+  pageLoaded(ret);
+  unlock(ret->rwlatch);
 }
 static void phForce() {
   if(!printedForceWarning) {


@@ -51,18 +51,18 @@ terms specified in this license.
 * Second, we allow users to \#include headers that in turn \#include
 * common.h. If they do so, then their code should continue to 'do
 * the right thing' and build, even though they do not \#include the
-* config.h file that all of the LLADD stuff uses.
+* config.h file that all of the Stasis stuff uses.
 *
 * @todo Need to make sure every .c file actually includes this thing, and
-* also includes constants.h, and that no .h files include config.h
+* also includes constants.h
 *
 * $Id$
 */
 //#define NDEBUG 1
-#ifndef __lladd_common_h
-#define __lladd_common_h
+#ifndef __stasis_common_h
+#define __stasis_common_h
 #ifdef __cplusplus
 # define BEGIN_C_DECLS extern "C" {
@@ -75,11 +75,6 @@ terms specified in this license.
 #include <stdint.h> // uint32, et. al.
 #include <limits.h>
-/* Should be included by the .c files only. :( */
-/*#if HAVE_CONFIG_H
-# include "config.h"
-#endif */
 #if STDC_HEADERS
 # include <stdlib.h>
 # include <string.h>
@@ -125,5 +120,4 @@ typedef long long pageid_t;
 #endif /*DEBUGGING*/
-#endif /* __lladd_common_h */
+#endif /* __stasis_common_h */


@@ -226,7 +226,9 @@ extern const short SLOT_TYPE_LENGTHS[];
 #define FIXED_PAGE 5
 #define ARRAY_LIST_PAGE 6
 #define BOUNDARY_TAG_PAGE 7
-#define MAX_PAGE_TYPE 8
+#define BLOB_PAGE 8
+#define USER_DEFINED_PAGE(n) (100+n) // 0 <= n < 155
+#define MAX_PAGE_TYPE 255
 #define TALLOC_REGION_SIZE 100 // Pages


@@ -141,9 +141,6 @@ void * workerThreadWriting(void * q) {
     recordWrite(1, p, 0, rids[i], (byte*)&val);
     assert(p->id == rids[i].page);
-    /* p->LSN = 0;
-    *lsn_ptr(p) = 0; */
-    /* printf("LSN: %ld, %ld\n", p->LSN, *lsn_ptr(p)); */
     releasePage(p);
     if(! (i % (RECORDS_PER_THREAD/10)) ) {
@@ -161,9 +158,6 @@ void * workerThreadWriting(void * q) {
     recordRead(1, p, rids[i], (byte*)&val);
-    /* p->LSN = 0;
-    *lsn_ptr(p) = 0; */
-    /* printf("LSN: %ld, %ld\n", p->LSN, *lsn_ptr(p));*/
     releasePage(p);
     if(! (i % (RECORDS_PER_THREAD/10))) {


@@ -41,6 +41,11 @@ permission to use and distribute the software in accordance with the
 terms specified in this license.
 ---*/
+/** @file
+    @todo check_page should judiciously avoid lsn_ptr()
+*/
 #include <config.h>
 #include <check.h>