From 944c7e984f4978a07594a226e73a3c72e00db44e Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 24 Sep 2008 03:08:32 +0000 Subject: [PATCH] fixes / cleanups of Tprepare() --- src/stasis/logger/logEntry.c | 18 ++++++++++- src/stasis/logger/logWriter.c | 6 +--- src/stasis/logger/logger2.c | 53 ++++++++++++++++++++++-------- src/stasis/operations/prepare.c | 16 ++++++--- src/stasis/recovery2.c | 57 +++++++++++++++++++-------------- src/stasis/transactional2.c | 38 ++++++++++++++++++---- stasis/constants.h | 7 ++-- stasis/logger/logEntry.h | 4 +++ stasis/logger/logger2.h | 13 ++++++-- stasis/operations/prepare.h | 2 +- stasis/transactional.h | 16 ++++++--- test/stasis/check_operations.c | 4 +-- 12 files changed, 167 insertions(+), 67 deletions(-) diff --git a/src/stasis/logger/logEntry.c b/src/stasis/logger/logEntry.c index 355c1a1..fdcbc65 100644 --- a/src/stasis/logger/logEntry.c +++ b/src/stasis/logger/logEntry.c @@ -57,7 +57,16 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) { ret->type = type; return ret; } - +LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN) { + LogEntry * ret = calloc(1,sizeof(struct __raw_log_entry)+sizeof(lsn_t)); + ret->LSN = -1; + ret->prevLSN = prevLSN; + ret->xid = xid; + ret->type = XPREPARE; + *(lsn_t*)(((struct __raw_log_entry*)ret)+1)=recLSN; + // assert(sizeofLogEntry(ret) == sizeof(struct __raw_log_entry)+sizeof(lsn_t)); + return ret; +} const byte * getUpdateArgs(const LogEntry * ret) { assert(ret->type == UPDATELOG || ret->type == CLRLOG); @@ -84,6 +93,11 @@ const byte * getUpdatePreImage(const LogEntry * ret) { } } +lsn_t getPrepareRecLSN(const LogEntry *e) { + lsn_t ret = *(lsn_t*)(((struct __raw_log_entry*)e)+1); + if(ret == -1) { ret = e->LSN; } + return ret; +} LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, unsigned int funcID, recordid rid, const byte * args, unsigned int argSize, @@ -159,6 +173,8 @@ long sizeofLogEntry(const LogEntry * log) { } case INTERNALLOG: return LoggerSizeOfInternalLogEntry(log); + case XPREPARE: + return sizeof(struct __raw_log_entry)+sizeof(lsn_t); default: return sizeof(struct __raw_log_entry); } diff --git a/src/stasis/logger/logWriter.c b/src/stasis/logger/logWriter.c index 21a43ff..4fb1c33 100644 --- a/src/stasis/logger/logWriter.c +++ b/src/stasis/logger/logWriter.c @@ -297,7 +297,7 @@ int openLogWriter() { @internal Unfortunately, this function can't just seek to the end of the - log. If it did, and a prior instance of LLADD crashed (and wrote + log. If it did, and a prior instance of Stasis crashed (and wrote a partial entry), then the log would be corrupted. Therefore, we need to be a little bit smarter, and track the next LSN value manually. Calculating it the first time would require a scan over @@ -310,9 +310,6 @@ int openLogWriter() { The first time writeLogEntry is called, we seek from the highest LSN encountered so far to the end of the log. - @todo writeLogEntry implicitly ignores all log entries with xid = -1. - This is probably the wrong thing to do... - */ static int writeLogEntryUnlocked(LogEntry * e) { @@ -481,7 +478,6 @@ static LogEntry * readLogEntry() { LogEntry * ret = 0; lsn_t size; lsn_t entrySize; - lsn_t bytesRead = read(roLogFD, &size, sizeof(lsn_t)); if(bytesRead != sizeof(lsn_t)) { diff --git a/src/stasis/logger/logger2.c b/src/stasis/logger/logger2.c index 324f892..95aa4a7 100644 --- a/src/stasis/logger/logger2.c +++ b/src/stasis/logger/logger2.c @@ -229,23 +229,37 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) { return ret; } +static lsn_t LogTransCommonPrepare(TransactionLog * l) { + LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN); + lsn_t ret; -/** - @todo This should be usable by all calls that sync the log; not just commit. -*/ -static lsn_t groupCommit(TransactionLog * l) { + DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n",e->xid,e->prevLSN,l->recLSN, getPrepareRecLSN(e)); + LogWrite(e); + + if(l->prevLSN == -1) { l->recLSN = e->LSN; } + l->prevLSN = e->LSN; + DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid, + (long int)e->LSN, (long int)e->type, (long int)e->prevLSN); + + ret = e->LSN; + + FreeLogEntry(e); + + return ret; + +} + +static void groupForce(lsn_t l) { static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; - lsn_t ret = LogTransCommon(l, XCOMMIT); - struct timeval now; struct timespec timeout; pthread_mutex_lock(&check_commit); - if(LogFlushedLSN() >= ret) { + if(LogFlushedLSN() >= l) { pthread_mutex_unlock(&check_commit); - return ret; + return; } gettimeofday(&now, NULL); timeout.tv_sec = now.tv_sec; @@ -266,21 +280,32 @@ static lsn_t groupCommit(TransactionLog * l) { printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__); break; } - if(LogFlushedLSN() >= ret) { + if(LogFlushedLSN() >= l) { pendingCommits--; pthread_mutex_unlock(&check_commit); - return ret; + return; } } } - if(LogFlushedLSN() < ret) { + if(LogFlushedLSN() < l) { syncLog_LogWriter(); syncLogCount++; pthread_cond_broadcast(&tooFewXacts); } - assert(LogFlushedLSN() >= ret); + assert(LogFlushedLSN() >= l); pendingCommits--; pthread_mutex_unlock(&check_commit); + return; +} + +static lsn_t groupCommit(TransactionLog * l) { + lsn_t ret = LogTransCommon(l, XCOMMIT); + groupForce(ret); + return ret; +} +static lsn_t groupPrepare(TransactionLog * l) { + lsn_t ret = LogTransCommonPrepare(l); + groupForce(ret); return ret; } @@ -291,7 +316,9 @@ lsn_t LogTransCommit(TransactionLog * l) { lsn_t LogTransAbort(TransactionLog * l) { return LogTransCommon(l, XABORT); } - +lsn_t LogTransPrepare(TransactionLog * l) { + return groupPrepare(l); +} /** @todo Does the handling of operation types / argument sizes belong diff --git a/src/stasis/operations/prepare.c b/src/stasis/operations/prepare.c index 146002e..74c36d1 100644 --- a/src/stasis/operations/prepare.c +++ b/src/stasis/operations/prepare.c @@ -51,7 +51,7 @@ terms specified in this license. #include #include - +#include recordid prepare_bogus_rec = { 0, 0, 0}; static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { @@ -75,6 +75,7 @@ typedef struct{ int continueIterating; int prevLSN; int xid; + int aborted; } PrepareGuardState; void * getPrepareGuardState() { @@ -82,6 +83,7 @@ void * getPrepareGuardState() { s->continueIterating = 1; s->prevLSN = -1; s->xid = -1; + s->aborted = 0; return s; } @@ -89,11 +91,14 @@ void * getPrepareGuardState() { int prepareGuard(const LogEntry * e, void * state) { PrepareGuardState * pgs = state; int ret = pgs->continueIterating; - if(e->type == UPDATELOG) { + if(e->type == UPDATELOG && !pgs->aborted) { if(e->update.funcID == OPERATION_PREPARE) { pgs->continueIterating = 0; pgs->prevLSN = e->prevLSN; } + } else if (e->type == XABORT) { + printf("xid %d aborted.\n", e->xid); + pgs->aborted = 1; } if(pgs->xid == -1) { pgs->xid = e->xid; @@ -109,9 +114,10 @@ int prepareGuard(const LogEntry * e, void * state) { int prepareAction(void * state) { PrepareGuardState * pgs = state; int ret; - if(!pgs->continueIterating) { - assert(pgs->prevLSN != -1); - Trevive(pgs->xid, pgs->prevLSN); + if(!(pgs->continueIterating || pgs->aborted)) { + //assert(pgs->prevLSN != -1); + abort(); + // Trevive(pgs->xid, pgs->prevLSN); ret = 1; } else { ret = 0; diff --git a/src/stasis/recovery2.c b/src/stasis/recovery2.c index 70ff6ae..8ad4cb7 100644 --- a/src/stasis/recovery2.c +++ b/src/stasis/recovery2.c @@ -19,7 +19,7 @@ #include /** @todo Add better log iterator guard support and remove this include.*/ -#include +//#include #include /** @todo Get rid of linkedlist */ @@ -60,13 +60,9 @@ static void Analysis () { log was so that we don't accidentally reuse XID's. This keeps track of that value. */ int highestXid = 0; - - /** @todo loadCheckPoint() - Jump forward in the log to the last - checkpoint. (getLogHandle should do this automatically, - since the log will be truncated on checkpoint anyway.) */ while((e = nextInLog(&lh))) { - + lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int)); if(highestXid < e->xid) { @@ -140,6 +136,9 @@ static void Analysis () { DEBUG("Adding %ld\n", e->LSN); addSortedVal(&rollbackLSNs, e->LSN); break; + case XPREPARE: + addSortedVal(&rollbackLSNs, e->LSN); + break; // XXX check to see if the xact exists? case INTERNALLOG: // Created by the logger, just ignore it // Make sure the log entry doesn't interfere with real xacts. @@ -186,35 +185,34 @@ static void Redo() { { FreeLogEntry(e); } break; + case XPREPARE: + { + FreeLogEntry(e); + } break; default: abort(); } } } } -/** - @todo Guards shouldn't be hardcoded in Undo() -*/ static void Undo(int recovery) { LogHandle lh; - void * prepare_guard_state; while(rollbackLSNs != NULL) { const LogEntry * e; lsn_t rollback = popMaxVal(&rollbackLSNs); - prepare_guard_state = getPrepareGuardState(); - DEBUG("Undoing LSN %ld\n", (long int)rollback); - if(recovery) { - lh = getGuardedHandle(rollback, &prepareGuard, prepare_guard_state); - } else { - lh = getLSNHandle(rollback); - } + lh = getLSNHandle(rollback); int thisXid = -1; - while((e = previousInTransaction(&lh))) { + + // Is this transaction just a loser, or was it aborted? + int reallyAborted = 0; + // Have we reached a XPREPARE that we should pay attention to? + int prepared = 0; + while((!prepared) && (e = previousInTransaction(&lh))) { thisXid = e->xid; lsn_t this_lsn, clr_lsn; switch(e->type) { @@ -229,9 +227,9 @@ static void Undo(int recovery) { // If this fails, something is wrong with redo or normal operation. this_lsn = stasis_page_lsn_read(p); - assert(e->LSN <= this_lsn); + assert(e->LSN <= this_lsn); - } else { + } else { // The log entry is not associated with a particular page. // (Therefore, it must be an idempotent logical log entry.) } @@ -249,12 +247,25 @@ static void Undo(int recovery) { // Don't undo CLRs; they were undone during Redo break; case XABORT: - // Since XABORT is a no-op, we can silentlt ignore it. XABORT + printf("Found abort for %d\n", e->xid); + reallyAborted = 1; + // Since XABORT is a no-op, we can silently ignore it. XABORT // records may be passed in by undoTrans. break; case XCOMMIT: // Should never abort a transaction that contains a commit record abort(); + case XPREPARE: { + printf("found prepared xact %d\n", e->xid); + + if(!reallyAborted) { + printf("xact wasn't aborted\n"); + prepared = 1; + Trevive(e->xid, e->LSN, getPrepareRecLSN(e)); + } else { + printf("xact was aborted\n"); + } + } break; default: printf ("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n", @@ -264,9 +275,7 @@ static void Undo(int recovery) { } FreeLogEntry(e); } - int transactionWasPrepared = prepareAction(prepare_guard_state); - free(prepare_guard_state); - if(!transactionWasPrepared && globalLockManager.abort) { + if((!prepared) && globalLockManager.abort) { globalLockManager.abort(thisXid); } } diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 2b85f59..0b7a73a 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -48,7 +48,7 @@ void setupOperationsTable() { operationsTable[OPERATION_INCREMENT] = getIncrement(); operationsTable[OPERATION_DECREMENT] = getDecrement(); operationsTable[OPERATION_ALLOC] = getAlloc(); - operationsTable[OPERATION_PREPARE] = getPrepare(); + // operationsTable[OPERATION_PREPARE] = getPrepare(); /* operationsTable[OPERATION_LHINSERT] = getLHInsert(); operationsTable[OPERATION_LHREMOVE] = getLHRemove(); */ operationsTable[OPERATION_DEALLOC] = getDealloc(); @@ -350,6 +350,14 @@ int Tcommit(int xid) { return 0; } +int Tprepare(int xid) { + assert(xid >= 0); + off_t i = xid % MAX_TRANSACTIONS; + assert(XactionTable[i].xid == xid); + LogTransPrepare(&XactionTable[i]); + return 0; +} + int Tabort(int xid) { lsn_t lsn; assert(xid >= 0); @@ -427,24 +435,26 @@ int TuncleanShutdown() { return 0; } -void Trevive(int xid, long lsn) { +void Trevive(int xid, lsn_t prevlsn, lsn_t reclsn) { assert(xid >= 0); + assert(reclsn != -1); int index = xid % MAX_TRANSACTIONS; pthread_mutex_lock(&transactional_2_mutex); DEBUG("Reviving xid %d at lsn %ld\n", xid, lsn); if(XactionTable[index].xid != INVALID_XTABLE_XID) { - if(xid != XactionTable[index].xid) { + abort(); + /* if(xid != XactionTable[index].xid) { fprintf(stderr, "Clashing Tprepare()'ed XID's encountered on recovery!!\n"); abort(); } assert(XactionTable[index].xid == xid); - assert(XactionTable[index].prevLSN == lsn); + assert(XactionTable[index].prevLSN == lsn); */ } else { XactionTable[index].xid = xid; - XactionTable[index].prevLSN = lsn; - + XactionTable[index].prevLSN = prevlsn; + XactionTable[index].recLSN = reclsn; numActiveXactions++; } @@ -472,6 +482,22 @@ lsn_t transactions_minRecLSN() { return minRecLSN; } +int* TlistActiveTransactions() { + pthread_mutex_lock(&transactional_2_mutex); + int * ret = malloc(sizeof(*ret)); + ret[0] = 0; + int retcount = 0; + for(int i = 0; i < MAX_TRANSACTIONS; i++) { + if(XactionTable[i].xid != INVALID_XTABLE_XID) { + ret[retcount] = XactionTable[i].xid; + retcount++; + ret = realloc(ret, (retcount+1) * sizeof(*ret)); + ret[retcount] = 0; + } + } + pthread_mutex_unlock(&transactional_2_mutex); + return ret; +} int TisActiveTransaction(int xid) { if(xid < 0) { return 0; } pthread_mutex_lock(&transactional_2_mutex); diff --git a/stasis/constants.h b/stasis/constants.h index 2d21db3..a996f54 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -215,14 +215,15 @@ extern const short SLOT_TYPE_LENGTHS[]; /** XEND is used for after the pages touched by a transaction have been flushed to stable storage. - - @todo Actually write XEND entries to the log so that log - truncation can be implemented! + @todo Actually write XEND entries to the log so that we can + use analysis to optimize redo. */ #define XEND 6 #define CLRLOG 7 +#define XPREPARE 8 + /* Page types */ #define UNINITIALIZED_PAGE 0 diff --git a/stasis/logger/logEntry.h b/stasis/logger/logEntry.h index b77290b..5f85331 100644 --- a/stasis/logger/logEntry.h +++ b/stasis/logger/logEntry.h @@ -90,6 +90,8 @@ typedef struct { @return a LogEntry that should be freed with free(). */ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type); + +LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN); /** Allocate a log entry associated with an operation implemention. This is usually called inside of Tupdate(). @@ -124,6 +126,8 @@ const byte * getUpdateArgs(const LogEntry * e); */ const byte * getUpdatePreImage(const LogEntry * e); +lsn_t getPrepareRecLSN(const LogEntry *e); + END_C_DECLS #endif /* __LOGENTRY_H */ diff --git a/stasis/logger/logger2.h b/stasis/logger/logger2.h index 504296b..128b71c 100644 --- a/stasis/logger/logger2.h +++ b/stasis/logger/logger2.h @@ -130,18 +130,25 @@ lsn_t LogNextEntry(const LogEntry * e); */ TransactionLog LogTransBegin(int xid); +/** + Write a transaction PREPARE to the log tail. Blocks until the + prepare record is stable. + + @return the lsn of the prepare log entry + */ +lsn_t LogTransPrepare(TransactionLog * l); /** Write a transaction COMMIT to the log tail. Blocks until the commit record is stable. - @return The lsn of the commit log entry. + @return the lsn of the commit log entry. */ lsn_t LogTransCommit(TransactionLog * l); /** - Write a transaction ABORT to the log tail. + Write a transaction ABORT to the log tail. Does not force the log. - @return The lsn of the abort log entry. + @return the lsn of the abort log entry. */ lsn_t LogTransAbort(TransactionLog * l); diff --git a/stasis/operations/prepare.h b/stasis/operations/prepare.h index aedddb3..a5978df 100644 --- a/stasis/operations/prepare.h +++ b/stasis/operations/prepare.h @@ -90,7 +90,7 @@ extern recordid prepare_bogus_rec; @param rec must be a valid record id. any valid recordid will do. This parameter will be removed eventually. */ -#define Tprepare(xid, rec) Tupdate(xid, rec, 0, OPERATION_PREPARE) +//#define Tprepare(xid) Tupdate(xid, NULLRID, 0, OPERATION_PREPARE) Operation getPrepare(); diff --git a/stasis/transactional.h b/stasis/transactional.h index ad4bc9d..c508dbf 100644 --- a/stasis/transactional.h +++ b/stasis/transactional.h @@ -591,7 +591,7 @@ extern const recordid NULLRID; */ typedef struct { int xid; - long LSN; + lsn_t LSN; } Transaction; @@ -679,10 +679,11 @@ int TuncleanShutdown(); * Revives Tprepare'ed transactions. * * @param xid The xid that is to be revived. - * @param lsn The lsn of that xid's most recent PREPARE entry in the log. + * @param prevlsn The lsn of that xid's most recent PREPARE entry in the log. + * @param reclsn The lsn of the transaction's BEGIN record. */ -void Trevive(int xid, long lsn); - +void Trevive(int xid, lsn_t prevlsn, lsn_t reclsn); +int Tprepare(int xid); /** * Used by the recovery process. * @@ -693,6 +694,13 @@ void Trevive(int xid, long lsn); */ void TsetXIDCount(int xid); +/** + List all active transactions. + + @return an array of transaction ids. + */ +int* TlistActiveTransactions(); + /** * Checks to see if a transaction is still active. * diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 4397a81..185df0f 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -239,7 +239,7 @@ START_TEST(operation_prepare) { Tset(loser, a, &three); Tset(prepared, b, &three); - Tprepare(prepared, a); + Tprepare(prepared); //, a); Tset(prepared, b, &two); @@ -292,7 +292,7 @@ START_TEST(operation_prepare) { Tset(loser, a, &three); Tset(prepared, b, &three); - Tprepare(prepared, a); + Tprepare(prepared); //, a); Tset(prepared, b, &two);