fixes / cleanups of Tprepare()

This commit is contained in:
Sears Russell 2008-09-24 03:08:32 +00:00
parent fb4e91debe
commit 944c7e984f
12 changed files with 167 additions and 67 deletions

View file

@ -57,7 +57,16 @@ LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type) {
ret->type = type; ret->type = type;
return ret; return ret;
} }
/**
   Allocate a log entry of type XPREPARE.

   The allocation is one raw log entry header followed immediately by a
   single lsn_t that holds recLSN.  This is the same size that
   sizeofLogEntry() reports for XPREPARE entries, and the same offset that
   getPrepareRecLSN() reads from.

   @param prevLSN stored in the entry's prevLSN field.
   @param xid     stored in the entry's xid field.
   @param recLSN  stored in the lsn_t that trails the header.
   @return a zero-initialized (calloc) entry with the fields above filled in.
*/
LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN) {
  const size_t entryBytes = sizeof(struct __raw_log_entry) + sizeof(lsn_t);
  LogEntry * entry = calloc(1, entryBytes);

  // The payload lives directly after the fixed-size header.
  struct __raw_log_entry * header = (struct __raw_log_entry*)entry;
  lsn_t * payload = (lsn_t*)(header + 1);

  entry->type    = XPREPARE;
  entry->xid     = xid;
  entry->prevLSN = prevLSN;
  // Placeholder; presumably replaced with the real LSN when the entry is
  // written to the log -- TODO confirm against the logger.
  entry->LSN     = -1;
  *payload       = recLSN;

  return entry;
}
const byte * getUpdateArgs(const LogEntry * ret) { const byte * getUpdateArgs(const LogEntry * ret) {
assert(ret->type == UPDATELOG || assert(ret->type == UPDATELOG ||
ret->type == CLRLOG); ret->type == CLRLOG);
@ -84,6 +93,11 @@ const byte * getUpdatePreImage(const LogEntry * ret) {
} }
} }
lsn_t getPrepareRecLSN(const LogEntry *e) {
lsn_t ret = *(lsn_t*)(((struct __raw_log_entry*)e)+1);
if(ret == -1) { ret = e->LSN; }
return ret;
}
LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid, LogEntry * allocUpdateLogEntry(lsn_t prevLSN, int xid,
unsigned int funcID, recordid rid, unsigned int funcID, recordid rid,
const byte * args, unsigned int argSize, const byte * args, unsigned int argSize,
@ -159,6 +173,8 @@ long sizeofLogEntry(const LogEntry * log) {
} }
case INTERNALLOG: case INTERNALLOG:
return LoggerSizeOfInternalLogEntry(log); return LoggerSizeOfInternalLogEntry(log);
case XPREPARE:
return sizeof(struct __raw_log_entry)+sizeof(lsn_t);
default: default:
return sizeof(struct __raw_log_entry); return sizeof(struct __raw_log_entry);
} }

View file

@ -297,7 +297,7 @@ int openLogWriter() {
@internal @internal
Unfortunately, this function can't just seek to the end of the Unfortunately, this function can't just seek to the end of the
log. If it did, and a prior instance of LLADD crashed (and wrote log. If it did, and a prior instance of Stasis crashed (and wrote
a partial entry), then the log would be corrupted. Therefore, we a partial entry), then the log would be corrupted. Therefore, we
need to be a little bit smarter, and track the next LSN value need to be a little bit smarter, and track the next LSN value
manually. Calculating it the first time would require a scan over manually. Calculating it the first time would require a scan over
@ -310,9 +310,6 @@ int openLogWriter() {
The first time writeLogEntry is called, we seek from the highest The first time writeLogEntry is called, we seek from the highest
LSN encountered so far to the end of the log. LSN encountered so far to the end of the log.
@todo writeLogEntry implicitly ignores all log entries with xid = -1.
This is probably the wrong thing to do...
*/ */
static int writeLogEntryUnlocked(LogEntry * e) { static int writeLogEntryUnlocked(LogEntry * e) {
@ -481,7 +478,6 @@ static LogEntry * readLogEntry() {
LogEntry * ret = 0; LogEntry * ret = 0;
lsn_t size; lsn_t size;
lsn_t entrySize; lsn_t entrySize;
lsn_t bytesRead = read(roLogFD, &size, sizeof(lsn_t)); lsn_t bytesRead = read(roLogFD, &size, sizeof(lsn_t));
if(bytesRead != sizeof(lsn_t)) { if(bytesRead != sizeof(lsn_t)) {

View file

@ -229,23 +229,37 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) {
return ret; return ret;
} }
static lsn_t LogTransCommonPrepare(TransactionLog * l) {
LogEntry * e = allocPrepareLogEntry(l->prevLSN, l->xid, l->recLSN);
lsn_t ret;
/** DEBUG("Log prepare xid = %d prevlsn = %lld reclsn = %lld, %lld\n",e->xid,e->prevLSN,l->recLSN, getPrepareRecLSN(e));
@todo This should be usable by all calls that sync the log; not just commit. LogWrite(e);
*/
static lsn_t groupCommit(TransactionLog * l) { if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
ret = e->LSN;
FreeLogEntry(e);
return ret;
}
static void groupForce(lsn_t l) {
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
lsn_t ret = LogTransCommon(l, XCOMMIT);
struct timeval now; struct timeval now;
struct timespec timeout; struct timespec timeout;
pthread_mutex_lock(&check_commit); pthread_mutex_lock(&check_commit);
if(LogFlushedLSN() >= ret) { if(LogFlushedLSN() >= l) {
pthread_mutex_unlock(&check_commit); pthread_mutex_unlock(&check_commit);
return ret; return;
} }
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec; timeout.tv_sec = now.tv_sec;
@ -266,21 +280,32 @@ static lsn_t groupCommit(TransactionLog * l) {
printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__); printf("Warning: %s:%d: pthread_cond_timedwait was interrupted by a signal in groupCommit(). Acting as though it timed out.\n", __FILE__, __LINE__);
break; break;
} }
if(LogFlushedLSN() >= ret) { if(LogFlushedLSN() >= l) {
pendingCommits--; pendingCommits--;
pthread_mutex_unlock(&check_commit); pthread_mutex_unlock(&check_commit);
return ret; return;
} }
} }
} }
if(LogFlushedLSN() < ret) { if(LogFlushedLSN() < l) {
syncLog_LogWriter(); syncLog_LogWriter();
syncLogCount++; syncLogCount++;
pthread_cond_broadcast(&tooFewXacts); pthread_cond_broadcast(&tooFewXacts);
} }
assert(LogFlushedLSN() >= ret); assert(LogFlushedLSN() >= l);
pendingCommits--; pendingCommits--;
pthread_mutex_unlock(&check_commit); pthread_mutex_unlock(&check_commit);
return;
}
/**
   Append an XCOMMIT record for the transaction, then call groupForce() so
   that the log is flushed at least up to that record before returning.

   @param l the transaction's log state.
   @return the LSN of the commit record.
*/
static lsn_t groupCommit(TransactionLog * l) {
  const lsn_t commitLSN = LogTransCommon(l, XCOMMIT);

  groupForce(commitLSN);

  return commitLSN;
}
static lsn_t groupPrepare(TransactionLog * l) {
lsn_t ret = LogTransCommonPrepare(l);
groupForce(ret);
return ret; return ret;
} }
@ -291,7 +316,9 @@ lsn_t LogTransCommit(TransactionLog * l) {
lsn_t LogTransAbort(TransactionLog * l) { lsn_t LogTransAbort(TransactionLog * l) {
return LogTransCommon(l, XABORT); return LogTransCommon(l, XABORT);
} }
/**
   Public entry point for logging a transaction PREPARE; all of the work is
   delegated to groupPrepare().

   @param l the transaction's log state.
   @return the LSN returned by groupPrepare() (the prepare record's LSN).
*/
lsn_t LogTransPrepare(TransactionLog * l) {
  const lsn_t prepareLSN = groupPrepare(l);
  return prepareLSN;
}
/** /**
@todo Does the handling of operation types / argument sizes belong @todo Does the handling of operation types / argument sizes belong

View file

@ -51,7 +51,7 @@ terms specified in this license.
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <stdio.h>
recordid prepare_bogus_rec = { 0, 0, 0}; recordid prepare_bogus_rec = { 0, 0, 0};
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
@ -75,6 +75,7 @@ typedef struct{
int continueIterating; int continueIterating;
int prevLSN; int prevLSN;
int xid; int xid;
int aborted;
} PrepareGuardState; } PrepareGuardState;
void * getPrepareGuardState() { void * getPrepareGuardState() {
@ -82,6 +83,7 @@ void * getPrepareGuardState() {
s->continueIterating = 1; s->continueIterating = 1;
s->prevLSN = -1; s->prevLSN = -1;
s->xid = -1; s->xid = -1;
s->aborted = 0;
return s; return s;
} }
@ -89,11 +91,14 @@ void * getPrepareGuardState() {
int prepareGuard(const LogEntry * e, void * state) { int prepareGuard(const LogEntry * e, void * state) {
PrepareGuardState * pgs = state; PrepareGuardState * pgs = state;
int ret = pgs->continueIterating; int ret = pgs->continueIterating;
if(e->type == UPDATELOG) { if(e->type == UPDATELOG && !pgs->aborted) {
if(e->update.funcID == OPERATION_PREPARE) { if(e->update.funcID == OPERATION_PREPARE) {
pgs->continueIterating = 0; pgs->continueIterating = 0;
pgs->prevLSN = e->prevLSN; pgs->prevLSN = e->prevLSN;
} }
} else if (e->type == XABORT) {
printf("xid %d aborted.\n", e->xid);
pgs->aborted = 1;
} }
if(pgs->xid == -1) { if(pgs->xid == -1) {
pgs->xid = e->xid; pgs->xid = e->xid;
@ -109,9 +114,10 @@ int prepareGuard(const LogEntry * e, void * state) {
int prepareAction(void * state) { int prepareAction(void * state) {
PrepareGuardState * pgs = state; PrepareGuardState * pgs = state;
int ret; int ret;
if(!pgs->continueIterating) { if(!(pgs->continueIterating || pgs->aborted)) {
assert(pgs->prevLSN != -1); //assert(pgs->prevLSN != -1);
Trevive(pgs->xid, pgs->prevLSN); abort();
// Trevive(pgs->xid, pgs->prevLSN);
ret = 1; ret = 1;
} else { } else {
ret = 0; ret = 0;

View file

@ -19,7 +19,7 @@
#include <stasis/lockManager.h> #include <stasis/lockManager.h>
/** @todo Add better log iterator guard support and remove this include.*/ /** @todo Add better log iterator guard support and remove this include.*/
#include <stasis/operations/prepare.h> //#include <stasis/operations/prepare.h>
#include <stasis/logger/logHandle.h> #include <stasis/logger/logHandle.h>
/** @todo Get rid of linkedlist */ /** @todo Get rid of linkedlist */
@ -61,10 +61,6 @@ static void Analysis () {
track of that value. */ track of that value. */
int highestXid = 0; int highestXid = 0;
/** @todo loadCheckPoint() - Jump forward in the log to the last
checkpoint. (getLogHandle should do this automatically,
since the log will be truncated on checkpoint anyway.) */
while((e = nextInLog(&lh))) { while((e = nextInLog(&lh))) {
lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int)); lsn_t * xactLSN = (lsn_t*)pblHtLookup(transactionLSN, &(e->xid), sizeof(int));
@ -140,6 +136,9 @@ static void Analysis () {
DEBUG("Adding %ld\n", e->LSN); DEBUG("Adding %ld\n", e->LSN);
addSortedVal(&rollbackLSNs, e->LSN); addSortedVal(&rollbackLSNs, e->LSN);
break; break;
case XPREPARE:
addSortedVal(&rollbackLSNs, e->LSN);
break; // XXX check to see if the xact exists?
case INTERNALLOG: case INTERNALLOG:
// Created by the logger, just ignore it // Created by the logger, just ignore it
// Make sure the log entry doesn't interfere with real xacts. // Make sure the log entry doesn't interfere with real xacts.
@ -186,35 +185,34 @@ static void Redo() {
{ {
FreeLogEntry(e); FreeLogEntry(e);
} break; } break;
case XPREPARE:
{
FreeLogEntry(e);
} break;
default: default:
abort(); abort();
} }
} }
} }
} }
/**
@todo Guards shouldn't be hardcoded in Undo()
*/
static void Undo(int recovery) { static void Undo(int recovery) {
LogHandle lh; LogHandle lh;
void * prepare_guard_state;
while(rollbackLSNs != NULL) { while(rollbackLSNs != NULL) {
const LogEntry * e; const LogEntry * e;
lsn_t rollback = popMaxVal(&rollbackLSNs); lsn_t rollback = popMaxVal(&rollbackLSNs);
prepare_guard_state = getPrepareGuardState();
DEBUG("Undoing LSN %ld\n", (long int)rollback); DEBUG("Undoing LSN %ld\n", (long int)rollback);
if(recovery) { lh = getLSNHandle(rollback);
lh = getGuardedHandle(rollback, &prepareGuard, prepare_guard_state);
} else {
lh = getLSNHandle(rollback);
}
int thisXid = -1; int thisXid = -1;
while((e = previousInTransaction(&lh))) {
// Is this transaction just a loser, or was it aborted?
int reallyAborted = 0;
// Have we reached a XPREPARE that we should pay attention to?
int prepared = 0;
while((!prepared) && (e = previousInTransaction(&lh))) {
thisXid = e->xid; thisXid = e->xid;
lsn_t this_lsn, clr_lsn; lsn_t this_lsn, clr_lsn;
switch(e->type) { switch(e->type) {
@ -249,12 +247,25 @@ static void Undo(int recovery) {
// Don't undo CLRs; they were undone during Redo // Don't undo CLRs; they were undone during Redo
break; break;
case XABORT: case XABORT:
// Since XABORT is a no-op, we can silentlt ignore it. XABORT printf("Found abort for %d\n", e->xid);
reallyAborted = 1;
// Since XABORT is a no-op, we can silently ignore it. XABORT
// records may be passed in by undoTrans. // records may be passed in by undoTrans.
break; break;
case XCOMMIT: case XCOMMIT:
// Should never abort a transaction that contains a commit record // Should never abort a transaction that contains a commit record
abort(); abort();
case XPREPARE: {
printf("found prepared xact %d\n", e->xid);
if(!reallyAborted) {
printf("xact wasn't aborted\n");
prepared = 1;
Trevive(e->xid, e->LSN, getPrepareRecLSN(e));
} else {
printf("xact was aborted\n");
}
} break;
default: default:
printf printf
("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n", ("Unknown log type to undo (TYPE=%d,XID= %d,LSN=%lld), skipping...\n",
@ -264,9 +275,7 @@ static void Undo(int recovery) {
} }
FreeLogEntry(e); FreeLogEntry(e);
} }
int transactionWasPrepared = prepareAction(prepare_guard_state); if((!prepared) && globalLockManager.abort) {
free(prepare_guard_state);
if(!transactionWasPrepared && globalLockManager.abort) {
globalLockManager.abort(thisXid); globalLockManager.abort(thisXid);
} }
} }

View file

@ -48,7 +48,7 @@ void setupOperationsTable() {
operationsTable[OPERATION_INCREMENT] = getIncrement(); operationsTable[OPERATION_INCREMENT] = getIncrement();
operationsTable[OPERATION_DECREMENT] = getDecrement(); operationsTable[OPERATION_DECREMENT] = getDecrement();
operationsTable[OPERATION_ALLOC] = getAlloc(); operationsTable[OPERATION_ALLOC] = getAlloc();
operationsTable[OPERATION_PREPARE] = getPrepare(); // operationsTable[OPERATION_PREPARE] = getPrepare();
/* operationsTable[OPERATION_LHINSERT] = getLHInsert(); /* operationsTable[OPERATION_LHINSERT] = getLHInsert();
operationsTable[OPERATION_LHREMOVE] = getLHRemove(); */ operationsTable[OPERATION_LHREMOVE] = getLHRemove(); */
operationsTable[OPERATION_DEALLOC] = getDealloc(); operationsTable[OPERATION_DEALLOC] = getDealloc();
@ -350,6 +350,14 @@ int Tcommit(int xid) {
return 0; return 0;
} }
/**
   Log a PREPARE record for an active transaction.

   @param xid a non-negative transaction id that currently owns its slot in
              XactionTable (both conditions are asserted).
   @return always 0; the LSN returned by LogTransPrepare() is discarded.
*/
int Tprepare(int xid) {
  assert(xid >= 0);

  const off_t slot = xid % MAX_TRANSACTIONS;
  // The table slot must currently belong to this xid.
  assert(XactionTable[slot].xid == xid);

  (void)LogTransPrepare(&XactionTable[slot]);
  return 0;
}
int Tabort(int xid) { int Tabort(int xid) {
lsn_t lsn; lsn_t lsn;
assert(xid >= 0); assert(xid >= 0);
@ -427,24 +435,26 @@ int TuncleanShutdown() {
return 0; return 0;
} }
void Trevive(int xid, long lsn) { void Trevive(int xid, lsn_t prevlsn, lsn_t reclsn) {
assert(xid >= 0); assert(xid >= 0);
assert(reclsn != -1);
int index = xid % MAX_TRANSACTIONS; int index = xid % MAX_TRANSACTIONS;
pthread_mutex_lock(&transactional_2_mutex); pthread_mutex_lock(&transactional_2_mutex);
DEBUG("Reviving xid %d at lsn %ld\n", xid, lsn); DEBUG("Reviving xid %d at lsn %ld\n", xid, lsn);
if(XactionTable[index].xid != INVALID_XTABLE_XID) { if(XactionTable[index].xid != INVALID_XTABLE_XID) {
if(xid != XactionTable[index].xid) { abort();
/* if(xid != XactionTable[index].xid) {
fprintf(stderr, "Clashing Tprepare()'ed XID's encountered on recovery!!\n"); fprintf(stderr, "Clashing Tprepare()'ed XID's encountered on recovery!!\n");
abort(); abort();
} }
assert(XactionTable[index].xid == xid); assert(XactionTable[index].xid == xid);
assert(XactionTable[index].prevLSN == lsn); assert(XactionTable[index].prevLSN == lsn); */
} else { } else {
XactionTable[index].xid = xid; XactionTable[index].xid = xid;
XactionTable[index].prevLSN = lsn; XactionTable[index].prevLSN = prevlsn;
XactionTable[index].recLSN = reclsn;
numActiveXactions++; numActiveXactions++;
} }
@ -472,6 +482,22 @@ lsn_t transactions_minRecLSN() {
return minRecLSN; return minRecLSN;
} }
/**
   Take a snapshot of the xids of every occupied slot in XactionTable.

   The table is scanned twice while holding transactional_2_mutex: once to
   count the occupied slots, and once to copy their xids.  Counting first
   lets the result be allocated exactly once at its final size, instead of
   being grown with realloc() for every active transaction.

   NOTE(review): the array is terminated by a 0 entry, but Tprepare() and
   Trevive() only assert xid >= 0, so an xid of 0 would be indistinguishable
   from the terminator -- confirm that xid 0 is never handed out.

   @return a malloc()ed array holding the active xids followed by a 0
           terminator; the caller must free() it.  Returns NULL if the
           allocation fails.
*/
int* TlistActiveTransactions() {
  pthread_mutex_lock(&transactional_2_mutex);

  // Pass 1: count the occupied slots.
  size_t active = 0;
  for(int i = 0; i < MAX_TRANSACTIONS; i++) {
    if(XactionTable[i].xid != INVALID_XTABLE_XID) {
      active++;
    }
  }

  // One extra element for the 0 terminator.
  int * ret = malloc((active + 1) * sizeof(*ret));
  if(ret != NULL) {
    // Pass 2: copy the xids.  The mutex is still held, so the count from
    // pass 1 is assumed to still be accurate (table updates are presumed
    // to happen only under this mutex).
    size_t filled = 0;
    for(int i = 0; i < MAX_TRANSACTIONS; i++) {
      if(XactionTable[i].xid != INVALID_XTABLE_XID) {
        ret[filled] = XactionTable[i].xid;
        filled++;
      }
    }
    ret[filled] = 0;
  }

  // Release the mutex on both the success and the failure path.
  pthread_mutex_unlock(&transactional_2_mutex);
  return ret;
}
int TisActiveTransaction(int xid) { int TisActiveTransaction(int xid) {
if(xid < 0) { return 0; } if(xid < 0) { return 0; }
pthread_mutex_lock(&transactional_2_mutex); pthread_mutex_lock(&transactional_2_mutex);

View file

@ -216,13 +216,14 @@ extern const short SLOT_TYPE_LENGTHS[];
XEND is used for after the pages touched by a transaction have XEND is used for after the pages touched by a transaction have
been flushed to stable storage. been flushed to stable storage.
@todo Actually write XEND entries to the log so that log @todo Actually write XEND entries to the log so that we can
truncation can be implemented! use analysis to optimize redo.
*/ */
#define XEND 6 #define XEND 6
#define CLRLOG 7 #define CLRLOG 7
#define XPREPARE 8
/* Page types */ /* Page types */
#define UNINITIALIZED_PAGE 0 #define UNINITIALIZED_PAGE 0

View file

@ -90,6 +90,8 @@ typedef struct {
@return a LogEntry that should be freed with free(). @return a LogEntry that should be freed with free().
*/ */
LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type); LogEntry * allocCommonLogEntry(lsn_t prevLSN, int xid, unsigned int type);
LogEntry * allocPrepareLogEntry(lsn_t prevLSN, int xid, lsn_t recLSN);
/** /**
Allocate a log entry associated with an operation implemention. This Allocate a log entry associated with an operation implemention. This
is usually called inside of Tupdate(). is usually called inside of Tupdate().
@ -124,6 +126,8 @@ const byte * getUpdateArgs(const LogEntry * e);
*/ */
const byte * getUpdatePreImage(const LogEntry * e); const byte * getUpdatePreImage(const LogEntry * e);
lsn_t getPrepareRecLSN(const LogEntry *e);
END_C_DECLS END_C_DECLS
#endif /* __LOGENTRY_H */ #endif /* __LOGENTRY_H */

View file

@ -130,18 +130,25 @@ lsn_t LogNextEntry(const LogEntry * e);
*/ */
TransactionLog LogTransBegin(int xid); TransactionLog LogTransBegin(int xid);
/**
Write a transaction PREPARE to the log tail. Blocks until the
prepare record is stable.
@return the lsn of the prepare log entry
*/
lsn_t LogTransPrepare(TransactionLog * l);
/** /**
Write a transaction COMMIT to the log tail. Blocks until the commit Write a transaction COMMIT to the log tail. Blocks until the commit
record is stable. record is stable.
@return The lsn of the commit log entry. @return the lsn of the commit log entry.
*/ */
lsn_t LogTransCommit(TransactionLog * l); lsn_t LogTransCommit(TransactionLog * l);
/** /**
Write a transaction ABORT to the log tail. Write a transaction ABORT to the log tail. Does not force the log.
@return The lsn of the abort log entry. @return the lsn of the abort log entry.
*/ */
lsn_t LogTransAbort(TransactionLog * l); lsn_t LogTransAbort(TransactionLog * l);

View file

@ -90,7 +90,7 @@ extern recordid prepare_bogus_rec;
@param rec must be a valid record id. any valid recordid will do. This parameter will be removed eventually. @param rec must be a valid record id. any valid recordid will do. This parameter will be removed eventually.
*/ */
#define Tprepare(xid, rec) Tupdate(xid, rec, 0, OPERATION_PREPARE) //#define Tprepare(xid) Tupdate(xid, NULLRID, 0, OPERATION_PREPARE)
Operation getPrepare(); Operation getPrepare();

View file

@ -591,7 +591,7 @@ extern const recordid NULLRID;
*/ */
typedef struct { typedef struct {
int xid; int xid;
long LSN; lsn_t LSN;
} Transaction; } Transaction;
@ -679,10 +679,11 @@ int TuncleanShutdown();
* Revives Tprepare'ed transactions. * Revives Tprepare'ed transactions.
* *
* @param xid The xid that is to be revived. * @param xid The xid that is to be revived.
* @param lsn The lsn of that xid's most recent PREPARE entry in the log. * @param prevlsn The lsn of that xid's most recent PREPARE entry in the log.
* @param reclsn The recLSN recorded in that PREPARE entry (see getPrepareRecLSN()); must not be -1.
*/ */
void Trevive(int xid, long lsn); void Trevive(int xid, lsn_t prevlsn, lsn_t reclsn);
int Tprepare(int xid);
/** /**
* Used by the recovery process. * Used by the recovery process.
* *
@ -693,6 +694,13 @@ void Trevive(int xid, long lsn);
*/ */
void TsetXIDCount(int xid); void TsetXIDCount(int xid);
/**
List all active transactions.
@return a malloc()ed array of the active transaction ids, terminated by a 0 entry. The caller must free() the array.
*/
int* TlistActiveTransactions();
/** /**
* Checks to see if a transaction is still active. * Checks to see if a transaction is still active.
* *

View file

@ -239,7 +239,7 @@ START_TEST(operation_prepare) {
Tset(loser, a, &three); Tset(loser, a, &three);
Tset(prepared, b, &three); Tset(prepared, b, &three);
Tprepare(prepared, a); Tprepare(prepared); //, a);
Tset(prepared, b, &two); Tset(prepared, b, &two);
@ -292,7 +292,7 @@ START_TEST(operation_prepare) {
Tset(loser, a, &three); Tset(loser, a, &three);
Tset(prepared, b, &three); Tset(prepared, b, &three);
Tprepare(prepared, a); Tprepare(prepared); //, a);
Tset(prepared, b, &two); Tset(prepared, b, &two);