fix blob recovery issue; pages were being initialized outside of tupdate, breaking recovery

This commit is contained in:
Sears Russell 2011-03-29 22:27:31 +00:00
parent b20cd8cd0f
commit 188b03152c
4 changed files with 32 additions and 22 deletions

View file

@ -17,6 +17,7 @@ void stasis_blob_alloc(int xid, recordid rid) {
assert(rid.size>0); assert(rid.size>0);
pageid_t pageCount = (rid.size / USABLE_SIZE_OF_PAGE) + ((rid.size % USABLE_SIZE_OF_PAGE) ? 1 : 0); pageid_t pageCount = (rid.size / USABLE_SIZE_OF_PAGE) + ((rid.size % USABLE_SIZE_OF_PAGE) ? 1 : 0);
long startPage = TpageAllocMany(xid, pageCount); long startPage = TpageAllocMany(xid, pageCount);
TinitializeBlobPageRange(xid, startPage, pageCount);
blob_record_t rec; blob_record_t rec;
rec.offset = startPage; rec.offset = startPage;
rec.size = rid.size; rec.size = rid.size;
@ -57,28 +58,17 @@ void stasis_blob_write(int xid, Page * p, recordid rid, const void* dat) {
assert(rec.offset); assert(rec.offset);
pageid_t chunk = 0; pageid_t chunk = 0;
// Don't need to do any latching on the page range, since writes in race
// have undefined semantics.
for(; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) { for(; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
Page * cnk = loadPage(xid, rec.offset+chunk); // TODO: assert(page->pageType == BLOB_PAGE) in TpageSetRange?
writelock(cnk->rwlatch,0);
if(cnk->pageType != BLOB_PAGE) {
stasis_page_blob_initialize(cnk);
}
unlock(cnk->rwlatch);
// Don't care about race; writes in race have undefined semantics...
TpageSetRange(xid,rec.offset+chunk,0,((const byte*)dat)+(chunk*USABLE_SIZE_OF_PAGE),USABLE_SIZE_OF_PAGE); TpageSetRange(xid,rec.offset+chunk,0,((const byte*)dat)+(chunk*USABLE_SIZE_OF_PAGE),USABLE_SIZE_OF_PAGE);
releasePage(cnk);
} }
Page * cnk = loadPage(xid, rec.offset+chunk); // Painful; allocate buffer for zero padding. TODO: Remove zero padding?
writelock(cnk->rwlatch,0);
if(p->pageType != BLOB_PAGE) {
stasis_page_blob_initialize(cnk);
}
unlock(cnk->rwlatch);
byte * buf = calloc(1,USABLE_SIZE_OF_PAGE); byte * buf = calloc(1,USABLE_SIZE_OF_PAGE);
memcpy(buf, ((const byte*)dat)+(chunk*USABLE_SIZE_OF_PAGE), rid.size % USABLE_SIZE_OF_PAGE); memcpy(buf, ((const byte*)dat)+(chunk*USABLE_SIZE_OF_PAGE), rid.size % USABLE_SIZE_OF_PAGE);
TpageSetRange(xid,rec.offset+chunk,0,buf,USABLE_SIZE_OF_PAGE); TpageSetRange(xid,rec.offset+chunk,0,buf,USABLE_SIZE_OF_PAGE);
free(buf); free(buf);
releasePage(cnk);
} }
static int stasis_page_not_supported(int xid, Page * p) { return 0; } static int stasis_page_not_supported(int xid, Page * p) { return 0; }
@ -124,7 +114,7 @@ page_impl stasis_page_blob_impl() {
}; };
return pi; return pi;
} }
void stasis_page_blob_initialize(Page * p) { void stasis_page_blob_initialize_page(Page * p) {
assertlocked(p->rwlatch); assertlocked(p->rwlatch);
DEBUG("lsn: %lld\n",(long long)p->LSN); DEBUG("lsn: %lld\n",(long long)p->LSN);
stasis_page_cleanup(p); stasis_page_cleanup(p);

View file

@ -226,12 +226,14 @@ static int op_init_multipage_impl(const LogEntry *e, Page *ignored) {
Page * p = loadPage(e->xid, arg->firstPage + i); Page * p = loadPage(e->xid, arg->firstPage + i);
if(stasis_operation_multi_should_apply(e, p)) { if(stasis_operation_multi_should_apply(e, p)) {
writelock(p->rwlatch, 0); writelock(p->rwlatch, 0);
if(arg->recordSize) { if(arg->recordSize == 0) {
stasis_page_slotted_initialize_page(p);
} else if(arg->recordSize == BLOB_SLOT) {
stasis_page_blob_initialize_page(p);
} else {
stasis_fixed_initialize_page(p, arg->recordSize, stasis_fixed_initialize_page(p, arg->recordSize,
stasis_fixed_records_per_page stasis_fixed_records_per_page
(stasis_record_type_to_size(arg->recordSize))); (stasis_record_type_to_size(arg->recordSize)));
} else {
stasis_page_slotted_initialize_page(p);
} }
stasis_page_lsn_write(e->xid, p, e->LSN); stasis_page_lsn_write(e->xid, p, e->LSN);
unlock(p->rwlatch); unlock(p->rwlatch);
@ -258,7 +260,15 @@ int TinitializeFixedPageRange(int xid, pageid_t start, pageid_t count, size_t si
Tupdate(xid, MULTI_PAGEID, &arg, sizeof(arg), OPERATION_INITIALIZE_MULTIPAGE); Tupdate(xid, MULTI_PAGEID, &arg, sizeof(arg), OPERATION_INITIALIZE_MULTIPAGE);
return 0; return 0;
} }
int TinitializeBlobPageRange(int xid, pageid_t start, pageid_t count) {
init_multipage_arg arg;
arg.firstPage = start;
arg.numPages = count;
arg.recordSize = BLOB_SLOT;
Tupdate(xid, MULTI_PAGEID, &arg, sizeof(arg), OPERATION_INITIALIZE_MULTIPAGE);
return 0;
}
stasis_operation_impl stasis_op_impl_page_initialize() { stasis_operation_impl stasis_op_impl_page_initialize() {
stasis_operation_impl o = { stasis_operation_impl o = {
OPERATION_INITIALIZE_PAGE, OPERATION_INITIALIZE_PAGE,

View file

@ -81,10 +81,20 @@ void TinitializeFixedPage(int xid, pageid_t page,
*/ */
int TinitializeSlottedPageRange(int xid, pageid_t start, pageid_t count); int TinitializeSlottedPageRange(int xid, pageid_t start, pageid_t count);
/** /**
* @see TinitializeSlottedPageRange for an analogous function, and a description of * Initialize a contiguous range of pages for storage of fixed size records.
* this function's non-standard impacts upon recovery. *
* This method uses blind-writes to conserve log bandwidth. It is only safe
* to call this function against newly allocated pages.
*
* @see fixed.c for more information about the page format.
*/ */
int TinitializeFixedPageRange(int xid, pageid_t start, pageid_t count, size_t size); int TinitializeFixedPageRange(int xid, pageid_t start, pageid_t count, size_t size);
/**
* Initialize a contiguous range of pages for storage of a blob (large object).
*
* @see blobManager.c for more information about the page format.
*/
int TinitializeBlobPageRange(int xid, pageid_t start, pageid_t count);
int TpageGetType(int xid, pageid_t page); int TpageGetType(int xid, pageid_t page);

View file

@ -923,7 +923,7 @@ void stasis_slotted_lsn_free_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count); void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height); void stasis_indirect_initialize_page(Page * p, int height);
int stasis_fixed_records_per_page(size_t size); int stasis_fixed_records_per_page(size_t size);
void stasis_page_blob_initialize(Page * p); void stasis_page_blob_initialize_page(Page * p);
page_impl slottedLsnFreeImpl(); page_impl slottedLsnFreeImpl();
page_impl segmentImpl(); page_impl segmentImpl();