Cleanup and refactoring for deferred updates. CLR's were broken; Tdefer requires them, so it is

only partially implemented.
This commit is contained in:
Sears Russell 2007-03-30 09:16:21 +00:00
parent 06de4dca0a
commit 8275eb71be
8 changed files with 170 additions and 54 deletions

View file

@ -199,10 +199,11 @@ extern const short SLOT_TYPE_LENGTHS[];
(recovery, abort) should be prepared to accept and ignore these (recovery, abort) should be prepared to accept and ignore these
entries. */ entries. */
#define INTERNALLOG 0 #define INTERNALLOG 0
#define UPDATELOG 1 #define XBEGIN 1
#define XBEGIN 2 #define XCOMMIT 2
#define XCOMMIT 3 #define XABORT 3
#define XABORT 4 #define UPDATELOG 4
#define DEFERLOG 5
/** /**
XEND is used for after the pages touched by a transaction have XEND is used for after the pages touched by a transaction have
been flushed to stable storage. been flushed to stable storage.

View file

@ -110,11 +110,22 @@ typedef struct {
*/ */
LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type); LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type);
/** /**
Allocate a log entry with extra payload info.(eg: Tupdate, Talloc, etc) Allocate a log entry associated with an operation implemention. This
is usually called inside of Tupdate().
*/ */
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int operation, recordid rid, unsigned int operation, recordid rid,
const byte * args, unsigned int argSize, const byte * preImage); const byte * args, unsigned int argSize,
const byte * preImage);
/**
Alloc a deferred log entry. This is just like allocUpdateLogEntry(), except
the log entry's type will be DEFERLOG instead UPDATELOG. This is usually
called inside of Tdefer().
*/
LogEntry * allocDeferredLogEntry(lsn_t prevLSN, int xid,
unsigned int operation, recordid rid,
const byte * args, unsigned int argSize,
const byte * preImage);
/** /**
Allocate a CLR entry. These are written during recovery to Allocate a CLR entry. These are written during recovery to
indicate that the stable copy of the store file reflects the state indicate that the stable copy of the store file reflects the state

View file

@ -144,9 +144,20 @@ lsn_t LogTransCommit(TransactionLog * l);
lsn_t LogTransAbort(TransactionLog * l); lsn_t LogTransAbort(TransactionLog * l);
/** /**
LogUpdate writes an UPDATE log record to the log tail LogUpdate writes an UPDATELOG log record to the log tail. It also interprets
its operation argument to the extent necessary for allocating and laying out
the log entry. Finally, it updates the state of the parameter l.
*/ */
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args); LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args);
/**
LogDeferred writes a DEFERLOG log record to the log tail
@see LogUpdate is analagous to this function, but wrutes UPDATELOG entries
instead.
*/
LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args);
/** /**
Any LogEntry that is returned by a function in logger2.h or Any LogEntry that is returned by a function in logger2.h or

View file

@ -60,7 +60,7 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) {
} }
const byte * getUpdateArgs(const LogEntry * ret) { const byte * getUpdateArgs(const LogEntry * ret) {
assert(ret->type == UPDATELOG); assert(ret->type == UPDATELOG || ret->type == DEFERLOG);
if(ret->contents.update.argSize == 0) { if(ret->contents.update.argSize == 0) {
return NULL; return NULL;
} else { } else {
@ -69,7 +69,7 @@ const byte * getUpdateArgs(const LogEntry * ret) {
} }
const byte * getUpdatePreImage(const LogEntry * ret) { const byte * getUpdatePreImage(const LogEntry * ret) {
assert(ret->type == UPDATELOG); assert(ret->type == UPDATELOG || ret->type == DEFERLOG);
if(operationsTable[ret->contents.update.funcID].undo != NO_INVERSE && if(operationsTable[ret->contents.update.funcID].undo != NO_INVERSE &&
operationsTable[ret->contents.update.funcID].undo != NO_INVERSE_WHOLE_PAGE) { operationsTable[ret->contents.update.funcID].undo != NO_INVERSE_WHOLE_PAGE) {
return NULL; return NULL;
@ -80,7 +80,8 @@ const byte * getUpdatePreImage(const LogEntry * ret) {
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int funcID, recordid rid, unsigned int funcID, recordid rid,
const byte * args, unsigned int argSize, const byte * preImage) { const byte * args, unsigned int argSize,
const byte * preImage) {
int invertible = operationsTable[funcID].undo != NO_INVERSE; int invertible = operationsTable[funcID].undo != NO_INVERSE;
int whole_page_phys = operationsTable[funcID].undo == NO_INVERSE_WHOLE_PAGE; int whole_page_phys = operationsTable[funcID].undo == NO_INVERSE_WHOLE_PAGE;
@ -111,6 +112,15 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
} }
LogEntry * allocDeferredLogEntry(lsn_t prevLSN, int xid,
unsigned int funcID, recordid rid,
const byte * args, unsigned int argSize,
const byte * preImage) {
LogEntry * ret = allocUpdateLogEntry(prevLSN, xid, funcID, rid, args, argSize,
preImage);
ret->type = DEFERLOG;
return ret;
}
LogEntry * allocCLRLogEntry (lsn_t prevLSN, int xid, LogEntry * allocCLRLogEntry (lsn_t prevLSN, int xid,
lsn_t thisUpdateLSN, recordid rid, lsn_t undoNextLSN) { lsn_t thisUpdateLSN, recordid rid, lsn_t undoNextLSN) {
LogEntry * ret = malloc(sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry)); LogEntry * ret = malloc(sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry));
@ -132,7 +142,8 @@ long sizeofLogEntry(const LogEntry * log) {
switch (log->type) { switch (log->type) {
case CLRLOG: case CLRLOG:
return sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry); return sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry);
case UPDATELOG: case UPDATELOG:
case DEFERLOG:
return sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + log->contents.update.argSize + return sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + log->contents.update.argSize +
((operationsTable[log->contents.update.funcID].undo == NO_INVERSE) ? physical_slot_length(log->contents.update.rid.size) : 0) + ((operationsTable[log->contents.update.funcID].undo == NO_INVERSE) ? physical_slot_length(log->contents.update.rid.size) : 0) +
((operationsTable[log->contents.update.funcID].undo == NO_INVERSE_WHOLE_PAGE) ? PAGE_SIZE : 0) ; ((operationsTable[log->contents.update.funcID].undo == NO_INVERSE_WHOLE_PAGE) ? PAGE_SIZE : 0) ;

View file

@ -295,7 +295,8 @@ lsn_t LogTransAbort(TransactionLog * l) {
here? Shouldn't it be in logEntry.c, or perhaps with other code here? Shouldn't it be in logEntry.c, or perhaps with other code
that reasons about the various operation types? that reasons about the various operation types?
*/ */
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args) { static LogEntry * LogAction(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args, int deferred) {
void * preImage = NULL; void * preImage = NULL;
long argSize = 0; long argSize = 0;
LogEntry * e; LogEntry * e;
@ -322,7 +323,13 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
DEBUG("No pre-image"); DEBUG("No pre-image");
} }
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); if(!deferred) {
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize,
preImage);
} else {
e = allocDeferredLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize,
preImage);
}
LogWrite(e); LogWrite(e);
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid, DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
@ -336,6 +343,16 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
return e; return e;
} }
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args) {
return LogAction(l, p, rid, operation, args, 0); // 0 -> not deferred
}
LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid, int operation,
const byte * args) {
return LogAction(l, p, rid, operation, args, 1); // 1 -> deferred
}
lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN) { lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN) {
lsn_t ret; lsn_t ret;
LogEntry * e = allocCLRLogEntry(-1, xid, LSN, rid, prevLSN); LogEntry * e = allocCLRLogEntry(-1, xid, LSN, rid, prevLSN);

View file

@ -61,6 +61,9 @@ void doUpdate(const LogEntry * e, Page * p) {
} }
/**
@todo redoUpdate()'s CLR handling is messy, at best; broken at worst.
*/
void redoUpdate(const LogEntry * e) { void redoUpdate(const LogEntry * e) {
if(e->type == UPDATELOG) { if(e->type == UPDATELOG) {
/* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */ /* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */

View file

@ -125,6 +125,8 @@ static void Analysis () {
pblHtRemove(transactionLSN, &(e->xid), sizeof(int)); pblHtRemove(transactionLSN, &(e->xid), sizeof(int));
break; break;
case UPDATELOG: case UPDATELOG:
// XXX we should treat CLR's like REDO's, but things don't work
// that way yet.
case CLRLOG: case CLRLOG:
/* /*
If the last record we see for a transaction is an update or clr, If the last record we see for a transaction is an update or clr,
@ -139,11 +141,16 @@ static void Analysis () {
addSortedVal(&rollbackLSNs, e->LSN); addSortedVal(&rollbackLSNs, e->LSN);
break; break;
case XABORT: case XABORT:
/* Don't want this XID in the list of rolled back lsn's since /* If the last record we see for a transaction is an abort, then
this XACT will be rolled back during redo. */ the transaction didn't commit, and must be rolled back.
*/
DEBUG("Adding %ld\n", e->LSN);
addSortedVal(&rollbackLSNs, e->LSN);
break; break;
case INTERNALLOG: case INTERNALLOG:
/* Created by the logger, just ignore it. */ /* Created by the logger, just ignore it. */
// Make sure the log entry doesn't interfere with real xacts.
assert(e->xid == INVALID_XID);
break; break;
default: default:
abort(); abort();
@ -161,26 +168,46 @@ static void Redo() {
/* Check to see if this log entry is part of a transaction that needs to be redone. */ /* Check to see if this log entry is part of a transaction that needs to be redone. */
if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) { if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) {
/* Check to see if this log entry contains an action that needs to be redone. */ // Check to see if this entry's action needs to be redone
if(e->type == UPDATELOG || switch(e->type) {
e->type == CLRLOG) { case UPDATELOG:
/* redoUpdate checks the page that contains e->rid, so we case CLRLOG:
don't need to check to see if the page is newer than this {
log entry. */ // redoUpdate checks the page that contains e->rid, so we
if(e->type == UPDATELOG) { // don't need to check to see if the page is newer than this
/* addPendingEvent(e->contents.update.rid.page); */ // log entry.
} else { redoUpdate(e);
/* addPendingEvent(e->contents.clr.rid.page); */ FreeLogEntry(e);
} } break;
redoUpdate(e); case DEFERLOG:
} else if(e->type == XCOMMIT && globalLockManager.commit) { {
globalLockManager.commit(e->xid); //XXX deferred_push(e);
} // if transaction aborted, wait until undo is complete before notifying the globalLockManager. } break;
} case XCOMMIT:
FreeLogEntry(e); {
if(globalLockManager.commit)
globalLockManager.commit(e->xid);
FreeLogEntry(e);
} break;
case XABORT:
{
// wait until undo is complete before informing the lock manager
FreeLogEntry(e);
} break;
case INTERNALLOG:
{
FreeLogEntry(e);
} break;
default:
abort();
}
}
} }
} }
/**
XXX
@todo CLR handling seems to be broken for logical operations!
*/
static void Undo(int recovery) { static void Undo(int recovery) {
LogHandle lh; LogHandle lh;
void * prepare_guard_state; void * prepare_guard_state;
@ -234,11 +261,17 @@ static void Undo(int recovery) {
} else { } else {
// The log entry is not associated with a particular page. // The log entry is not associated with a particular page.
// (Therefore, it must be an idempotent logical log entry.) // (Therefore, it must be an idempotent logical log entry.)
clr_lsn = LogCLR(e->xid, e->LSN, e->contents.update.rid, e->prevLSN); clr_lsn = LogCLR(e->xid, e->LSN, e->contents.update.rid,
e->prevLSN);
undoUpdate(e, NULL, clr_lsn); undoUpdate(e, NULL, clr_lsn);
} }
break; break;
} }
case DEFERLOG:
// The transaction is aborting, so it never committed. Therefore
// actions deferred to commit have never been applied; ignore this
// log entry.
break;
case CLRLOG: case CLRLOG:
/* Don't need to do anything special to handle CLR's. /* Don't need to do anything special to handle CLR's.
Iterator will correctly jump to clr's previous undo record. */ Iterator will correctly jump to clr's previous undo record. */
@ -248,7 +281,9 @@ static void Undo(int recovery) {
records may be passed in by undoTrans.)*/ records may be passed in by undoTrans.)*/
break; break;
default: default:
printf ("Unknown log type to undo (TYPE=%d, XID= %d, LSN=%lld), skipping...\n", e->type, e->xid, e->LSN); print
("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n",
e->type, e->xid, e->LSN);
break; break;
} }
FreeLogEntry(e); FreeLogEntry(e);

View file

@ -199,7 +199,9 @@ int Tbegin() {
return XactionTable[index].xid; return XactionTable[index].xid;
} }
static compensated_function void TupdateHelper(int xid, recordid rid, const void * dat, int op, Page * p) { static compensated_function void TactionHelper(int xid, recordid rid,
const void * dat, int op,
Page * p, int deferred) {
LogEntry * e; LogEntry * e;
assert(xid >= 0); assert(xid >= 0);
try { try {
@ -207,40 +209,65 @@ static compensated_function void TupdateHelper(int xid, recordid rid, const void
globalLockManager.writeLockPage(xid, rid.page); globalLockManager.writeLockPage(xid, rid.page);
} }
} end; } end;
if(! deferred) {
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Tupdate() e->LSN: %ld\n", e->LSN);
doUpdate(e, p);
FreeLogEntry(e);
} else {
e = LogDeferred(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("Deferring e->LSN: %ld\n", e->LSN);
// XXX update XactionTable...
//XXX deferred_push(e);
}
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN);
DEBUG("T update() e->LSN: %ld\n", e->LSN);
doUpdate(e, p);
FreeLogEntry(e); }
static recordid resolveForUpdate(int xid, Page * p, recordid rid) {
if(*page_type_ptr(p) == INDIRECT_PAGE) {
rid = dereferenceRID(xid, rid);
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(p, rid.slot);
}
return rid;
} }
compensated_function void TupdateRaw(int xid, recordid rid, compensated_function void TupdateRaw(int xid, recordid rid,
const void * dat, int op) { const void * dat, int op) {
assert(xid >= 0); assert(xid >= 0);
Page * p = loadPage(xid, rid.page); Page * p = loadPage(xid, rid.page);
TupdateHelper(xid, rid, dat, op, p); TactionHelper(xid, rid, dat, op, p, 0); // 0 -> not deferred
releasePage(p); releasePage(p);
} }
compensated_function void Tupdate(int xid, recordid rid, compensated_function void Tupdate(int xid, recordid rid,
const void *dat, int op) { const void *dat, int op) {
Page * p = loadPage(xid, rid.page); Page * p = loadPage(xid, rid.page);
if(*page_type_ptr(p) == INDIRECT_PAGE) { rid = resolveForUpdate(xid, p, rid);
rid = dereferenceRID(xid, rid);
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(p, rid.slot);
}
if(p->id != rid.page) { if(p->id != rid.page) {
releasePage(p); releasePage(p);
p = loadPage(xid, rid.page); p = loadPage(xid, rid.page);
} }
TupdateHelper(xid, rid, dat, op, p);
TactionHelper(xid, rid, dat, op, p, 0); // 0 -> not deferred
releasePage(p);
}
compensated_function void Tdefer(int xid, recordid rid,
const void * dat, int op) {
Page * p = loadPage(xid, rid.page);
recordid newrid = resolveForUpdate(xid, p, rid);
// Caller cannot rely on late or early binding of rid.
assert(rid.page == newrid.page &&
rid.slot == newrid.slot &&
rid.size == newrid.size);
TactionHelper(xid, rid, dat, op, p, 1); // 1 -> deferred.
releasePage(p); releasePage(p);
} }