Simplified the page locking conventions.

Added paranoid calls to assertlocked() to page.c.

Fixed race in abort(), where the old ordering was:
 - pick CLR LSN
 - release page lock
 - someone else updates page
 - lock page
 - apply undo
commit ff0887624c
parent 6d17442380

14 changed files with 153 additions and 146 deletions
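The heart of the fix, condensed from the Undo() hunk below into a short sketch (not the verbatim committed code): take the page latch before logging the CLR, and hold it until the undo has been applied.

    // Sketch of the fixed ordering: latch the page before picking the
    // CLR LSN, so no other update can slip in between logging the CLR
    // and applying the undo.
    Page * p = NULL;
    if(e->update.page != INVALID_PAGE) {
      p = loadPage(e->xid, e->update.page);
      writelock(p->rwlatch, 0);
    }

    lsn_t clr_lsn = LogCLR(e);   // pick the CLR LSN while the latch is held

    undoUpdate(e, clr_lsn, p);   // apply the undo under the same latch

    if(p) {
      unlock(p->rwlatch);
      releasePage(p);
    }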
@@ -58,16 +58,14 @@ Operation operationsTable[MAX_OPERATIONS];

 void doUpdate(const LogEntry * e, Page * p) {
   assert(p);
+  assertlocked(p->rwlatch);
   operationsTable[e->update.funcID].run(e, p);
-  writelock(p->rwlatch,0);
   DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid,
         e->LSN, e->update.page, stasis_page_lsn_read(p));
   stasis_page_lsn_write(e->xid, p, e->LSN);
-  unlock(p->rwlatch);
 }

@@ -89,20 +87,18 @@ void redoUpdate(const LogEntry * e) {
     // is for this log type.
     // contrast with doUpdate(), which doesn't check the .id field.
-    stasis_page_lsn_write(e->xid, p, e->LSN); //XXX do this after run();
-    unlock(p->rwlatch); /// XXX keep lock while calling run();
     operationsTable[operationsTable[e->update.funcID].id]
       .run(e,p);
+    stasis_page_lsn_write(e->xid, p, e->LSN);
   } else {
     DEBUG("OPERATION xid %d skip redo, %lld {%lld:%lld}\n", e->xid,
           e->LSN, e->update.page, stasis_page_lsn_read(p));
-    unlock(p->rwlatch);
   }
+  unlock(p->rwlatch);
   releasePage(p);
 }

-void undoUpdate(const LogEntry * e, lsn_t effective_lsn) {
+void undoUpdate(const LogEntry * e, lsn_t effective_lsn, Page * p) {
   // Only handle update entries
   assert(e->type == UPDATELOG);

@@ -116,19 +112,16 @@ void undoUpdate(const LogEntry * e, lsn_t effective_lsn) {
     operationsTable[undo].run(e,0);
   } else {
-    Page * p = loadPage(e->xid, e->update.page);
-    writelock(p->rwlatch,0);
     assert(p->id == e->update.page);

     if(stasis_page_lsn_read(p) < effective_lsn) {
       DEBUG("OPERATION xid %d Undo, %lld {%lld:%lld}\n", e->xid,
             e->LSN, e->update.page, stasis_page_lsn_read(p));
-      stasis_page_lsn_write(e->xid, p, effective_lsn); // XXX call after run()
-      unlock(p->rwlatch); // release after run()
       operationsTable[undo].run(e,p);
+      stasis_page_lsn_write(e->xid, p, effective_lsn);
     } else {
       DEBUG("OPERATION xid %d skip undo, %lld {%lld:%lld}\n", e->xid,
             e->LSN, e->update.page, stasis_page_lsn_read(p));
-      unlock(p->rwlatch);
     }
-    releasePage(p);
   }
 }
@@ -96,8 +96,6 @@ typedef struct {
 } alloc_arg;

 static int op_alloc(const LogEntry* e, Page* p) { //(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch, 0);
   assert(e->update.arg_size >= sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);

@@ -116,12 +114,11 @@ static int op_alloc(const LogEntry* e, Page* p) { //(int xid, Page * p, lsn_t ls
     // otherwise, no preimage
     assert(e->update.arg_size == sizeof(alloc_arg));
   }
-  unlock(p->rwlatch);
   return ret;
 }

 static int op_dealloc(const LogEntry* e, Page* p) { //deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);
   recordid rid = {

@@ -134,12 +131,10 @@ static int op_dealloc(const LogEntry* e, Page* p) { //deoperate(int xid, Page *
   stasis_record_free(e->xid, p, rid);
   assert(stasis_record_type_read(e->xid, p, rid) == INVALID_SLOT);
-  unlock(p->rwlatch);
   return 0;
 }

 static int op_realloc(const LogEntry* e, Page* p) { //reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);

@@ -157,8 +152,6 @@ static int op_realloc(const LogEntry* e, Page* p) { //reoperate(int xid, Page *p
   byte * buf = stasis_record_write_begin(e->xid,p,rid);
   memcpy(buf, arg+1, stasis_record_length_read(e->xid,p,rid));
   stasis_record_write_done(e->xid,p,rid,buf);
-  unlock(p->rwlatch);
   return ret;
 }

@@ -411,21 +404,22 @@ compensated_function void Tdealloc(int xid, recordid rid) {
     p = loadPage(xid, rid.page);
   } end;

+  readlock(p->rwlatch,0);
   recordid newrid = stasis_record_dereference(xid, p, rid);
   allocationPolicyLockPage(allocPolicy, xid, newrid.page);

-  readlock(p->rwlatch,0);
   int64_t size = stasis_record_length_read(xid,p,rid);
-  unlock(p->rwlatch);

   byte * preimage = malloc(sizeof(alloc_arg)+rid.size);

   ((alloc_arg*)preimage)->slot = rid.slot;
   ((alloc_arg*)preimage)->size = size;

   begin_action(releasePage, p) {
     stasis_record_read(xid, p, rid, preimage+sizeof(alloc_arg));
+    unlock(p->rwlatch);
     /** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */
     Tupdate(xid, rid, preimage, sizeof(alloc_arg)+rid.size, OPERATION_DEALLOC);
   } compensate;

@@ -469,7 +463,6 @@ void TinitializeFixedPage(int xid, int pageid, int slotLength) {
 }

 static int op_initialize_page(const LogEntry* e, Page* p) { //int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
-  writelock(p->rwlatch, 0);
   assert(e->update.arg_size == sizeof(alloc_arg));
   const alloc_arg* arg = (const alloc_arg*)getUpdateArgs(e);

@@ -483,7 +476,6 @@ static int op_initialize_page(const LogEntry* e, Page* p) { //int xid, Page *p,
     default:
       abort();
   }
-  unlock(p->rwlatch);
   return 0;
 }
@@ -67,9 +67,6 @@ static int op_array_list_alloc(const LogEntry* e, Page* p) {
   int multiplier = tlp->multiplier;
   int size = tlp->size;

-  /* Allocing this page -> implicit lock, but latch to conformt to
-     fixedPage's interface. */
-  writelock(p->rwlatch, 0);
   stasis_fixed_initialize_page(p, sizeof(int), stasis_fixed_records_per_page(sizeof(int)));

   recordid countRid, multiplierRid, slotSizeRid, maxOffset, firstDataPageRid;

@@ -95,7 +92,6 @@ static int op_array_list_alloc(const LogEntry* e, Page* p) {
   ret.page = firstPage;
   ret.slot = 0; /* slot = # of slots in array... */
   ret.size = size;
-  unlock(p->rwlatch);

   return 0;
 }

@@ -190,9 +186,6 @@ compensated_function int TarrayListLength(int xid, recordid rid) {

 /*----------------------------------------------------------------------------*/

-/**
-   @todo XXX latching for dereference arraylist rid (and other dereference functions...)
-*/
 recordid dereferenceArrayListRid(int xid, Page * p, int offset) {
   readlock(p->rwlatch,0);
   TarrayListParameters tlp = pageToTLP(xid, p);
@@ -412,7 +412,9 @@ recordid ThashAlloc(int xid, int keySize, int valSize) {

   assert(headerRidB);

   Page * p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   recordid * check = malloc(stasis_record_type_to_size(stasis_record_dereference(xid, p, rid).size));
+  unlock(p->rwlatch);
   releasePage(p);

   rid.slot = 0;
   Tread(xid, rid, check);
@@ -103,11 +103,9 @@ compensated_function int TpageAlloc(int xid /*, int type */) {
 }

 int op_fixed_page_alloc(const LogEntry* e, Page* p) {
-  writelock(p->rwlatch,0);
   assert(e->update.arg_size == sizeof(int));
   int slot_size = *(const int*)getUpdateArgs(e);
   stasis_fixed_initialize_page(p, slot_size, stasis_fixed_records_per_page(slot_size));
-  unlock(p->rwlatch);
   return 0;
 }
@@ -21,13 +21,8 @@ static void TdeallocBoundaryTag(int xid, unsigned int page);

 /** This doesn't need a latch since it is only initiated within nested
     top actions (and is local to this file. During abort(), the nested
     top action's logical undo grabs the necessary latches.
-
-    @todo opearate_alloc_boundary_tag is executed without holding the
-    proper mutex during REDO. For now this doesn't matter, but it
-    could matter in the future.
 */
 static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
-  writelock(p->rwlatch, 0);
   stasis_slotted_initialize_page(p);
   recordid rid = {p->id, 0, sizeof(boundary_tag)};
   assert(e->update.arg_size == sizeof(boundary_tag));

@@ -36,7 +31,6 @@ static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
   byte * buf = stasis_record_write_begin(e->xid, p, rid);
   memcpy(buf, getUpdateArgs(e), stasis_record_length_read(e->xid, p, rid));
   stasis_record_write_done(e->xid, p, rid, buf);
-  unlock(p->rwlatch);
   return 0;
 }

@@ -164,8 +158,9 @@ void regionsInit() {
     // hack; allocate a fake log entry; pass it into ourselves.
     LogEntry * e = allocUpdateLogEntry(0,0,OPERATION_ALLOC_BOUNDARY_TAG,
                                        p->id, (const byte*)&t, sizeof(boundary_tag));
+    writelock(p->rwlatch,0);
     op_alloc_boundary_tag(e,p);
+    unlock(p->rwlatch);
     FreeLogEntry(e);
   }
   holding_mutex = 0;
@@ -51,7 +51,6 @@ terms specified in this license.
 #include <string.h>
 #include <assert.h>

 static int op_set(const LogEntry *e, Page *p) {
-  readlock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
   const byte * b = getUpdateArgs(e);
   recordid rid;

@@ -66,12 +65,9 @@ static int op_set(const LogEntry *e, Page *p) {

   stasis_record_write(e->xid, p, e->LSN, rid, b);

-  unlock(p->rwlatch);

   return 0;
 }
 static int op_set_inverse(const LogEntry *e, Page *p) {
-  readlock(p->rwlatch,0);
   assert(e->update.arg_size >= sizeof(slotid_t) + sizeof(int64_t));
   const byte * b = getUpdateArgs(e);
   recordid rid;

@@ -85,8 +81,6 @@ static int op_set_inverse(const LogEntry *e, Page *p) {

   stasis_record_write(e->xid, p, e->LSN, rid, b+rid.size);

-  unlock(p->rwlatch);

   return 0;
 }

@@ -97,13 +91,15 @@ typedef struct {

 int Tset(int xid, recordid rid, const void * dat) {
   Page * p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   rid = stasis_record_dereference(xid,p,rid);

   rid.size = stasis_record_type_to_size(rid.size);

   if(rid.size > BLOB_THRESHOLD_SIZE) {
     writeBlob(xid,p,rid,dat);
+    unlock(p->rwlatch);
     releasePage(p);
   } else {
+    unlock(p->rwlatch);
     releasePage(p);

     size_t sz = sizeof(slotid_t) + sizeof(int64_t) + 2 * rid.size;

@@ -141,7 +137,6 @@ int TsetRaw(int xid, recordid rid, const void * dat) {
 }

 static int op_set_range(const LogEntry* e, Page* p) {
-  readlock(p->rwlatch,0);
   int diffLength = e->update.arg_size - sizeof(set_range_t);
   assert(! (diffLength % 2));
   diffLength >>= 1;

@@ -160,12 +155,10 @@ static int op_set_range(const LogEntry* e, Page* p) {
   stasis_record_write(e->xid, p, e->LSN, rid, tmp);

   free(tmp);
-  unlock(p->rwlatch);
   return 0;
 }

 static int op_set_range_inverse(const LogEntry* e, Page* p) {
-  readlock(p->rwlatch,0);
   int diffLength = e->update.arg_size - sizeof(set_range_t);
   assert(! (diffLength % 2));
   diffLength >>= 1;

@@ -183,7 +176,6 @@ static int op_set_range_inverse(const LogEntry* e, Page* p) {
   stasis_record_write(e->xid, p, e->LSN, rid, tmp);

   free(tmp);
-  unlock(p->rwlatch);
   return 0;
 }
 compensated_function void TsetRange(int xid, recordid rid, int offset, int length, const void * dat) {

@@ -195,7 +187,6 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
   /// XXX rewrite without malloc (use read_begin, read_done)
   set_range_t * range = malloc(sizeof(set_range_t) + 2 * length);
-  byte * record = malloc(rid.size);

   range->offset = offset;
   range->slot = rid.slot;

@@ -206,18 +197,19 @@ compensated_function void TsetRange(int xid, recordid rid, int offset, int lengt
-  // No further locking is necessary here; readRecord protects the
-  // page layout, but attempts at concurrent modification have undefined
-  // results. (See page.c)
-  stasis_record_read(xid, p, rid, record);
+  readlock(p->rwlatch,0);
+
+  const byte* record = stasis_record_read_begin(xid,p,rid);
   // Copy old value into log structure
   memcpy((byte*)(range + 1) + length, record+offset, length);
+  stasis_record_read_done(xid,p,rid,record);

-  free(record);
-  /** @todo will leak 'range' if interrupted with pthread_cancel */
-  begin_action(releasePage, p) {
-    Tupdate(xid, rid, range, sizeof(set_range_t) + 2 * length, OPERATION_SET_RANGE);
-    free(range);
-  } compensate;
+  unlock(p->rwlatch);
+
+  Tupdate(xid, rid, range, sizeof(set_range_t) + 2 * length, OPERATION_SET_RANGE);
+  free(range);
+
+  releasePage(p);
 }

 Operation getSet() {
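The TsetRange() rewrite above swaps the malloc-and-copy idiom for the zero-copy stasis_record_read_begin()/stasis_record_read_done() pair. A minimal sketch of that pattern, assuming the caller has already pinned the page and that `dest`, `offset`, and `length` are the caller's buffer and range; the latch must span the begin/done pair:

    // Read `length` bytes at `offset` out of a record without an
    // intermediate buffer; the latch is held across begin ... done.
    readlock(p->rwlatch, 0);
    const byte * rec = stasis_record_read_begin(xid, p, rid);
    memcpy(dest, rec + offset, length);
    stasis_record_read_done(xid, p, rid, rec);
    unlock(p->rwlatch);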
@@ -150,27 +150,23 @@ int stasis_page_impl_register(page_impl p) {
   return 0;
 }
 void stasis_record_write(int xid, Page * p, lsn_t lsn, recordid rid, const byte *dat) {
+  assertlocked(p->rwlatch);
   assert( (p->id == rid.page) && (p->memAddr != NULL) );

-  readlock(p->rwlatch, 225);
   assert(rid.size <= BLOB_THRESHOLD_SIZE);

   byte * buf = stasis_record_write_begin(xid, p, rid);
   memcpy(buf, dat, stasis_record_length_read(xid, p, rid));
-  unlock(p->rwlatch);
   stasis_record_write_done(xid,p,rid,buf);
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
 }
 int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
+  assertlocked(p->rwlatch);
   assert(rid.page == p->id);
   assert(rid.size <= BLOB_THRESHOLD_SIZE);

-  readlock(p->rwlatch, 0);
   const byte * dat = stasis_record_read_begin(xid,p,rid);
   memcpy(buf, dat, stasis_record_length_read(xid,p,rid));
-  unlock(p->rwlatch);

   return 0;
 }

@@ -178,6 +174,8 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
   @todo stasis_record_dereference should dispatch via page_impl...
 */
 recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
+  assertlocked(p->rwlatch);
+
   int page_type = *stasis_page_type_ptr(p);
   if(page_type == INDIRECT_PAGE) {
     rid = dereferenceIndirectRID(xid, rid);

@@ -191,6 +189,8 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) {

 static int recordWarnedAboutPageTypeKludge = 0;
 const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
+  assertlocked(p->rwlatch);
+
   int page_type = *stasis_page_type_ptr(p);
   if(!page_type) {
     page_type = FIXED_PAGE;

@@ -203,6 +203,8 @@ const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
   return page_impls[page_type].recordRead(xid, p, rid);
 }
 byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
+  assertlocked(p->rwlatch);
+
   int page_type = *stasis_page_type_ptr(p);
   if(!page_type) {
     page_type = FIXED_PAGE;

@@ -228,16 +230,19 @@ void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) {
   }
 }
 int stasis_record_type_read(int xid, Page *p, recordid rid) {
+  assertlocked(p->rwlatch);
   if(page_impls[*stasis_page_type_ptr(p)].recordGetType)
     return page_impls[*stasis_page_type_ptr(p)].recordGetType(xid, p, rid);
   else
     return INVALID_SLOT;
 }
 void stasis_record_type_write(int xid, Page *p, recordid rid, int type) {
+  assertlocked(p->rwlatch);
   page_impls[*stasis_page_type_ptr(p)]
     .recordSetType(xid, p, rid, type);
 }
 int stasis_record_length_read(int xid, Page *p, recordid rid) {
+  assertlocked(p->rwlatch);
   return page_impls[*stasis_page_type_ptr(p)]
     .recordGetLength(xid,p,rid);
 }
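These assertlocked() calls are the "paranoid" checks from the commit message: every stasis_record_* entry point in page.c now presumes the caller already holds p->rwlatch instead of taking it internally. A sketch of the resulting caller pattern (it mirrors the TreadRaw() hunk below):

    // New convention: the caller latches, the record API only asserts.
    Page * p = loadPage(xid, rid.page);
    readlock(p->rwlatch, 0);              // caller takes the latch...
    stasis_record_read(xid, p, rid, dat); // ...so assertlocked() passes
    unlock(p->rwlatch);
    releasePage(p);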
@@ -195,7 +195,12 @@ static void Redo() {
         if(ce->update.page == INVALID_PAGE) {
           // logical redo of end of NTA; no-op
         } else {
-          undoUpdate(ce, e->LSN);
+          // ughh; need to grab page here so that abort() can be atomic below...
+          Page * p = loadPage(e->xid, ce->update.page);
+          writelock(p->rwlatch,0);
+          undoUpdate(ce, e->LSN, p);
+          unlock(p->rwlatch);
+          releasePage(p);
         }
         FreeLogEntry(ce);
       } break;

@@ -252,11 +257,26 @@ static void Undo(int recovery) {
         // we've finished physical undo for this op
       } else {
         DEBUG("physical update\n");

+        // atomically log (getting clr), and apply undo.
+        // otherwise, there's a race where the page's LSN is
+        // updated before we undo.
+        Page* p = NULL;
+        if(e->update.page != INVALID_PAGE) {
+          p = loadPage(e->xid, e->update.page);
+          writelock(p->rwlatch,0);
+        }
+
         // Log a CLR for this entry
         lsn_t clr_lsn = LogCLR(e);
         DEBUG("logged clr\n");

-        undoUpdate(e, clr_lsn);
+        undoUpdate(e, clr_lsn, p);
+
+        if(p) {
+          unlock(p->rwlatch);
+          releasePage(p);
+        }

         DEBUG("rolled back clr's update\n");
       }

@@ -267,7 +287,7 @@ static void Undo(int recovery) {
       const LogEntry * ce = LogReadLSN(((CLRLogEntry*)e)->clr.compensated_lsn);
       if(ce->update.page == INVALID_PAGE) {
         DEBUG("logical clr\n");
-        undoUpdate(ce, 0); // logical undo; effective LSN doesn't matter
+        undoUpdate(ce, 0, 0); // logical undo; effective LSN doesn't matter
       } else {
         DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN);
         // no-op. Already undone during redo. This would redo the original op.
@@ -277,12 +277,15 @@ static compensated_function void TactionHelper(int xid, recordid rid,
     }
   } end;

+  writelock(p->rwlatch,0);
   e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, op, dat, datlen);
   assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
   DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
   doUpdate(e, p);
   FreeLogEntry(e);
+  unlock(p->rwlatch);
 }

 // XXX remove this function once it's clear that nobody is failing the assert in Tupdate()

@@ -302,9 +305,10 @@ compensated_function void TupdateStr(int xid, recordid rid,
 compensated_function void Tupdate(int xid, recordid rid,
                                   const void *dat, size_t datlen, int op) {
   Page * p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   recordid rid2 = stasis_record_dereference(xid, p, rid);

   assert(rid2.page == rid.page);
+  unlock(p->rwlatch);

   TactionHelper(xid, rid, dat, datlen, op, p);
   releasePage(p);

@@ -320,10 +324,14 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
     p = loadPage(xid, rid.page);
   } end;

+  readlock(p->rwlatch,0);
   rid = stasis_record_dereference(xid, p, rid);
   if(rid.page != p->id) {
+    unlock(p->rwlatch);
     releasePage(p);
     p = loadPage(xid, rid.page);
+    readlock(p->rwlatch,0);
   }
   if(rid.size > BLOB_THRESHOLD_SIZE) {
     DEBUG("call readBlob %lld %lld %lld\n", (long long)rid.page, (long long)rid.slot, (long long)rid.size);

@@ -332,6 +340,7 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
   } else {
     stasis_record_read(xid, p, rid, dat);
   }
+  unlock(p->rwlatch);
   releasePage(p);
 }

@@ -340,8 +349,9 @@ compensated_function void TreadRaw(int xid, recordid rid, void * dat) {
   try {
     p = loadPage(xid, rid.page);
   } end;

+  readlock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, dat);
+  unlock(p->rwlatch);
   releasePage(p);
 }
@@ -66,40 +66,17 @@ terms specified in this license.

 #include <stasis/constants.h>
 #include <stasis/transactional.h>
-#include <stasis/logger/logEntry.h>
 #include <stasis/logger/logEntry.h>
-#include <stasis/bufferManager.h>
-#include <stasis/iterator.h>
-#include <stasis/arrayCollection.h>
 BEGIN_C_DECLS

-/**
 /**
  * function pointer that the operation will run
  */
-//typedef int (*Function)(int xid, Page * p, slotid_t slot, int64_t arg_size, lsn_t lsn, const void *d);
-typedef int (*Function)(const LogEntry* e, Page * p); //, slotid_t slot, int64_t arg_size, lsn_t lsn, const void *d);
-
-/**
-
-*/
-
-/**
-   If the Operation struct's sizeofData is set to this value, then the
-   size field of the recordid is used to determine the size of the
-   argument passed into the operation.
-*/
-//#define SIZEOF_RECORD -1
-/**
-   Logical log entries (such as those used by nested top actions
-   have a null recordid, as they are not assoicated with a specific page
-
-   If a log entry is not associated with a specific page, the page id can
-   be overloaded to hold the size of the associated log entry. Contrast
-   this with the description of SIZEOF_RECORD, which is used when the
-   operation uses a variable length argument, but is associated with
-   a specfic page.
-*/
-//#define SIZEIS_PAGEID -2
+typedef int (*Function)(const LogEntry* e, Page * p);

 typedef struct {

@@ -114,7 +91,7 @@ typedef struct {

    - Periodically checkpoint, syncing the data store to disk, and
      writing a checkpoint operation. No writes can be serviced
      during the sync, and this implies 'no steal'. See:

     @@inproceedings{ woo97accommodating,
       author = "Seung-Kyoon Woo and Myoung-Ho Kim and Yoon-Joon Lee",

@@ -127,7 +104,7 @@ typedef struct {
      for a more complex scheme involving a hybrid logical/physical
      logging system that does not implement steal.

      The other option:

    - Get rid of operations that span records entirely by
      splitting complex logical operations into simpler ones.

@@ -141,7 +118,6 @@ typedef struct {
      general, since other transactions could read dirty information
      from the pinned pages, producsing nonsensical log entries that
      preceed the current transaction's log entry.
 */
 /**
    index into operations table of undo function

@@ -174,13 +150,20 @@ typedef struct {

 extern Operation operationsTable[]; /* [MAX_OPERATIONS]; memset somewhere */

-/** Performs an operation during normal execution.
+/**
+   Performs an operation during normal execution.

    Does not write to the log, and assumes that the operation's
    results are not already in the buffer manager.
+
+   @param e the logentry to play forward. will be played forward regardless of lsn's
+
+   @param p the page the update should be applied to (no support for
+   logical redo). p->rwlatch should be writelock()'ed
 */
 void doUpdate(const LogEntry * e, Page * p);
-/** Undo the update under normal operation, and during recovery.
+/**
+   Undo the update under normal operation, and during recovery.

    For logical undo, this unconditionally executes the requested operation.

@@ -189,9 +172,11 @@ void doUpdate(const LogEntry * e, Page * p);

    @param e The log entry containing the operation to be undone.
    @param clr_lsn The lsn of the clr that records this undo operation.
+   @param p Like doUpdate(), this function is called during forward operation,
+   so p->rwlatch must be writelock()'ed
 */
-void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
+void undoUpdate(const LogEntry * e, lsn_t clr_lsn, Page * p);
 /**
    Redoes an operation during recovery. This is different than
    doUpdate because it checks to see if the operation needs to be redone
    before redoing it. (if(e->lsn > e->rid.lsn) { doUpdate(e); } return)

@@ -199,7 +184,8 @@ void undoUpdate(const LogEntry * e, lsn_t clr_lsn);
    Also, this is the only function in operations.h that can take
    either CLR or UPDATE log entries. The other functions can handle update entries.

-   Does not write to the log.
+   Does not write to the log. No need for a page parameter, Stasis' recovery is
+   single-threaded, so redoUpdate can latch the page itself.
 */
 void redoUpdate(const LogEntry * e);
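A hedged sketch of what the documented doUpdate()/undoUpdate() contract means for a caller, mirroring the TactionHelper() hunk above; the helper name is illustrative and not part of this commit:

    // Illustrative helper: the caller writelocks p->rwlatch around both
    // LogUpdate() (LSN assignment) and doUpdate() (apply + page LSN update).
    static void log_and_apply(TransactionLog * xact, Page * p, int op,
                              const byte * dat, size_t datlen) {
      writelock(p->rwlatch, 0);
      LogEntry * e = LogUpdate(xact, p, op, dat, datlen);
      doUpdate(e, p);          // asserts the latch is held
      FreeLogEntry(e);
      unlock(p->rwlatch);
    }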
@@ -202,10 +202,10 @@ void testFunctions(){
   Page * p1 = loadPage(xid, pageid1);

   // calling functions

+  writelock(p1->rwlatch,0);
   initializeNewBTreeNode(xid, p1, rid1);
   insert(xid, p1, rid1, 3);
+  unlock(p1->rwlatch);

   // cleaning up

@@ -277,14 +277,15 @@ int SimpleExample(){
   rid2.slot = 0;

-  // @todo This is a messy way to do this...
-  unlock(p1->rwlatch);

   stasis_record_write(xid, p1, 1, rid2, b1);
   stasis_record_read(xid, p1, rid2, b2);
   if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));}

   // initializeNewBTreeNode(p1, rid1);
+  unlock(p1->rwlatch);

   releasePage(p1);
   Tcommit(xid);
@@ -101,20 +101,24 @@ START_TEST(operation_physical_do_undo) {
   DEBUG("B\n");

   p = loadPage(xid, rid.page);
-  // manually fill in UNDO field
-  //stasis_record_read(xid, p, rid, ((byte*)(setToTwo) + sizeofLogEntry(setToTwo) - rid.size));
+  writelock(p->rwlatch,0);
+  // manually fill in UNDO field
   stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
   setToTwo->LSN = 10;

   DEBUG("C\n");
   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   doUpdate(setToTwo, p); /* PAGE LSN= 10, value = 2. */
+  unlock(p->rwlatch);
   releasePage(p);

   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);

   assert(buf == 2);

@@ -125,29 +129,33 @@ START_TEST(operation_physical_do_undo) {
   p = loadPage(xid, rid.page);
   readlock(p->rwlatch,0);
   assert(10 == stasis_page_lsn_read(p)); // "page lsn not set correctly."
-  unlock(p->rwlatch);

   setToTwo->LSN = 5;

-  undoUpdate(setToTwo, 12); //, p, 8); /* Should succeed: log LSN is lower than page LSN, but effective LSN is higher than page LSN */
+  undoUpdate(setToTwo, 12, p); /* Should succeed: log LSN is lower than page LSN, but effective LSN is higher than page LSN */
+
+  unlock(p->rwlatch);
   releasePage(p);

   p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);

   assert(buf == 1);

   DEBUG("E\n");
   redoUpdate(setToTwo);

   p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);

   assert(buf == 1);

   /* Now, simulate scenarios from normal operation:
      do the operation, and update the LSN, (update happens)
      then undo, and update the LSN again. (undo happens)

@@ -162,13 +170,14 @@ START_TEST(operation_physical_do_undo) {
   buf = 1;

   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_write(xid, p, lsn, rid, (byte*)&buf);
+  unlock(p->rwlatch);
   releasePage(p);
   /* Trace of test:

      PAGE LSN     LOG LSN      CLR LSN    TYPE        SUCCEED?

      2            10           -          do write    YES (C)
      10           5            8          undo write  YES (D)
      8            5            -          redo write  NO  (E)
      8            10           -          redo write  YES (F)

@@ -190,14 +199,15 @@ START_TEST(operation_physical_do_undo) {
   redoUpdate(setToTwo);

   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);
   assert(buf == 2);

   DEBUG("G undo set to 2\n");
-  undoUpdate(setToTwo, 20); //, p, 20); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/
+  undoUpdate(setToTwo, 20, p); /* Succeeds -- 20 is the 'CLR' entry's lsn.*/

   stasis_record_read(xid, p, rid, (byte*)&buf);

+  unlock(p->rwlatch);
   assert(buf == 1);
   releasePage(p);

@@ -205,7 +215,7 @@ START_TEST(operation_physical_do_undo) {
   redoUpdate(setToTwo); /* Fails */

   p = loadPage(xid, rid.page);
+  writelock(p->rwlatch,0);
   stasis_record_read(xid, p, rid, (byte*)&buf);

   assert(buf == 1);

@@ -214,12 +224,16 @@ START_TEST(operation_physical_do_undo) {

   DEBUG("I redo set to 2\n");

-  redoUpdate(setToTwo); /* Succeeds */
-  p = loadPage(xid, rid.page);
-  stasis_record_read(xid, p, rid, (byte*)&buf);
+  unlock(p->rwlatch);
+  releasePage(p);
+
+  redoUpdate(setToTwo); /* Succeeds */
+
+  p = loadPage(xid, rid.page);
+  readlock(p->rwlatch,0);
+  stasis_record_read(xid, p, rid, (byte*)&buf);
   assert(buf == 2);
+  unlock(p->rwlatch);
   releasePage(p);
   Tdeinit();
 }

@@ -233,25 +247,25 @@ START_TEST(operation_prepare) {
   /* Check this sequence prepare, action, crash, recover, read, action, abort, read again. */

   Tinit();

   int loser = Tbegin();
   int prepared = Tbegin();
   int winner = Tbegin();

   recordid a = Talloc(winner, sizeof(int));
   recordid b = Talloc(winner, sizeof(int));

   int one =1;
   int two =2;
   int three=3;

   Tset(winner, a, &one);
   Tset(winner, b, &one);

   Tset(loser, a, &three);
   Tset(prepared, b, &three);

-  Tprepare(prepared); //, a);
+  Tprepare(prepared);

   Tset(prepared, b, &two);

@@ -416,7 +430,7 @@ START_TEST(operation_instant_set) {
   Tinit();

   int xid = Tbegin();
-  recordid rid = Talloc(xid, sizeof(int)); /** @todo probably need an immediate version of TpageAlloc... */
+  recordid rid = Talloc(xid, sizeof(int)); // @todo probably need an immediate version of TpageAlloc...
   int one = 1;
   int two = 2;
   int three = 3;

@@ -447,7 +461,7 @@ START_TEST(operation_instant_set) {

 } END_TEST

 START_TEST(operation_set_range) {
   Tinit();

@@ -589,11 +603,18 @@ Suite * check_suite(void) {
   tcase_set_timeout(tc, 0); // disable timeouts

   /* Sub tests are added, one per line, here */
-  /*(void)operation_physical_do_undo;
-  (void)operation_nestedTopAction;
-  (void)operation_set_range;
-  (void)operation_prepare;
-  (void)operation_alloc_test;
-  (void)operation_array_list;*/

+  tcase_add_test(tc, operation_physical_do_undo);
+  tcase_add_test(tc, operation_nestedTopAction);
+  tcase_add_test(tc, operation_instant_set);
+  tcase_add_test(tc, operation_set_range);
   if(loggerType != LOG_TO_MEMORY) {
     tcase_add_test(tc, operation_prepare);
   }
   tcase_add_test(tc, operation_alloc_test);
@@ -88,16 +88,15 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
     unlock(p->rwlatch);
     pthread_mutex_unlock(&lsn_mutex);

-    if(! first ) {
+    if(!first) {
       for(k = 0; k < 100; k++) {
-        stasis_record_read(1, p, rid[k], (byte*)&j);
-
-        assert((j + 1) == i + k);
         writelock(p->rwlatch,0);
+        stasis_record_read(1, p, rid[k], (byte*)&j);
+        assert((j + 1) == i + k);
         stasis_record_free(-1, p, rid[k]);
         stasis_page_lsn_write(-1, p, this_lsn);
         unlock(p->rwlatch);
         sched_yield();
       }
     }

@@ -175,9 +174,9 @@ static void* worker_thread(void * arg_ptr) {
   pthread_mutex_unlock(&lsn_mutex);

   if(! first ) {
+    writelock(p->rwlatch,0);
     stasis_record_read(1, p, rid, (byte*)&j);
     assert((j + 1) == i);
-    writelock(p->rwlatch,0);
     stasis_record_free(-1, p, rid);
     stasis_page_lsn_write(-1, p, this_lsn);
     unlock(p->rwlatch);