nested top action cleanup and bugfix; subsequent recovery calls could undo logical undos in some circumstances

Author: Sears Russell
Date:   2009-04-03 22:01:37 +00:00
Parent: fba041469f
Commit: 05077b62fd

7 changed files with 88 additions and 42 deletions
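
The bug: when the Undo pass performed a logical undo on behalf of a CLR, it left no trace in the log that the compensation had already happened, so a later recovery pass walking the same CLR chain could apply the logical undo a second time. The fix below threads a sentinel through recovery: after performing a logical undo, Undo() appends a dummy CLR whose compensated_lsn is -1, and both Redo() and Undo() skip any CLR carrying that sentinel. A minimal sketch of the intended check, assuming the Stasis log headers are in scope (the wrapper function is hypothetical; the types, fields, and helpers are the ones used in the hunks below):

/* Sketch only, not code from this commit. */
static void replay_clr(stasis_log_t *log, const LogEntry *e) {
  lsn_t comp = ((const CLRLogEntry*)e)->clr.compensated_lsn;
  if(comp == -1) {
    // Dummy CLR written after a logical undo during an earlier recovery;
    // there is nothing left to compensate, so do not undo it again.
    return;
  }
  const LogEntry *ce = log->read_entry(log, comp);
  /* ... apply the physical or logical compensation, as Redo()/Undo() do ... */
  freeLogEntry(ce);
}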


@@ -66,7 +66,7 @@ LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN) {
   *(lsn_t*)(((struct __raw_log_entry*)ret)+1)=recLSN;
   return ret;
 }
-const byte * getUpdateArgs(const LogEntry * ret) {
+const void * getUpdateArgs(const LogEntry * ret) {
   assert(ret->type == UPDATELOG ||
          ret->type == CLRLOG);
   if(ret->update.arg_size == 0) {
@@ -118,7 +118,6 @@ LogEntry * allocCLRLogEntry(const LogEntry * old_e) {
   ret->xid = old_e->xid;
   ret->type = CLRLOG;
   DEBUG("compensates: %lld\n", old_e->LSN);
-  assert(old_e->LSN!=-1);
   ret->clr.compensated_lsn = old_e->LSN;
   return (LogEntry*)ret;


@@ -118,15 +118,18 @@ void stasis_operation_table_init() {
 void stasis_operation_do(const LogEntry * e, Page * p) {
-  assert(p);
-  assertlocked(p->rwlatch);
-  assert(e->update.funcID != OPERATION_INVALID);
-  stasis_operation_table[e->update.funcID].run(e, p);
+  if(p) {
+    assertlocked(p->rwlatch);
+    assert(e->update.funcID != OPERATION_INVALID);
+    stasis_operation_table[e->update.funcID].run(e, p);
-  DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid,
-        e->LSN, e->update.page, stasis_page_lsn_read(p));
+    DEBUG("OPERATION xid %d Do, %lld {%lld:%lld}\n", e->xid,
+          e->LSN, e->update.page, stasis_page_lsn_read(p));
-  stasis_page_lsn_write(e->xid, p, e->LSN);
+    stasis_page_lsn_write(e->xid, p, e->LSN);
+  } else {
+    stasis_operation_table[e->update.funcID].run(e,NULL);
+  }
 }
@@ -149,7 +152,7 @@ void stasis_operation_redo(const LogEntry * e, Page * p) {
   // Need to check the id field to find out what the REDO_action
   // is for this log type.
-  // contrast with doUpdate(), which doesn't check the .redo field.
+  // contrast with stasis_operation_do(), which doesn't check the .redo field
   assert(e->update.funcID != OPERATION_INVALID);
   assert(stasis_operation_table[e->update.funcID].redo != OPERATION_INVALID);
   stasis_operation_table[stasis_operation_table[e->update.funcID].redo]


@@ -18,7 +18,7 @@ static int op_page_set_range(const LogEntry* e, Page* p) {
   assert(off+len <=PAGE_SIZE);
-  memcpy(p->memAddr + off, getUpdateArgs(e)+sizeof(int), len);
+  memcpy(p->memAddr + off, ((const byte*)getUpdateArgs(e))+sizeof(int), len);
   return 0;
 }
 static int op_page_set_range_inverse(const LogEntry* e, Page* p) {
@@ -30,7 +30,8 @@ static int op_page_set_range_inverse(const LogEntry* e, Page* p) {
   assert(off+len <=PAGE_SIZE);
-  memcpy(p->memAddr + off, getUpdateArgs(e)+sizeof(int)+len, len);
+  memcpy(p->memAddr + off, ((const byte*)getUpdateArgs(e))+sizeof(int)+len,
+         len);
   return 0;
 }


@@ -199,22 +199,27 @@ static void Redo(stasis_log_t* log) {
       } break;
     case CLRLOG:
       {
-        const LogEntry *ce =
-          log->read_entry(log,((CLRLogEntry*)e)->clr.compensated_lsn);
+        // if compensated_lsn == -1, then this clr is closing a nested top
+        // action that was performed during undo. Therefore, we do not
+        // want to undo it again.
+        if(-1 != ((CLRLogEntry*)e)->clr.compensated_lsn) {
+          const LogEntry *ce =
+            log->read_entry(log,((CLRLogEntry*)e)->clr.compensated_lsn);
-        if(ce->update.page == INVALID_PAGE) {
-          // logical redo of end of NTA; no-op
-        } else {
-          // need to grab latch page here so that Tabort() can be atomic
-          // below...
+          if(ce->update.page == INVALID_PAGE) {
+            // logical redo of end of NTA; no-op
+          } else {
+            // need to grab latch page here so that Tabort() can be atomic
+            // below...
-          Page * p = loadPage(e->xid, ce->update.page);
-          writelock(p->rwlatch,0);
-          stasis_operation_undo(ce, e->LSN, p);
-          unlock(p->rwlatch);
-          releasePage(p);
-        }
-        freeLogEntry(ce);
+            Page * p = loadPage(e->xid, ce->update.page);
+            writelock(p->rwlatch,0);
+            stasis_operation_undo(ce, e->LSN, p);
+            unlock(p->rwlatch);
+            releasePage(p);
+          }
+          freeLogEntry(ce);
+        }
       } break;
     case XCOMMIT:
       {
@@ -305,16 +310,24 @@ static void Undo(stasis_log_t* log, int recovery) {
       }
     case CLRLOG:
       {
-        const LogEntry * ce
-          = log->read_entry(log, ((CLRLogEntry*)e)->clr.compensated_lsn);
-        if(ce->update.page == INVALID_PAGE) {
-          DEBUG("logical clr\n");
-          stasis_operation_undo(ce, 0, 0); // logical undo; effective LSN doesn't matter
-        } else {
-          DEBUG("physical clr: op %d lsn %lld\n", ce->update.funcID, ce->LSN);
-          // no-op. Already undone during redo. This would redo the original op.
-        }
-        freeLogEntry(ce);
+        if(-1 != ((CLRLogEntry*)e)->clr.compensated_lsn) {
+          const LogEntry * ce
+            = log->read_entry(log, ((CLRLogEntry*)e)->clr.compensated_lsn);
+          if(ce->update.page == INVALID_PAGE) {
+            DEBUG("logical clr\n");
+            // logical undo; effective LSN doesn't matter
+            stasis_operation_undo(ce, 0, 0);
+            // compensated_lsn = -1 -> that the logical undo is a NOOP.
+            // that way, we don't undo this operation twice.
+            LogDummyCLR(log, ce->xid, ce->prevLSN, -1);
+          } else {
+            DEBUG("physical clr: op %d lsn %lld\n",
+                  ce->update.funcID, ce->LSN);
+            // no-op. Already undone during redo.
+            // This would redo the original op.
+          }
+          freeLogEntry(ce);
+        }
       }
       break;
     case XABORT:
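
Taken together, the two recovery hunks above close the loophole named in the commit message. An illustrative timeline, with invented LSNs and record fields taken from the code above:

/*  100: UPDATELOG  logical operation, update.page == INVALID_PAGE
 *  150: CLRLOG     compensated_lsn = 100   (end of the nested top action)
 *
 * First recovery, Undo(): entry 150 points at a logical operation, so
 * stasis_operation_undo(ce, 0, 0) applies the logical undo, and (new in
 * this commit) LogDummyCLR(log, ce->xid, ce->prevLSN, -1) appends
 *
 *  300: CLRLOG     compensated_lsn = -1    (dummy CLR)
 *
 * If recovery runs again, both Redo() and Undo() now test
 * compensated_lsn != -1 before acting, so entry 300 is skipped and the
 * logical undo applied during the first recovery is not applied twice.
 * Before this change no such record was written, so a second recovery
 * pass would repeat the logical undo.
 */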


@@ -593,11 +593,25 @@ typedef struct {
   lsn_t compensated_lsn;
 } stasis_nta_handle;
 /** @todo TbeginNestedTopAction's API might not be quite right.
     Are there cases where we need to pass a recordid in?
+int TnestedTopAction(int xid, int op, const byte * dat, size_t datSize) {
+  assert(xid >= 0);
+  LogEntry * e = LogUpdate(stasis_log_file,
+                           &XactionTable[xid % MAX_TRANSACTIONS],
+                           NULL, op, dat, datSize);
+  lsn_t prev_lsn = e->prevLSN;
+  lsn_t compensated_lsn = e->LSN;
+  stasis_operation_do(e, NULL);
+  freeLogEntry(e);
+  lsn_t clrLSN = LogDummyCLR(stasis_log_file, xid, prev_lsn, compensated_lsn);
+  XactionTable[xid % MAX_TRANSACTIONS].prevLSN = clrLSN;
+  return 0;
+}
     @return a handle that must be passed into TendNestedTopAction
 */
 void * TbeginNestedTopAction(int xid, int op, const byte * dat, int datSize) {
   assert(xid >= 0);
   LogEntry * e = LogUpdate(stasis_log_file,
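
TnestedTopAction is why stasis_operation_do() (second file above) now tolerates a NULL page: the operation runs logically rather than against a single pinned page, and is immediately closed with a CLR pointing back at it, so rollback applies the operation's logical undo exactly once. A sketch of what an operation callback on this path has to look like, assuming a hypothetical operation and argument struct (the signature matches the .run callbacks shown above):

/* Hypothetical logical operation; not part of this commit. */
typedef struct { int key; } my_nta_arg_t;

static int op_my_logical_insert(const LogEntry *e, Page *p) {
  // p is NULL when invoked via TnestedTopAction / stasis_operation_do(e, NULL),
  // so the callback must not touch it.
  (void)p;
  const my_nta_arg_t *arg = (const my_nta_arg_t*)getUpdateArgs(e);
  /* ... perform the logical update through higher-level calls, using arg->key ... */
  return 0;
}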


@@ -132,9 +132,10 @@ void freeLogEntry(const LogEntry * e);
  */
 lsn_t sizeofLogEntry(const LogEntry * e);
 /**
+   @todo Remove explicit casts from getUpdateArgs calls (so we don't accidentally strip the const).
    @return the operation's arguments.
  */
-const byte * getUpdateArgs(const LogEntry * e);
+const void * getUpdateArgs(const LogEntry * e);
 lsn_t getPrepareRecLSN(const LogEntry *e);


@@ -706,18 +706,33 @@ void Trevive(int xid, lsn_t prevlsn, lsn_t reclsn);
  */
 int Tprepare(int xid);
 /**
+ * Begin a nested top action
+ *
+ * Nested Top Actions allow you to register logical undo operations
+ * for data structure manipulation. This is generally a prerequisite
+ * to concurrent transaction systems.
+ *
+ * @see ex3.c for an example of nested top actions.
+ */
+int TnestedTopAction(int xid, int op, const byte * arg, size_t arg_len);
+/**
  * Begin a nested top action
  *
  * Nested top actions provide atomic updates to multiple pages within
- * a single transaction. LLADD's nested top actions may be nested
+ * a single transaction. Stasis's nested top actions may be nested
  * within each other.
  *
+ * @see TnestedTopAction() is less expressive, but much more convenient.
  */
 void * TbeginNestedTopAction(int xid, int op, const byte* log_arguments,
                              int log_arguments_length);
 /**
  * Complete a nested top action, atomically switching from physical to
  * logical undo.
  *
+ * @see TnestedTopAction() is less expressive, but much more convenient.
  */
 lsn_t TendNestedTopAction(int xid, void * handle);
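
A usage sketch of the declarations above, with a hypothetical operation id and argument struct (see ex3.c in the tree for the real nested top action example):

/* Hypothetical names; only the T*NestedTopAction calls come from the header above. */
#define OPERATION_MY_LOGICAL_INSERT 42   /* assumed to be registered elsewhere */
typedef struct { int key; } my_nta_arg_t;

void insert_examples(int xid) {
  my_nta_arg_t arg = { 17 };

  /* One-shot form added by this commit: log the operation, run it logically,
     and close it with a CLR in a single call. */
  TnestedTopAction(xid, OPERATION_MY_LOGICAL_INSERT,
                   (const byte*)&arg, sizeof(arg));

  /* Bracketed form: physical updates to multiple pages between begin and end,
     with TendNestedTopAction() switching them to a single logical undo. */
  void *h = TbeginNestedTopAction(xid, OPERATION_MY_LOGICAL_INSERT,
                                  (const byte*)&arg, sizeof(arg));
  /* ... page-level updates for the same logical change go here ... */
  TendNestedTopAction(xid, h);
}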