- Fix compile warning + broken fcn pointer from last commit.

- Address crashes in Tupdate() and recovery caused by mixing segments and page oriented recovery  (add loadPageForOperation() call)
This commit is contained in:
Sears Russell 2009-07-16 03:05:32 +00:00
parent b44f8b17b3
commit 9ef3adf40c
23 changed files with 114 additions and 43 deletions

View file

@ -203,12 +203,14 @@ int main (int argc, char ** argv) {
{
stasis_operation_impl op_ins = {
TREE_INSERT,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
TREE_REMOVE,
op_tree_insert
};
stasis_operation_impl op_rem = {
TREE_REMOVE,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
TREE_INSERT,
op_tree_remove

View file

@ -187,6 +187,23 @@ Page * loadUninitializedPage(int xid, pageid_t pageid) {
return loadUninitPageImpl(xid, pageid);
}
Page * loadPageForOperation(int xid, pageid_t pageid, int op) {
pagetype_t type = stasis_operation_type(op);
Page * p;
if(pageid == SEGMENT_PAGEID) {
assert(type = SEGMENT_PAGE);
p = 0;
} else if(type == UNINITIALIZED_PAGE) {
p = loadUninitializedPage(xid, pageid);
} else if(type == UNKNOWN_TYPE_PAGE) {
p = loadPage(xid, pageid);
} else {
p = loadPageOfType(xid, pageid, type);
}
return p;
}
Page * getCachedPage(int xid, pageid_t pageid) {
return getCachedPageImpl(xid, pageid);
}

View file

@ -22,6 +22,7 @@ static pthread_key_t lastPage;
static void bufManBufDeinit();
static compensated_function Page *bufManLoadPage(int xid, pageid_t pageid, pagetype_t type);
static compensated_function Page *bufManGetCachedPage(int xid, pageid_t pageid);
static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid);
static void bufManReleasePage (Page * p);
static void bufManSimulateBufferManagerCrash();
@ -45,7 +46,7 @@ int stasis_buffer_manager_deprecated_open(stasis_page_handle_t * ph) {
releasePageImpl = bufManReleasePage;
loadPageImpl = bufManLoadPage;
loadUninitPageImpl = bufManLoadUninitPage;
getCachedPageImpl = bufManLoadPage; // Since this code is deprecated, loadPage is "good enough" though it breaks segments.
getCachedPageImpl = bufManGetCachedPage;
writeBackPage = pageWrite_legacyWrapper;
forcePages = forcePageFile_legacyWrapper;
forcePageRange = forceRangePageFile_legacyWrapper;
@ -335,8 +336,7 @@ static Page* bufManGetPage(pageid_t pageid, int locktype, int uninitialized, pag
return ret;
}
static compensated_function Page *bufManLoadPage(int xid, pageid_t pageid, pagetype_t type) {
static compensated_function Page *bufManLoadPage(int xid, const pageid_t pageid, pagetype_t type) {
Page * ret = pthread_getspecific(lastPage);
@ -367,6 +367,11 @@ static compensated_function Page *bufManLoadPage(int xid, pageid_t pageid, paget
return ret;
}
static Page* bufManGetCachedPage(int xid, const pageid_t pageid) {
// XXX hack; but this code is deprecated
return bufManLoadPage(xid, pageid, UNKNOWN_TYPE_PAGE);
}
static compensated_function Page *bufManLoadUninitPage(int xid, pageid_t pageid) {
Page * ret = pthread_getspecific(lastPage);

View file

@ -40,7 +40,9 @@ static Page * paLoadPage(int xid, pageid_t pageid, pagetype_t type) {
pthread_mutex_unlock(&pageArray_mut);
return pageMap[pageid];
}
static Page* paGetCachedPage(int xid, pageid_t page) {
return paLoadPage(xid, page, UNKNOWN_TYPE_PAGE);
}
static void paReleasePage(Page * p) {
stasis_dirty_page_table_set_clean(stasis_dirty_page_table, p);
}
@ -62,7 +64,7 @@ void stasis_buffer_manager_mem_array_open () {
releasePageImpl = paReleasePage;
loadPageImpl = paLoadPage;
getCachedPageImpl = paLoadPage;
getCachedPageImpl = paGetCachedPage;
writeBackPage = paWriteBackPage;
forcePages = paForcePages;
stasis_buffer_manager_close = paBufDeinit;

View file

@ -193,3 +193,6 @@ void stasis_operation_undo(const LogEntry * e, lsn_t effective_lsn, Page * p) {
}
}
}
pagetype_t stasis_operation_type(int op) {
return stasis_operation_table[op].page_type;
}

View file

@ -167,6 +167,7 @@ static pthread_mutex_t talloc_mutex = PTHREAD_MUTEX_INITIALIZER;
stasis_operation_impl stasis_op_impl_alloc() {
stasis_operation_impl o = {
OPERATION_ALLOC,
UNKNOWN_TYPE_PAGE,
OPERATION_ALLOC,
OPERATION_DEALLOC,
op_alloc
@ -178,6 +179,7 @@ stasis_operation_impl stasis_op_impl_alloc() {
stasis_operation_impl stasis_op_impl_dealloc() {
stasis_operation_impl o = {
OPERATION_DEALLOC,
UNKNOWN_TYPE_PAGE,
OPERATION_DEALLOC,
OPERATION_REALLOC,
op_dealloc
@ -189,6 +191,7 @@ stasis_operation_impl stasis_op_impl_dealloc() {
stasis_operation_impl stasis_op_impl_realloc() {
stasis_operation_impl o = {
OPERATION_REALLOC,
UNKNOWN_TYPE_PAGE,
OPERATION_REALLOC,
OPERATION_NOOP,
op_realloc

View file

@ -101,6 +101,7 @@ static int array_list_op_init_header(const LogEntry* e, Page* p) {
stasis_operation_impl stasis_op_impl_array_list_header_init() {
stasis_operation_impl o = {
OPERATION_ARRAY_LIST_HEADER_INIT,
UNINITIALIZED_PAGE,
OPERATION_ARRAY_LIST_HEADER_INIT,
/* Do not need to roll back this page, since it will be deallocated. */
OPERATION_NOOP,

View file

@ -63,6 +63,7 @@ static int op_decrement(const LogEntry* e, Page* p) {
stasis_operation_impl stasis_op_impl_decrement() {
stasis_operation_impl o = {
OPERATION_DECREMENT,
UNKNOWN_TYPE_PAGE,
OPERATION_DECREMENT,
OPERATION_INCREMENT,
op_decrement

View file

@ -64,6 +64,7 @@ static int op_decrement(const LogEntry* e, Page* p) {
stasis_operation_impl stasis_op_impl_increment() {
stasis_operation_impl o = {
OPERATION_INCREMENT,
UNKNOWN_TYPE_PAGE,
OPERATION_INCREMENT,
OPERATION_DECREMENT,
op_decrement

View file

@ -163,6 +163,7 @@ compensated_function static int op_linear_hash_remove(const LogEntry* e, Page* p
stasis_operation_impl stasis_op_impl_linear_hash_insert() {
stasis_operation_impl o = {
OPERATION_LINEAR_HASH_INSERT,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_LINEAR_HASH_REMOVE,
&op_linear_hash_insert
@ -172,6 +173,7 @@ stasis_operation_impl stasis_op_impl_linear_hash_insert() {
stasis_operation_impl stasis_op_impl_linear_hash_remove() {
stasis_operation_impl o = {
OPERATION_LINEAR_HASH_REMOVE,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_LINEAR_HASH_INSERT,
&op_linear_hash_remove

View file

@ -133,6 +133,7 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
stasis_operation_impl stasis_op_impl_linked_list_insert() {
stasis_operation_impl o = {
OPERATION_LINKED_LIST_INSERT,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_LINKED_LIST_REMOVE,
&op_linked_list_nta_insert
@ -142,6 +143,7 @@ stasis_operation_impl stasis_op_impl_linked_list_insert() {
stasis_operation_impl stasis_op_impl_linked_list_remove() {
stasis_operation_impl o = {
OPERATION_LINKED_LIST_REMOVE,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_LINKED_LIST_INSERT,
&op_linked_list_nta_remove

View file

@ -94,6 +94,7 @@ int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, const vo
stasis_operation_impl stasis_op_impl_lsn_free_set() {
stasis_operation_impl o = {
OPERATION_SET_LSN_FREE,
SEGMENT_PAGE,
OPERATION_SET_LSN_FREE,
OPERATION_SET_LSN_FREE_INVERSE,
op_lsn_free_set
@ -104,6 +105,7 @@ stasis_operation_impl stasis_op_impl_lsn_free_set() {
stasis_operation_impl stasis_op_impl_lsn_free_set_inverse() {
stasis_operation_impl o = {
OPERATION_SET_LSN_FREE_INVERSE,
SEGMENT_PAGE,
OPERATION_SET_LSN_FREE_INVERSE,
OPERATION_SET_LSN_FREE,
op_lsn_free_unset

View file

@ -55,6 +55,7 @@ int noop(const LogEntry* e, Page* p) {
stasis_operation_impl stasis_op_impl_noop() {
stasis_operation_impl o = {
OPERATION_NOOP,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_NOOP,
&noop

View file

@ -166,6 +166,7 @@ int TpageGetType(int xid, pageid_t page) {
stasis_operation_impl stasis_op_impl_page_set_range() {
stasis_operation_impl o = {
OPERATION_PAGE_SET_RANGE,
UNKNOWN_TYPE_PAGE,
OPERATION_PAGE_SET_RANGE,
OPERATION_PAGE_SET_RANGE_INVERSE,
op_page_set_range
@ -176,6 +177,7 @@ stasis_operation_impl stasis_op_impl_page_set_range() {
stasis_operation_impl stasis_op_impl_page_set_range_inverse() {
stasis_operation_impl o = {
OPERATION_PAGE_SET_RANGE_INVERSE,
UNKNOWN_TYPE_PAGE,
OPERATION_PAGE_SET_RANGE_INVERSE,
OPERATION_PAGE_SET_RANGE,
&op_page_set_range_inverse
@ -220,6 +222,7 @@ static int op_initialize_page(const LogEntry* e, Page* p) {
stasis_operation_impl stasis_op_impl_page_initialize() {
stasis_operation_impl o = {
OPERATION_INITIALIZE_PAGE,
UNINITIALIZED_PAGE,
OPERATION_INITIALIZE_PAGE,
OPERATION_NOOP,
op_initialize_page

View file

@ -62,6 +62,7 @@ static int op_prepare(const LogEntry * e, Page * p) {
stasis_operation_impl stasis_op_impl_prepare() {
stasis_operation_impl o = {
OPERATION_PREPARE, /* id */
UNKNOWN_TYPE_PAGE,
OPERATION_PREPARE,
OPERATION_NOOP,
&op_prepare/* Function */

View file

@ -483,6 +483,7 @@ pageid_t TregionAlloc(int xid, pageid_t pageCount, int allocationManager) {
stasis_operation_impl stasis_op_impl_boundary_tag_alloc() {
stasis_operation_impl o = {
OPERATION_ALLOC_BOUNDARY_TAG,
UNINITIALIZED_PAGE,
OPERATION_ALLOC_BOUNDARY_TAG,
OPERATION_NOOP,
op_alloc_boundary_tag
@ -493,6 +494,7 @@ stasis_operation_impl stasis_op_impl_boundary_tag_alloc() {
stasis_operation_impl stasis_op_impl_region_alloc() {
stasis_operation_impl o = {
OPERATION_ALLOC_REGION,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_ALLOC_REGION_INVERSE,
noop
@ -502,6 +504,7 @@ stasis_operation_impl stasis_op_impl_region_alloc() {
stasis_operation_impl stasis_op_impl_region_alloc_inverse() {
stasis_operation_impl o = {
OPERATION_ALLOC_REGION_INVERSE,
UNKNOWN_TYPE_PAGE,
OPERATION_ALLOC_REGION_INVERSE,
OPERATION_INVALID,
op_dealloc_region
@ -511,6 +514,7 @@ stasis_operation_impl stasis_op_impl_region_alloc_inverse() {
stasis_operation_impl stasis_op_impl_region_dealloc() {
stasis_operation_impl o = {
OPERATION_DEALLOC_REGION,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_DEALLOC_REGION_INVERSE,
noop
@ -521,6 +525,7 @@ stasis_operation_impl stasis_op_impl_region_dealloc() {
stasis_operation_impl stasis_op_impl_region_dealloc_inverse() {
stasis_operation_impl o = {
OPERATION_DEALLOC_REGION_INVERSE,
UNKNOWN_TYPE_PAGE,
OPERATION_DEALLOC_REGION_INVERSE,
OPERATION_INVALID,
op_alloc_region

View file

@ -129,6 +129,7 @@ static int op_segment_file_pwrite_inverse(const LogEntry* e, Page* p) {
stasis_operation_impl stasis_op_impl_segment_file_pwrite() {
static stasis_operation_impl o = {
OPERATION_SEGMENT_FILE_PWRITE,
SEGMENT_PAGE,
OPERATION_SEGMENT_FILE_PWRITE,
OPERATION_SEGMENT_FILE_PWRITE_INVERSE,
op_segment_file_pwrite
@ -139,6 +140,7 @@ stasis_operation_impl stasis_op_impl_segment_file_pwrite() {
stasis_operation_impl stasis_op_impl_segment_file_pwrite_inverse() {
static stasis_operation_impl o = {
OPERATION_SEGMENT_FILE_PWRITE_INVERSE,
SEGMENT_PAGE,
OPERATION_SEGMENT_FILE_PWRITE_INVERSE,
OPERATION_SEGMENT_FILE_PWRITE,
op_segment_file_pwrite_inverse

View file

@ -217,41 +217,45 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
}
stasis_operation_impl stasis_op_impl_set() {
stasis_operation_impl o = {
OPERATION_SET,
OPERATION_SET,
OPERATION_SET_INVERSE,
op_set
};
return o;
stasis_operation_impl o = {
OPERATION_SET,
UNKNOWN_TYPE_PAGE,
OPERATION_SET,
OPERATION_SET_INVERSE,
op_set
};
return o;
}
stasis_operation_impl stasis_op_impl_set_inverse() {
stasis_operation_impl o = {
OPERATION_SET_INVERSE,
OPERATION_SET_INVERSE,
OPERATION_SET,
op_set_inverse
};
return o;
stasis_operation_impl o = {
OPERATION_SET_INVERSE,
UNKNOWN_TYPE_PAGE,
OPERATION_SET_INVERSE,
OPERATION_SET,
op_set_inverse
};
return o;
}
stasis_operation_impl stasis_op_impl_set_range() {
stasis_operation_impl o = {
OPERATION_SET_RANGE,
OPERATION_SET_RANGE,
OPERATION_SET_RANGE_INVERSE,
op_set_range
};
return o;
stasis_operation_impl o = {
OPERATION_SET_RANGE,
UNKNOWN_TYPE_PAGE,
OPERATION_SET_RANGE,
OPERATION_SET_RANGE_INVERSE,
op_set_range
};
return o;
}
stasis_operation_impl stasis_op_impl_set_range_inverse() {
stasis_operation_impl o = {
OPERATION_SET_RANGE_INVERSE,
OPERATION_SET_RANGE_INVERSE,
OPERATION_SET_RANGE,
op_set_range_inverse
};
return o;
stasis_operation_impl o = {
OPERATION_SET_RANGE_INVERSE,
UNKNOWN_TYPE_PAGE,
OPERATION_SET_RANGE_INVERSE,
OPERATION_SET_RANGE,
op_set_range_inverse
};
return o;
}

View file

@ -2,7 +2,13 @@
#include <stasis/page.h>
#include <stasis/page/slotted.h>
#include <assert.h>
/** @todo should page implementations provide readLSN / writeLSN??? */
// plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence.
// intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK)
// stop calloing slotted fsck on each operation. This should speed things up a *lot*.
// benchmark page ops (real time) + hash table (user time)
#include <stasis/truncation.h>
/**

View file

@ -193,7 +193,7 @@ static void stasis_recovery_redo(stasis_log_t* log) {
} else if(e->update.page == SEGMENT_PAGEID) {
stasis_operation_redo(e,0);
} else {
Page * p = loadPage(e->xid, e->update.page);
Page * p = loadPageForOperation(e->xid, e->update.page, e->update.funcID);
writelock(p->rwlatch,0);
stasis_operation_redo(e,p);
unlock(p->rwlatch);
@ -213,7 +213,7 @@ static void stasis_recovery_redo(stasis_log_t* log) {
// need to grab latch page here so that Tabort() can be atomic
// below...
Page * p = loadPage(e->xid, ce->update.page);
Page * p = loadPageForOperation(e->xid, ce->update.page, ce->update.funcID);
writelock(p->rwlatch,0);
stasis_operation_undo(ce, e->LSN, p);
unlock(p->rwlatch);
@ -290,11 +290,8 @@ static void stasis_recovery_undo(stasis_log_t* log, int recovery) {
// atomically log (getting clr), and apply undo.
// otherwise, there's a race where the page's LSN is
// updated before we undo.
Page* p = NULL;
if(e->update.page != INVALID_PAGE) {
p = loadPage(e->xid, e->update.page);
writelock(p->rwlatch,0);
}
Page* p = loadPageForOperation(e->xid, e->update.page, e->update.funcID);
if(p) writelock(p->rwlatch,0);
// Log a CLR for this entry
lsn_t clr_lsn = stasis_log_write_clr(log, e);

View file

@ -159,7 +159,7 @@ compensated_function void Tupdate(int xid, pageid_t page,
LogEntry * e;
assert(xid >= 0 && stasis_transaction_table[xid % MAX_TRANSACTIONS].xid == xid);
Page * p = (page == SEGMENT_PAGEID) ? 0 : loadPage(xid, page);
Page * p = loadPageForOperation(xid, page, op);
try {
if(globalLockManager.writeLockPage && p) {

View file

@ -97,6 +97,8 @@ Page * loadPageOfType(int xid, pageid_t pageid, pagetype_t type);
Page * loadUninitializedPage(int xid, pageid_t pageid);
Page * loadPageForOperation(int xid, pageid_t pageid, int op);
Page * getCachedPage(int xid, const pageid_t pageid);
/**
@ -110,7 +112,7 @@ extern Page * (*loadUninitPageImpl)(int xid, pageid_t pageid);
@return a pointer to the page, or NULL if the page is not in cache, or is being read from disk.
*/
extern Page * (*getCachedPageImpl)(int xid, pageid_t pageid);
extern Page * (*getCachedPageImpl)(int xid, const pageid_t pageid);
/**
loadPage aquires a lock when it is called, effectively pinning it
in memory. releasePage releases this lock.

View file

@ -83,6 +83,13 @@ typedef struct {
* ID of operation, also index into operations table
*/
int id;
/**
* The type of the page this operation modifies. If UNKNWON_TYPE_PAGE,
* then the page's header will be trusted. If UNINITIALIZED_PAGE, then
* the contents of the in-memory version of the page will be undefined
* before the operation is applied. (This is used for allocation).
*/
pagetype_t page_type;
/**
* ID of redo operation; logical operations typically
* set this to OPERATION_NOOP.
@ -209,6 +216,8 @@ void stasis_operation_undo(const LogEntry * e, lsn_t clr_lsn,
*/
void stasis_operation_redo(const LogEntry * e, Page * p);
pagetype_t stasis_operation_type(int op);
END_C_DECLS
#endif