Recovery now supports the lock manager. (And vice-versa ;)

This commit is contained in:
Sears Russell 2005-02-10 21:56:32 +00:00
parent 8bf6ea29ff
commit 2ac1302062
5 changed files with 24 additions and 20 deletions

View file

@ -128,8 +128,8 @@ int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
ridLock->active--;
pthread_mutex_unlock(mut);
// printf("Deadlock!\n"); fflush(stdout);
// abort();
printf("Deadlock!\n"); fflush(stdout);
abort();
return LLADD_DEADLOCK;
}
} while(ridLock->writers);
@ -195,8 +195,8 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
ridLock->waiting--;
ridLock->active--;
pthread_mutex_unlock(mut);
// printf("Deadlock!\n"); fflush(stdout);
// abort();
printf("Deadlock!\n"); fflush(stdout);
abort();
return LLADD_DEADLOCK;
}
}

View file

@ -103,12 +103,15 @@ int prepareGuard(LogEntry * e, void * state) {
}
/** @todo When fleshing out the logHandle's prepareAction interface, figure out what the return value should mean... */
int prepareAction(
void * state) {
int prepareAction(void * state) {
PrepareGuardState * pgs = state;
int ret;
if(!pgs->continueIterating) {
assert(pgs->prevLSN != -1);
Trevive(pgs->xid, pgs->prevLSN);
}
return 0;
ret = 1;
} else {
ret = 0;
}
return ret;
}

View file

@ -17,6 +17,7 @@
#include "logger/logHandle.h"
#include "logger/logWriter.h"
#include <lladd/bufferManager.h>
#include <lladd/lockManager.h>
/** @todo recovery2.c shouldn't include pageCache.h once refactoring is done. */
#include <lladd/pageCache.h>
@ -113,7 +114,7 @@ static void Analysis () {
Therefore, we can skip redoing any of its operations. (The
timestamps on each page guarantee that the redo phase will
not overwrite this transaction's work with stale data.)
1
The redo phase checks for a transaction's presence in
transactionLSN before redoing its actions. Therefore, if we
remove this transaction from the hash, it will not be redone.
@ -166,7 +167,9 @@ static void Redo() {
/* addPendingEvent(e->contents.clr.rid.page); */
}
redoUpdate(e);
}
} else if(e->type == XCOMMIT && globalLockManager.commit) {
globalLockManager.commit(e->xid);
} // if transaction aborted, wait until undo is complete before notifying the globalLockManager.
}
free(e);
}
@ -202,8 +205,9 @@ static void Undo(int recovery) {
/* printf("e->prev_offset: %ld\n", e->prevLSN);
printf("prev_offset: %ld\n", lh.prev_offset); */
int thisXid = -1;
while((e = previousInTransaction(&lh))) {
thisXid = e->xid;
lsn_t this_lsn, clr_lsn;
/* printf("."); fflush(NULL); */
switch(e->type) {
@ -259,8 +263,11 @@ static void Undo(int recovery) {
}
free(e);
}
prepareAction(prepare_guard_state);
int transactionWasPrepared = prepareAction(prepare_guard_state);
free(prepare_guard_state);
if(!transactionWasPrepared && globalLockManager.abort) {
globalLockManager.abort(thisXid);
}
/* printf("$"); fflush(NULL); */
}

View file

@ -93,8 +93,8 @@ int Tinit() {
initNestedTopActions();
ThashInit();
setupLockManagerCallbacksNil();
// setupLockManagerCallbacksPage();
//setupLockManagerCallbacksNil();
setupLockManagerCallbacksPage();
InitiateRecovery();

View file

@ -31,7 +31,6 @@ void * pageWorkerThread(void * j) {
if(rw) {
// readlock
if(LLADD_DEADLOCK == globalLockManager.readLockPage(xid, m)) {
// if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
@ -41,7 +40,6 @@ void * pageWorkerThread(void * j) {
} else {
// writelock
// if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
if(LLADD_DEADLOCK == globalLockManager.writeLockPage(xid, m)) {
k = 0;
globalLockManager.abort(xid);
@ -54,7 +52,6 @@ void * pageWorkerThread(void * j) {
printf("%2d ", deadlocks); fflush(stdout);
// lockManagerCommit(xid);
globalLockManager.commit(xid);
return NULL;
@ -77,7 +74,6 @@ void * ridWorkerThread(void * j) {
if(rw) {
// readlock
if(LLADD_DEADLOCK == globalLockManager.readLockRecord(xid, rid)) {
// if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
@ -87,7 +83,6 @@ void * ridWorkerThread(void * j) {
} else {
// writelock
// if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
if(LLADD_DEADLOCK == globalLockManager.writeLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
@ -100,7 +95,6 @@ void * ridWorkerThread(void * j) {
printf("%2d ", deadlocks); fflush(stdout);
// lockManagerCommit(xid);
globalLockManager.commit(xid);
return NULL;