diff --git a/lladd/constants.h b/lladd/constants.h index 1be6142..3f57184 100644 --- a/lladd/constants.h +++ b/lladd/constants.h @@ -199,10 +199,11 @@ extern const short SLOT_TYPE_LENGTHS[]; (recovery, abort) should be prepared to accept and ignore these entries. */ #define INTERNALLOG 0 -#define UPDATELOG 1 -#define XBEGIN 2 -#define XCOMMIT 3 -#define XABORT 4 +#define XBEGIN 1 +#define XCOMMIT 2 +#define XABORT 3 +#define UPDATELOG 4 +#define DEFERLOG 5 /** XEND is used for after the pages touched by a transaction have been flushed to stable storage. diff --git a/lladd/logger/logEntry.h b/lladd/logger/logEntry.h index 10cdc1f..0240356 100644 --- a/lladd/logger/logEntry.h +++ b/lladd/logger/logEntry.h @@ -110,11 +110,22 @@ typedef struct { */ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type); /** - Allocate a log entry with extra payload info.(eg: Tupdate, Talloc, etc) + Allocate a log entry associated with an operation implemention. This + is usually called inside of Tupdate(). */ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, unsigned int operation, recordid rid, - const byte * args, unsigned int argSize, const byte * preImage); + const byte * args, unsigned int argSize, + const byte * preImage); +/** + Alloc a deferred log entry. This is just like allocUpdateLogEntry(), except + the log entry's type will be DEFERLOG instead UPDATELOG. This is usually + called inside of Tdefer(). +*/ +LogEntry * allocDeferredLogEntry(lsn_t prevLSN, int xid, + unsigned int operation, recordid rid, + const byte * args, unsigned int argSize, + const byte * preImage); /** Allocate a CLR entry. These are written during recovery to indicate that the stable copy of the store file reflects the state diff --git a/lladd/logger/logger2.h b/lladd/logger/logger2.h index 1ff0b33..f86addc 100644 --- a/lladd/logger/logger2.h +++ b/lladd/logger/logger2.h @@ -144,9 +144,20 @@ lsn_t LogTransCommit(TransactionLog * l); lsn_t LogTransAbort(TransactionLog * l); /** - LogUpdate writes an UPDATE log record to the log tail + LogUpdate writes an UPDATELOG log record to the log tail. It also interprets + its operation argument to the extent necessary for allocating and laying out + the log entry. Finally, it updates the state of the parameter l. */ -LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args); +LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, + const byte * args); +/** + LogDeferred writes a DEFERLOG log record to the log tail + + @see LogUpdate is analagous to this function, but wrutes UPDATELOG entries + instead. +*/ +LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid, int operation, + const byte * args); /** Any LogEntry that is returned by a function in logger2.h or diff --git a/src/lladd/logger/logEntry.c b/src/lladd/logger/logEntry.c index 684b4f5..61adfd6 100644 --- a/src/lladd/logger/logEntry.c +++ b/src/lladd/logger/logEntry.c @@ -60,7 +60,7 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) { } const byte * getUpdateArgs(const LogEntry * ret) { - assert(ret->type == UPDATELOG); + assert(ret->type == UPDATELOG || ret->type == DEFERLOG); if(ret->contents.update.argSize == 0) { return NULL; } else { @@ -69,7 +69,7 @@ const byte * getUpdateArgs(const LogEntry * ret) { } const byte * getUpdatePreImage(const LogEntry * ret) { - assert(ret->type == UPDATELOG); + assert(ret->type == UPDATELOG || ret->type == DEFERLOG); if(operationsTable[ret->contents.update.funcID].undo != NO_INVERSE && operationsTable[ret->contents.update.funcID].undo != NO_INVERSE_WHOLE_PAGE) { return NULL; @@ -80,7 +80,8 @@ const byte * getUpdatePreImage(const LogEntry * ret) { LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, unsigned int funcID, recordid rid, - const byte * args, unsigned int argSize, const byte * preImage) { + const byte * args, unsigned int argSize, + const byte * preImage) { int invertible = operationsTable[funcID].undo != NO_INVERSE; int whole_page_phys = operationsTable[funcID].undo == NO_INVERSE_WHOLE_PAGE; @@ -111,6 +112,15 @@ LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, } +LogEntry * allocDeferredLogEntry(lsn_t prevLSN, int xid, + unsigned int funcID, recordid rid, + const byte * args, unsigned int argSize, + const byte * preImage) { + LogEntry * ret = allocUpdateLogEntry(prevLSN, xid, funcID, rid, args, argSize, + preImage); + ret->type = DEFERLOG; + return ret; +} LogEntry * allocCLRLogEntry (lsn_t prevLSN, int xid, lsn_t thisUpdateLSN, recordid rid, lsn_t undoNextLSN) { LogEntry * ret = malloc(sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry)); @@ -132,7 +142,8 @@ long sizeofLogEntry(const LogEntry * log) { switch (log->type) { case CLRLOG: return sizeof(struct __raw_log_entry) + sizeof(CLRLogEntry); - case UPDATELOG: + case UPDATELOG: + case DEFERLOG: return sizeof(struct __raw_log_entry) + sizeof(UpdateLogEntry) + log->contents.update.argSize + ((operationsTable[log->contents.update.funcID].undo == NO_INVERSE) ? physical_slot_length(log->contents.update.rid.size) : 0) + ((operationsTable[log->contents.update.funcID].undo == NO_INVERSE_WHOLE_PAGE) ? PAGE_SIZE : 0) ; diff --git a/src/lladd/logger/logger2.c b/src/lladd/logger/logger2.c index 72489ea..356ef23 100644 --- a/src/lladd/logger/logger2.c +++ b/src/lladd/logger/logger2.c @@ -295,7 +295,8 @@ lsn_t LogTransAbort(TransactionLog * l) { here? Shouldn't it be in logEntry.c, or perhaps with other code that reasons about the various operation types? */ -LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args) { +static LogEntry * LogAction(TransactionLog * l, Page * p, recordid rid, int operation, + const byte * args, int deferred) { void * preImage = NULL; long argSize = 0; LogEntry * e; @@ -322,7 +323,13 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, DEBUG("No pre-image"); } - e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); + if(!deferred) { + e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, + preImage); + } else { + e = allocDeferredLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, + preImage); + } LogWrite(e); DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid, @@ -336,6 +343,16 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, return e; } +LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, + const byte * args) { + return LogAction(l, p, rid, operation, args, 0); // 0 -> not deferred +} +LogEntry * LogDeferred(TransactionLog * l, Page * p, recordid rid, int operation, + const byte * args) { + return LogAction(l, p, rid, operation, args, 1); // 1 -> deferred +} + + lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN) { lsn_t ret; LogEntry * e = allocCLRLogEntry(-1, xid, LSN, rid, prevLSN); diff --git a/src/lladd/operations.c b/src/lladd/operations.c index f011058..6fb7690 100644 --- a/src/lladd/operations.c +++ b/src/lladd/operations.c @@ -61,6 +61,9 @@ void doUpdate(const LogEntry * e, Page * p) { } +/** + @todo redoUpdate()'s CLR handling is messy, at best; broken at worst. + */ void redoUpdate(const LogEntry * e) { if(e->type == UPDATELOG) { /* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */ diff --git a/src/lladd/recovery2.c b/src/lladd/recovery2.c index cce2620..eca84fe 100644 --- a/src/lladd/recovery2.c +++ b/src/lladd/recovery2.c @@ -125,6 +125,8 @@ static void Analysis () { pblHtRemove(transactionLSN, &(e->xid), sizeof(int)); break; case UPDATELOG: + // XXX we should treat CLR's like REDO's, but things don't work + // that way yet. case CLRLOG: /* If the last record we see for a transaction is an update or clr, @@ -139,11 +141,16 @@ static void Analysis () { addSortedVal(&rollbackLSNs, e->LSN); break; case XABORT: - /* Don't want this XID in the list of rolled back lsn's since - this XACT will be rolled back during redo. */ + /* If the last record we see for a transaction is an abort, then + the transaction didn't commit, and must be rolled back. + */ + DEBUG("Adding %ld\n", e->LSN); + addSortedVal(&rollbackLSNs, e->LSN); break; case INTERNALLOG: /* Created by the logger, just ignore it. */ + // Make sure the log entry doesn't interfere with real xacts. + assert(e->xid == INVALID_XID); break; default: abort(); @@ -161,26 +168,46 @@ static void Redo() { /* Check to see if this log entry is part of a transaction that needs to be redone. */ if(pblHtLookup(transactionLSN, &(e->xid), sizeof(int)) != NULL) { - /* Check to see if this log entry contains an action that needs to be redone. */ - if(e->type == UPDATELOG || - e->type == CLRLOG) { - /* redoUpdate checks the page that contains e->rid, so we - don't need to check to see if the page is newer than this - log entry. */ - if(e->type == UPDATELOG) { - /* addPendingEvent(e->contents.update.rid.page); */ - } else { - /* addPendingEvent(e->contents.clr.rid.page); */ - } - redoUpdate(e); - } else if(e->type == XCOMMIT && globalLockManager.commit) { - globalLockManager.commit(e->xid); - } // if transaction aborted, wait until undo is complete before notifying the globalLockManager. - } - FreeLogEntry(e); + // Check to see if this entry's action needs to be redone + switch(e->type) { + case UPDATELOG: + case CLRLOG: + { + // redoUpdate checks the page that contains e->rid, so we + // don't need to check to see if the page is newer than this + // log entry. + redoUpdate(e); + FreeLogEntry(e); + } break; + case DEFERLOG: + { + //XXX deferred_push(e); + } break; + case XCOMMIT: + { + if(globalLockManager.commit) + globalLockManager.commit(e->xid); + FreeLogEntry(e); + } break; + case XABORT: + { + // wait until undo is complete before informing the lock manager + FreeLogEntry(e); + } break; + case INTERNALLOG: + { + FreeLogEntry(e); + } break; + default: + abort(); + } + } } } - +/** + XXX + @todo CLR handling seems to be broken for logical operations! +*/ static void Undo(int recovery) { LogHandle lh; void * prepare_guard_state; @@ -234,11 +261,17 @@ static void Undo(int recovery) { } else { // The log entry is not associated with a particular page. // (Therefore, it must be an idempotent logical log entry.) - clr_lsn = LogCLR(e->xid, e->LSN, e->contents.update.rid, e->prevLSN); + clr_lsn = LogCLR(e->xid, e->LSN, e->contents.update.rid, + e->prevLSN); undoUpdate(e, NULL, clr_lsn); } break; } + case DEFERLOG: + // The transaction is aborting, so it never committed. Therefore + // actions deferred to commit have never been applied; ignore this + // log entry. + break; case CLRLOG: /* Don't need to do anything special to handle CLR's. Iterator will correctly jump to clr's previous undo record. */ @@ -248,7 +281,9 @@ static void Undo(int recovery) { records may be passed in by undoTrans.)*/ break; default: - printf ("Unknown log type to undo (TYPE=%d, XID= %d, LSN=%lld), skipping...\n", e->type, e->xid, e->LSN); + print + ("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n", + e->type, e->xid, e->LSN); break; } FreeLogEntry(e); diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index e9cd4d9..b5a678e 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -199,7 +199,9 @@ int Tbegin() { return XactionTable[index].xid; } -static compensated_function void TupdateHelper(int xid, recordid rid, const void * dat, int op, Page * p) { +static compensated_function void TactionHelper(int xid, recordid rid, + const void * dat, int op, + Page * p, int deferred) { LogEntry * e; assert(xid >= 0); try { @@ -207,40 +209,65 @@ static compensated_function void TupdateHelper(int xid, recordid rid, const void globalLockManager.writeLockPage(xid, rid.page); } } end; + if(! deferred) { + e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); + assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); + DEBUG("Tupdate() e->LSN: %ld\n", e->LSN); + doUpdate(e, p); + FreeLogEntry(e); + } else { + e = LogDeferred(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); + assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); + DEBUG("Deferring e->LSN: %ld\n", e->LSN); + // XXX update XactionTable... + //XXX deferred_push(e); + } - - e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); - - assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); - - DEBUG("T update() e->LSN: %ld\n", e->LSN); - - doUpdate(e, p); - FreeLogEntry(e); +} + +static recordid resolveForUpdate(int xid, Page * p, recordid rid) { + + if(*page_type_ptr(p) == INDIRECT_PAGE) { + rid = dereferenceRID(xid, rid); + } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) { + rid = dereferenceArrayListRid(p, rid.slot); + } + return rid; } compensated_function void TupdateRaw(int xid, recordid rid, const void * dat, int op) { assert(xid >= 0); Page * p = loadPage(xid, rid.page); - TupdateHelper(xid, rid, dat, op, p); + TactionHelper(xid, rid, dat, op, p, 0); // 0 -> not deferred releasePage(p); } compensated_function void Tupdate(int xid, recordid rid, - const void *dat, int op) { + const void *dat, int op) { Page * p = loadPage(xid, rid.page); - if(*page_type_ptr(p) == INDIRECT_PAGE) { - rid = dereferenceRID(xid, rid); - } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE) { - rid = dereferenceArrayListRid(p, rid.slot); - } + rid = resolveForUpdate(xid, p, rid); + if(p->id != rid.page) { releasePage(p); p = loadPage(xid, rid.page); } - TupdateHelper(xid, rid, dat, op, p); + + TactionHelper(xid, rid, dat, op, p, 0); // 0 -> not deferred + releasePage(p); +} + +compensated_function void Tdefer(int xid, recordid rid, + const void * dat, int op) { + + Page * p = loadPage(xid, rid.page); + recordid newrid = resolveForUpdate(xid, p, rid); + // Caller cannot rely on late or early binding of rid. + assert(rid.page == newrid.page && + rid.slot == newrid.slot && + rid.size == newrid.size); + TactionHelper(xid, rid, dat, op, p, 1); // 1 -> deferred. releasePage(p); }