Numerous lock manager fixes; more realistic test case for lock manager, added support for optional page-level locking.(Still need to report deadlocks back up to the caller.)

This commit is contained in:
Sears Russell 2005-02-10 03:51:09 +00:00
parent 6f022cc577
commit b8a67cbfb2
23 changed files with 405 additions and 167 deletions

View file

@ -100,7 +100,7 @@ typedef struct Page_s Page;
* @param pageid ID of the page you want to load * @param pageid ID of the page you want to load
* @return fully formed Page type * @return fully formed Page type
*/ */
Page * loadPage(int pageid); Page * loadPage(int xid, int pageid);
/** /**
loadPage aquires a lock when it is called, effectively pinning it loadPage aquires a lock when it is called, effectively pinning it

View file

@ -1,9 +1,28 @@
#include <lladd/transactional.h> #include <lladd/transactional.h>
typedef struct {
void (*init)();
int (*readLockPage) (int xid, int page);
int (*writeLockPage) (int xid, int page);
int (*unlockPage) (int xid, int page);
int (*readLockRecord) (int xid, recordid rid);
int (*writeLockRecord)(int xid, recordid rid);
int (*unlockRecord) (int xid, recordid rid);
int (*begin) (int xid);
int (*commit) (int xid);
int (*abort) (int xid);
} LockManagerSetup;
extern LockManagerSetup globalLockManager;
void lockManagerInit(); void lockManagerInit();
int lockManagerReadLockRecord(int xid, recordid rid); int lockManagerReadLockRecord(int xid, recordid rid);
int lockManagerWriteLockRecord(int xid, recordid rid); int lockManagerWriteLockRecord(int xid, recordid rid);
int lockManagerUnlockRecord(int xid, recordid rid); int lockManagerUnlockRecord(int xid, recordid rid);
int lockManagerReleaseAll(int xid); int lockManagerCommit(int xid);
void setupLockManagerCallbacksRecord();
void setupLockManagerCallbacksPage();
void setupLockManagerCallbacksNil();

View file

@ -51,7 +51,7 @@ terms specified in this license.
#include <assert.h> #include <assert.h>
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include <lladd/lockManager.h>
#include "page.h" #include "page.h"
#include "blobManager.h" #include "blobManager.h"
#include <lladd/pageCache.h> #include <lladd/pageCache.h>
@ -133,9 +133,9 @@ void releasePage (Page * p) {
} }
int bufTransCommit(int xid, lsn_t lsn) { int bufTransCommit(int xid, lsn_t lsn) {
commitBlobs(xid); commitBlobs(xid);
pageCommit(xid); pageCommit(xid);
if(globalLockManager.commit) { globalLockManager.commit(xid);}
return 0; return 0;
} }
@ -144,6 +144,7 @@ int bufTransAbort(int xid, lsn_t lsn) {
abortBlobs(xid); /* abortBlobs doesn't write any log entries, so it doesn't need the lsn. */ abortBlobs(xid); /* abortBlobs doesn't write any log entries, so it doesn't need the lsn. */
pageAbort(xid); pageAbort(xid);
if(globalLockManager.abort) { globalLockManager.abort(xid);}
return 0; return 0;
} }
@ -151,6 +152,7 @@ int bufTransAbort(int xid, lsn_t lsn) {
Page * getPage(int pageid, int locktype) { Page * getPage(int pageid, int locktype) {
Page * ret; Page * ret;
int spin = 0; int spin = 0;
pthread_mutex_lock(&loadPagePtr_mutex); pthread_mutex_lock(&loadPagePtr_mutex);
ret = pblHtLookup(activePages, &pageid, sizeof(int)); ret = pblHtLookup(activePages, &pageid, sizeof(int));
@ -265,7 +267,9 @@ Page * getPage(int pageid, int locktype) {
return ret; return ret;
} }
Page *loadPage(int pageid) { Page *loadPage(int xid, int pageid) {
if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
Page * ret = getPage(pageid, RO); Page * ret = getPage(pageid, RO);
return ret; return ret;
} }

View file

@ -7,7 +7,7 @@
#include <errno.h> #include <errno.h>
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <string.h>
#include <lladd/hash.h> #include <lladd/hash.h>
@ -19,9 +19,9 @@
static pthread_mutex_t mutexes[MUTEX_COUNT]; static pthread_mutex_t mutexes[MUTEX_COUNT];
static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t rid_table_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t * getMutex(recordid rid) { static pthread_mutex_t * getMutex(byte * dat, int datLen) {
return &mutexes[hash(&rid, sizeof(recordid), MUTEX_BITS, MUTEX_EXT)]; return &mutexes[hash(dat, datLen, MUTEX_BITS, MUTEX_EXT)];
} }
static pblHashTable_t * xidLockTable; static pblHashTable_t * xidLockTable;
@ -36,7 +36,7 @@ typedef struct {
int active; int active;
} lock; } lock;
void lockManagerInit() { void lockManagerInitHashed() {
int i = 0; int i = 0;
for(i = 0; i < MUTEX_COUNT; i++) { for(i = 0; i < MUTEX_COUNT; i++) {
pthread_mutex_init(&mutexes[i], NULL); pthread_mutex_init(&mutexes[i], NULL);
@ -45,16 +45,19 @@ void lockManagerInit() {
ridLockTable = pblHtCreate(); ridLockTable = pblHtCreate();
} }
/** @todo startTransaction needs a mutex!! */ pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) {
void startTransaction(int xid) {
pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtCreate(); pblHashTable_t * xidLocks = pblHtCreate();
pblHtInsert(xidLockTable, &xid, sizeof(int), xidLocks); pblHtInsert(xidLockTable, &xid, sizeof(int), xidLocks);
return xidLocks;
}
int lockManagerBeginTransaction(int xid) {
pthread_mutex_lock(&xid_table_mutex);
lockManagerBeginTransactionUnlocked(xid);
pthread_mutex_unlock(&xid_table_mutex); pthread_mutex_unlock(&xid_table_mutex);
return 0;
} }
lock* createLock(recordid rid) { lock* createLock(byte * dat, int datLen) {
lock * ret = malloc(sizeof(lock)); lock * ret = malloc(sizeof(lock));
if(!ret) { return NULL; } if(!ret) { return NULL; }
@ -62,43 +65,50 @@ lock* createLock(recordid rid) {
// pthread_mutex_init(&ret->mut, NULL); // pthread_mutex_init(&ret->mut, NULL);
pthread_cond_init(&ret->writeOK, NULL); pthread_cond_init(&ret->writeOK, NULL);
pthread_cond_init(&ret->readOK, NULL); pthread_cond_init(&ret->readOK, NULL);
ret->active = 0;
ret->readers = 0; ret->readers = 0;
ret->writers = 0; ret->writers = 0;
ret->waiting = 0; ret->waiting = 0;
pblHtInsert(ridLockTable, &rid, sizeof(recordid), ret); pblHtInsert(ridLockTable, dat, datLen, ret);
return ret; return ret;
} }
void destroyLock(recordid rid, lock * l) { void destroyLock(byte * dat, int datLen, lock * l) {
pthread_cond_destroy(&l->writeOK); pthread_cond_destroy(&l->writeOK);
pthread_cond_destroy(&l->readOK); pthread_cond_destroy(&l->readOK);
free (l); free (l);
pblHtRemove(ridLockTable, &rid, sizeof(recordid)); pblHtRemove(ridLockTable, dat, datLen);
} }
#define LM_READLOCK 1 #define LM_READLOCK 1
#define LM_WRITELOCK 2 #define LM_WRITELOCK 2
int lockManagerReadLockRecord(int xid, recordid rid) { int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex); pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
if((int)pblHtLookup(xidLocks, &rid, sizeof(recordid)) >= LM_READLOCK) { if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
int currentLockLevel = (int)pblHtLookup(xidLocks, dat, datLen);
// printf("xid %d read lock (%d)\n", xid, currentLockLevel);
if(currentLockLevel >= LM_READLOCK) {
pthread_mutex_unlock(&xid_table_mutex); pthread_mutex_unlock(&xid_table_mutex);
return 0; return 0;
} }
assert(!currentLockLevel);
pthread_mutex_unlock(&xid_table_mutex); pthread_mutex_unlock(&xid_table_mutex);
pthread_mutex_t * mut = getMutex(rid); pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut); pthread_mutex_lock(mut);
pthread_mutex_lock(&rid_table_mutex);
lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid)); lock * ridLock = pblHtLookup(ridLockTable, dat, datLen);
if(!ridLock) { if(!ridLock) {
ridLock = createLock(rid); ridLock = createLock(dat, datLen);
} }
pthread_mutex_unlock(&rid_table_mutex);
ridLock->active++; ridLock->active++;
if(ridLock->writers || ridLock->waiting) { if(ridLock->writers || ridLock->waiting) {
@ -117,22 +127,36 @@ int lockManagerReadLockRecord(int xid, recordid rid) {
if(wait_ret == ETIMEDOUT) { if(wait_ret == ETIMEDOUT) {
ridLock->active--; ridLock->active--;
pthread_mutex_unlock(mut); pthread_mutex_unlock(mut);
// printf("Deadlock!\n"); fflush(stdout);
// abort();
return LLADD_DEADLOCK; return LLADD_DEADLOCK;
} }
} while(ridLock->writers); } while(ridLock->writers);
} }
ridLock->readers++; if(currentLockLevel < LM_READLOCK) {
ridLock->readers++;
pblHtRemove(xidLocks, dat, datLen);
pblHtInsert(xidLocks, dat, datLen, (void*)LM_READLOCK);
}
ridLock->active--; ridLock->active--;
pthread_mutex_unlock(mut); pthread_mutex_unlock(mut);
pblHtInsert(xidLocks, &rid, sizeof(recordid), (void*)LM_READLOCK);
return 0; return 0;
} }
int lockManagerWriteLockRecord(int xid, recordid rid) { int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex); pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
int currentLockLevel = (int)pblHtLookup(xidLocks, dat, datLen);
// printf("xid %d write lock (%d)\n", xid, currentLockLevel);
int currentLockLevel = (int)pblHtLookup(xidLocks, &rid, sizeof(recordid));
int me = 0; int me = 0;
pthread_mutex_unlock(&xid_table_mutex); pthread_mutex_unlock(&xid_table_mutex);
@ -142,13 +166,15 @@ int lockManagerWriteLockRecord(int xid, recordid rid) {
me = 1; me = 1;
} }
pthread_mutex_t * mut = getMutex(rid); pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut); pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid)); pthread_mutex_lock(&rid_table_mutex);
lock * ridLock = pblHtLookup(ridLockTable, dat, datLen);
if(!ridLock) { if(!ridLock) {
ridLock = createLock(rid); ridLock = createLock(dat, datLen);
} }
pthread_mutex_unlock(&rid_table_mutex);
ridLock->active++; ridLock->active++;
ridLock->waiting++; ridLock->waiting++;
@ -169,6 +195,8 @@ int lockManagerWriteLockRecord(int xid, recordid rid) {
ridLock->waiting--; ridLock->waiting--;
ridLock->active--; ridLock->active--;
pthread_mutex_unlock(mut); pthread_mutex_unlock(mut);
// printf("Deadlock!\n"); fflush(stdout);
// abort();
return LLADD_DEADLOCK; return LLADD_DEADLOCK;
} }
} }
@ -176,33 +204,48 @@ int lockManagerWriteLockRecord(int xid, recordid rid) {
ridLock->waiting--; ridLock->waiting--;
if(currentLockLevel == 0) { if(currentLockLevel == 0) {
ridLock->readers++; ridLock->readers++;
ridLock->writers++;
} else if (currentLockLevel == LM_READLOCK) {
ridLock->writers++;
pblHtRemove(xidLocks, dat, datLen);
} }
ridLock->writers++; if(currentLockLevel != LM_WRITELOCK) {
pblHtInsert(xidLocks, dat, datLen, (void*)LM_WRITELOCK);
}
ridLock->active--; ridLock->active--;
pthread_mutex_unlock(mut); pthread_mutex_unlock(mut);
pblHtInsert(xidLocks, &rid, sizeof(recordid), (void*)LM_WRITELOCK);
return 0; return 0;
} }
int lockManagerUnlockRecord(int xid, recordid rid) { int lockManagerUnlockHashed(int xid, byte * dat, int datLen) {
if(xid == -1) { return 0; }
// printf("xid %d unlock\n", xid);
pthread_mutex_lock(&xid_table_mutex); pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
int currentLevel = (int)pblHtLookup(xidLocks, &rid, sizeof(recordid)); int currentLevel = (int)pblHtLookup(xidLocks, dat, datLen);
if(currentLevel) { if(currentLevel) {
pblHtRemove(xidLocks, &rid, sizeof(recordid)); pblHtRemove(xidLocks, dat, datLen);
} }
pthread_mutex_unlock(&xid_table_mutex); pthread_mutex_unlock(&xid_table_mutex);
pthread_mutex_t * mut = getMutex(rid); pthread_mutex_t * mut = getMutex(dat, datLen);
pthread_mutex_lock(mut); pthread_mutex_lock(mut);
lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid)); pthread_mutex_lock(&rid_table_mutex);
lock * ridLock = pblHtLookup(ridLockTable, dat, datLen);
assert(ridLock); assert(ridLock);
ridLock->active++; ridLock->active++;
pthread_mutex_unlock(&rid_table_mutex);
if(currentLevel == LM_WRITELOCK) { if(currentLevel == LM_WRITELOCK) {
ridLock->writers--; ridLock->writers--;
ridLock->readers--; ridLock->readers--;
@ -220,7 +263,10 @@ int lockManagerUnlockRecord(int xid, recordid rid) {
ridLock->active--; ridLock->active--;
if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) { if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) {
destroyLock(rid, ridLock); // printf("destroyed lock");
destroyLock(dat, datLen, ridLock);
} else {
// printf("(%d %d %d %d)", ridLock->active, ridLock->waiting, ridLock->readers, ridLock->writers);
} }
pthread_mutex_unlock(mut); pthread_mutex_unlock(mut);
@ -228,24 +274,102 @@ int lockManagerUnlockRecord(int xid, recordid rid) {
return 0; return 0;
} }
int lockManagerReleaseAll(int xid) { int lockManagerCommitHashed(int xid, int datLen) {
if(xid == -1) { return 0; }
pthread_mutex_lock(&xid_table_mutex); pthread_mutex_lock(&xid_table_mutex);
pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
if(!xidLocks) {
xidLocks = lockManagerBeginTransactionUnlocked(xid);
}
pthread_mutex_unlock(&xid_table_mutex); pthread_mutex_unlock(&xid_table_mutex);
void * data; void * data;
int ret = 0; int ret = 0;
byte * dat = malloc(datLen);
for(data = pblHtFirst(xidLocks); data; data = pblHtNext(xidLocks)) { for(data = pblHtFirst(xidLocks); data; data = pblHtNext(xidLocks)) {
recordid rid = *(recordid*)pblHtCurrentKey(xidLocks); memcpy(dat, pblHtCurrentKey(xidLocks), datLen);
int tmpret = lockManagerUnlockRecord(xid, rid); int tmpret = lockManagerUnlockHashed(xid, dat, datLen);
// Pass any error(s) up to the user. // Pass any error(s) up to the user.
// (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR) // (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR)
if(tmpret) { if(tmpret) {
ret = tmpret; ret = tmpret;
} }
pblHtRemove(xidLocks, &rid, sizeof(recordid)); pblHtRemove(xidLocks, dat, datLen);
} }
free(dat);
return ret; return ret;
} }
int lockManagerReadLockRecord(int xid, recordid rid) {
return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerWriteLockRecord(int xid, recordid rid) {
return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerUnlockRecord(int xid, recordid rid) {
return lockManagerUnlockHashed(xid, (byte*)&rid, sizeof(recordid));
}
int lockManagerCommitRecords(int xid) {
return lockManagerCommitHashed(xid, sizeof(recordid));
}
int lockManagerReadLockPage(int xid, int p) {
return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(int));
}
int lockManagerWriteLockPage(int xid, int p) {
return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(int));
}
int lockManagerUnlockPage(int xid, int p) {
return lockManagerUnlockHashed(xid, (byte*)&p, sizeof(int));
}
int lockManagerCommitPages(int xid) {
return lockManagerCommitHashed(xid, sizeof(int));
}
LockManagerSetup globalLockManager;
void setupLockManagerCallbacksPage() {
globalLockManager.init = &lockManagerInitHashed;
globalLockManager.readLockPage = &lockManagerReadLockPage;
globalLockManager.writeLockPage = &lockManagerWriteLockPage;
globalLockManager.unlockPage = &lockManagerUnlockPage;
globalLockManager.readLockRecord = NULL;
globalLockManager.writeLockRecord = NULL;
globalLockManager.unlockRecord = NULL;
globalLockManager.commit = &lockManagerCommitPages;
globalLockManager.abort = &lockManagerCommitPages;
globalLockManager.begin = &lockManagerBeginTransaction;
globalLockManager.init();
}
void setupLockManagerCallbacksRecord () {
globalLockManager.init = &lockManagerInitHashed;
globalLockManager.readLockPage = NULL;
globalLockManager.writeLockPage = NULL;
globalLockManager.unlockPage = NULL;
globalLockManager.readLockRecord = &lockManagerReadLockRecord;
globalLockManager.writeLockRecord = &lockManagerWriteLockRecord;
globalLockManager.unlockRecord = &lockManagerUnlockRecord;
globalLockManager.commit = &lockManagerCommitRecords;
globalLockManager.abort = &lockManagerCommitRecords;
globalLockManager.begin = &lockManagerBeginTransaction;
globalLockManager.init();
}
void setupLockManagerCallbacksNil () {
globalLockManager.init = NULL;
globalLockManager.readLockPage = NULL;
globalLockManager.writeLockPage = NULL;
globalLockManager.unlockPage = NULL;
globalLockManager.readLockRecord = NULL;
globalLockManager.writeLockRecord = NULL;
globalLockManager.unlockRecord = NULL;
globalLockManager.commit = NULL;
globalLockManager.abort = NULL;
globalLockManager.begin = NULL;
}

View file

@ -65,7 +65,7 @@ void redoUpdate(const LogEntry * e) {
if(e->type == UPDATELOG) { if(e->type == UPDATELOG) {
/* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */ /* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */
recordid rid = e->contents.update.rid; recordid rid = e->contents.update.rid;
Page * p = loadPage(rid.page); Page * p = loadPage(e->xid, rid.page);
lsn_t pageLSN = pageReadLSN(p); lsn_t pageLSN = pageReadLSN(p);
if(e->LSN > pageLSN) { if(e->LSN > pageLSN) {
@ -79,7 +79,7 @@ void redoUpdate(const LogEntry * e) {
} else if(e->type == CLRLOG) { } else if(e->type == CLRLOG) {
LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN); LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN);
recordid rid = f->contents.update.rid; recordid rid = f->contents.update.rid;
Page * p = loadPage(rid.page); Page * p = loadPage(e->xid, rid.page);
assert(rid.page == e->contents.update.rid.page); /* @todo Should this always hold? */ assert(rid.page == e->contents.update.rid.page); /* @todo Should this always hold? */
@ -127,7 +127,7 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
assert(p); assert(p);
DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page); DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page);
memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE); memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
pageWriteLSN(p, clr_lsn); pageWriteLSN(e->xid, p, clr_lsn);
} else { } else {
/* @see doUpdate() */ /* @see doUpdate() */

View file

@ -44,7 +44,7 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat)
if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) { if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) {
allocBlob(xid, p, lsn, rid); allocBlob(xid, p, lsn, rid);
} else { } else {
slottedPostRalloc(p, lsn, rid); slottedPostRalloc(xid, p, lsn, rid);
} }
return 0; return 0;
@ -53,7 +53,7 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat)
/** @todo Currently, we leak empty pages on dealloc. */ /** @todo Currently, we leak empty pages on dealloc. */
static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
assert(rid.page == p->id); assert(rid.page == p->id);
slottedDeRalloc(p, lsn, rid); slottedDeRalloc(xid, p, lsn, rid);
return 0; return 0;
} }
@ -64,7 +64,7 @@ static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat
rid.size = sizeof(blob_record_t); rid.size = sizeof(blob_record_t);
} }
slottedPostRalloc(p, lsn, rid); slottedPostRalloc(xid, p, lsn, rid);
/** @todo dat should be the pointer to the space in the blob store. */ /** @todo dat should be the pointer to the space in the blob store. */
writeRecord(xid, p, lsn, rid, dat); writeRecord(xid, p, lsn, rid, dat);
@ -162,7 +162,7 @@ recordid TallocFromPage(int xid, long page, long size) {
void Tdealloc(int xid, recordid rid) { void Tdealloc(int xid, recordid rid) {
void * preimage = malloc(rid.size); void * preimage = malloc(rid.size);
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
readRecord(xid, p, rid, preimage); readRecord(xid, p, rid, preimage);
/** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */ /** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with? */
Tupdate(xid, rid, preimage, OPERATION_DEALLOC); Tupdate(xid, rid, preimage, OPERATION_DEALLOC);
@ -171,7 +171,7 @@ void Tdealloc(int xid, recordid rid) {
} }
int TrecordType(int xid, recordid rid) { int TrecordType(int xid, recordid rid) {
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
int ret = getRecordType(xid, p, rid); int ret = getRecordType(xid, p, rid);
releasePage(p); releasePage(p);
return ret; return ret;
@ -179,14 +179,14 @@ int TrecordType(int xid, recordid rid) {
int TrecordSize(int xid, recordid rid) { int TrecordSize(int xid, recordid rid) {
int ret; int ret;
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
ret = getRecordSize(xid, p, rid); ret = getRecordSize(xid, p, rid);
releasePage(p); releasePage(p);
return ret; return ret;
} }
int TrecordsInPage(int xid, int pageid) { int TrecordsInPage(int xid, int pageid) {
Page * p = loadPage(pageid); Page * p = loadPage(xid, pageid);
readlock(p->rwlatch, 187); readlock(p->rwlatch, 187);
int ret = *numslots_ptr(p); int ret = *numslots_ptr(p);
unlock(p->rwlatch); unlock(p->rwlatch);

View file

@ -96,7 +96,7 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
*page_type_ptr(p) = ARRAY_LIST_PAGE; *page_type_ptr(p) = ARRAY_LIST_PAGE;
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
recordid ret; recordid ret;
ret.page = firstPage; ret.page = firstPage;
@ -118,7 +118,7 @@ Operation getArrayListAlloc() {
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
int TarrayListExtend(int xid, recordid rid, int slots) { int TarrayListExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
TarrayListParameters tlp = pageToTLP(p); TarrayListParameters tlp = pageToTLP(p);
int lastCurrentBlock; int lastCurrentBlock;
@ -181,7 +181,7 @@ int TarrayListExtend(int xid, recordid rid, int slots) {
} }
/** @todo: TarrayListInstantExtend, is a hacked-up cut and paste version of TarrayListExtend */ /** @todo: TarrayListInstantExtend, is a hacked-up cut and paste version of TarrayListExtend */
int TarrayListInstantExtend(int xid, recordid rid, int slots) { int TarrayListInstantExtend(int xid, recordid rid, int slots) {
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
TarrayListParameters tlp = pageToTLP(p); TarrayListParameters tlp = pageToTLP(p);
int lastCurrentBlock; int lastCurrentBlock;
@ -249,13 +249,13 @@ static int operateInitFixed(int xid, Page * p, lsn_t lsn, recordid rid, const vo
fixedPageInitialize(p, rid.size, recordsPerPage(rid.size)); fixedPageInitialize(p, rid.size, recordsPerPage(rid.size));
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }
static int operateUnInitPage(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { static int operateUnInitPage(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
*page_type_ptr(p) = UNINITIALIZED_PAGE; *page_type_ptr(p) = UNINITIALIZED_PAGE;
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }

View file

@ -82,7 +82,7 @@ static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const v
argBytes + keySize, valSize); argBytes + keySize, valSize);
return 0; return 0;
} }
static int noop (int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { pageWriteLSN(p, lsn); return 0; } static int noop (int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { pageWriteLSN(xid, p, lsn); return 0; }
Operation getLinearInsert() { Operation getLinearInsert() {
Operation o = { Operation o = {
@ -316,7 +316,8 @@ void instant_update_hash_header(int xid, recordid hash, int i, int next_split) {
static void recover_split(int xid, recordid hashRid, int i, int next_split, int keySize, int valSize) { static void recover_split(int xid, recordid hashRid, int i, int next_split, int keySize, int valSize) {
// This function would be simple, except for the ridiculous mount // This function would be simple, except for the ridiculous mount
// of state that it must maintain. See above for a description of what it does. // of state that it must maintain. See above for a description of what it does.
recordid * headerRidB; recordid headerRidBstack;
recordid * headerRidB = &headerRidBstack;
hashRid.slot = 1; hashRid.slot = 1;
Tread(xid, hashRid, headerRidB); Tread(xid, hashRid, headerRidB);
hashRid.slot = 0; hashRid.slot = 0;

View file

@ -52,7 +52,7 @@ static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat)
/* If p is null, then this is a logical no-op that spans pages, so do nothing. /* If p is null, then this is a logical no-op that spans pages, so do nothing.
Otherwise, write the LSN to the appropriate page (to keep recovery happy) Otherwise, write the LSN to the appropriate page (to keep recovery happy)
and return */ and return */
if(p) pageWriteLSN(p, lsn); if(p) pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }

View file

@ -34,7 +34,7 @@ int __pageDealloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
*/ */
int __pageSet(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { int __pageSet(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
memcpy(p->memAddr, d, PAGE_SIZE); memcpy(p->memAddr, d, PAGE_SIZE);
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }
@ -50,7 +50,7 @@ int __update_freepage(int xid, Page * p, lsn_t lsn, recordid r, const void * d)
fflush(NULL); */ fflush(NULL); */
* headerFreepage_ptr(p) = t->after; * headerFreepage_ptr(p) = t->after;
freepage = t->after; freepage = t->after;
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }
@ -64,7 +64,7 @@ int __update_freespace_inverse(int xid, Page * p, lsn_t lsn, recordid r, const v
* headerFreepage_ptr(p) = t->before; * headerFreepage_ptr(p) = t->before;
freepage = t->before; freepage = t->before;
#endif #endif
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }
#ifdef REUSE_PAGES #ifdef REUSE_PAGES
@ -102,18 +102,18 @@ int __free_page(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
*page_type_ptr(p) = LLADD_FREE_PAGE; *page_type_ptr(p) = LLADD_FREE_PAGE;
*nextfreepage_ptr(p) = *successor; *nextfreepage_ptr(p) = *successor;
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }
int __alloc_freed(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { int __alloc_freed(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
return 0; return 0;
} }
int TpageGet(int xid, int pageid, byte *memAddr) { int TpageGet(int xid, int pageid, byte *memAddr) {
Page * q = loadPage(pageid); Page * q = loadPage(xid, pageid);
memcpy(memAddr, q->memAddr, PAGE_SIZE); memcpy(memAddr, q->memAddr, PAGE_SIZE);
releasePage(q); releasePage(q);
return 0; return 0;
@ -139,7 +139,8 @@ void pageOperationsInit() {
assert(!posix_memalign((void **)&(p.memAddr), PAGE_SIZE, PAGE_SIZE)); assert(!posix_memalign((void **)&(p.memAddr), PAGE_SIZE, PAGE_SIZE));
p.id = 0;*/ p.id = 0;*/
Page * p = loadPage(0); Page * p = loadPage(-1, 0);
/** Release lock on page zero. */
// pageRead(&p); // pageRead(&p);

View file

@ -106,7 +106,7 @@ void TsetRange(int xid, recordid rid, int offset, int length, const void * dat)
// Copy new value into log structure // Copy new value into log structure
memcpy(range + 1, dat, length); memcpy(range + 1, dat, length);
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
// No further locking is necessary here; readRecord protects the // No further locking is necessary here; readRecord protects the
// page layout, but attempts at concurrent modification have undefined // page layout, but attempts at concurrent modification have undefined
// results. (See page.c) // results. (See page.c)

View file

@ -83,6 +83,7 @@ terms specified in this license.
#include <lladd/constants.h> #include <lladd/constants.h>
#include <assert.h> #include <assert.h>
#include "blobManager.h" #include "blobManager.h"
#include <lladd/lockManager.h>
#include "pageFile.h" #include "pageFile.h"
#include "page/slotted.h" #include "page/slotted.h"
@ -102,9 +103,11 @@ static pthread_mutex_t pageMallocMutex;
/** We need one dummy page for locking purposes, so this array has one extra page in it. */ /** We need one dummy page for locking purposes, so this array has one extra page in it. */
Page pool[MAX_BUFFER_SIZE+1]; Page pool[MAX_BUFFER_SIZE+1];
void pageWriteLSN(Page * page, lsn_t lsn) { void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
/* unlocked since we're only called by a function that holds the writelock. */ /* unlocked since we're only called by a function that holds the writelock. */
/* *(long *)(page->memAddr + START_OF_LSN) = page->LSN; */ /* *(long *)(page->memAddr + START_OF_LSN) = page->LSN; */
if(globalLockManager.writeLockPage) { globalLockManager.writeLockPage(xid, page->id); }
if(page->LSN < lsn) { if(page->LSN < lsn) {
page->LSN = lsn; page->LSN = lsn;
*lsn_ptr(page) = page->LSN; *lsn_ptr(page) = page->LSN;
@ -263,7 +266,7 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) ); assert( (p->id == rid.page) && (p->memAddr != NULL) );
writelock(p->rwlatch, 225); /* Need a writelock so that we can update the lsn. */ writelock(p->rwlatch, 225); /* Need a writelock so that we can update the lsn. */
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
unlock(p->rwlatch); unlock(p->rwlatch);
} }
@ -378,7 +381,7 @@ void writeRecordUnlocked(int xid, Page * p, lsn_t lsn, recordid rid, const void
assert( (p->id == rid.page) && (p->memAddr != NULL) ); assert( (p->id == rid.page) && (p->memAddr != NULL) );
writelock(p->rwlatch, 225); /* Need a writelock so that we can update the lsn. */ writelock(p->rwlatch, 225); /* Need a writelock so that we can update the lsn. */
pageWriteLSN(p, lsn); pageWriteLSN(xid, p, lsn);
unlock(p->rwlatch); unlock(p->rwlatch);
} }

View file

@ -221,7 +221,7 @@ void pageDeInit();
* @param page You must have a writelock on page before calling this function. * @param page You must have a writelock on page before calling this function.
* @param lsn The new lsn of the page. If the new lsn is less than the page's current lsn, then the page's lsn will not be changed. * @param lsn The new lsn of the page. If the new lsn is less than the page's current lsn, then the page's lsn will not be changed.
*/ */
void pageWriteLSN(Page * page, lsn_t lsn); void pageWriteLSN(int xid, Page * page, lsn_t lsn);
/** /**
* assumes that the page is already loaded in memory. It takes * assumes that the page is already loaded in memory. It takes

View file

@ -14,8 +14,8 @@ void indirectInitialize(Page * p, int height) {
memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr)); memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr));
} }
/** @todo locking for dereferenceRID? */ /** @todo locking for dereferenceRID? */
recordid dereferenceRID(recordid rid) { recordid dereferenceRID(int xid, recordid rid) {
Page * this = loadPage(rid.page); Page * this = loadPage(xid, rid.page);
int offset = 0; int offset = 0;
int max_slot; int max_slot;
while(*page_type_ptr(this) == INDIRECT_PAGE) { while(*page_type_ptr(this) == INDIRECT_PAGE) {
@ -32,7 +32,7 @@ recordid dereferenceRID(recordid rid) {
int nextPage = *page_ptr(this, i); int nextPage = *page_ptr(this, i);
releasePage(this); releasePage(this);
this = loadPage(nextPage); this = loadPage(xid, nextPage);
} }
rid.page = this->id; rid.page = this->id;
@ -168,8 +168,8 @@ recordid __rallocMany(int xid, int parentPage, int recordSize, int recordCount)
return rid; return rid;
} }
unsigned int indirectPageRecordCount(recordid rid) { unsigned int indirectPageRecordCount(int xid, recordid rid) {
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
int i = 0; int i = 0;
unsigned int ret; unsigned int ret;
if(*page_type_ptr(p) == INDIRECT_PAGE) { if(*page_type_ptr(p) == INDIRECT_PAGE) {

View file

@ -43,11 +43,11 @@ BEGIN_C_DECLS
Translates a recordid that points to an indirect block into the Translates a recordid that points to an indirect block into the
physical location of the record. physical location of the record.
*/ */
recordid dereferenceRID(recordid rid); recordid dereferenceRID(int xid, recordid rid);
#define dereferenceRIDUnlocked(x) dereferenceRID((x)) #define dereferenceRIDUnlocked(y, x) dereferenceRID((y), (x))
void indirectInitialize(Page * p, int height); void indirectInitialize(Page * p, int height);
recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount); recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount);
unsigned int indirectPageRecordCount(recordid rid); unsigned int indirectPageRecordCount(int xid, recordid rid);
END_C_DECLS END_C_DECLS

View file

@ -175,18 +175,18 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
if(lastFreepage == -1) { if(lastFreepage == -1) {
lastFreepage = TpageAlloc(xid); lastFreepage = TpageAlloc(xid);
*pp = loadPage(lastFreepage); *pp = loadPage(xid, lastFreepage);
assert(*page_type_ptr(*pp) == UNINITIALIZED_PAGE); assert(*page_type_ptr(*pp) == UNINITIALIZED_PAGE);
slottedPageInitialize(*pp); slottedPageInitialize(*pp);
} else { } else {
*pp = loadPage(lastFreepage); *pp = loadPage(xid, lastFreepage);
} }
if(slottedFreespace(*pp) < size ) { if(slottedFreespace(*pp) < size ) {
releasePage(*pp); releasePage(*pp);
lastFreepage = TpageAlloc(xid); lastFreepage = TpageAlloc(xid);
*pp = loadPage(lastFreepage); *pp = loadPage(xid, lastFreepage);
slottedPageInitialize(*pp); slottedPageInitialize(*pp);
} }
@ -209,7 +209,7 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
size = sizeof(blob_record_t); size = sizeof(blob_record_t);
} }
*pp = loadPage(page); *pp = loadPage(xid, page);
if(slottedFreespace(*pp) < size) { if(slottedFreespace(*pp) < size) {
releasePage(*pp); releasePage(*pp);
@ -302,7 +302,7 @@ static void __really_do_ralloc(Page * page, recordid rid) {
} }
recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) { recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
writelock(page->rwlatch, 376); writelock(page->rwlatch, 376);
@ -348,14 +348,14 @@ recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) {
} }
pageWriteLSN(page, lsn); pageWriteLSN(xid, page, lsn);
writeunlock(page->rwlatch); writeunlock(page->rwlatch);
return rid; return rid;
} }
void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) { void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
readlock(page->rwlatch, 443); readlock(page->rwlatch, 443);
@ -364,7 +364,7 @@ void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) {
*freelist_ptr(page) = rid.slot; *freelist_ptr(page) = rid.slot;
/* *slot_length_ptr(page, rid.slot) = 0; */ /* *slot_length_ptr(page, rid.slot) = 0; */
pageWriteLSN(page, lsn); pageWriteLSN(xid, page, lsn);
unlock(page->rwlatch); unlock(page->rwlatch);
} }

View file

@ -113,13 +113,13 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page**p);
* page for this record (though it is possible that we need to compact * page for this record (though it is possible that we need to compact
* the page) * the page)
*/ */
recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid); recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
/** /**
* Mark the space used by a record for reclaimation. * Mark the space used by a record for reclaimation.
* *
* @param rid the recordid to be freed. * @param rid the recordid to be freed.
*/ */
void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid); void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
void slottedPageInit(); void slottedPageInit();
void slottedPageDeinit(); void slottedPageDeinit();

View file

@ -6,6 +6,8 @@
#include <lladd/recovery.h> #include <lladd/recovery.h>
#include "logger/logWriter.h" #include "logger/logWriter.h"
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include <lladd/lockManager.h>
#include "page.h" #include "page.h"
#include <lladd/logger/logger2.h> #include <lladd/logger/logger2.h>
@ -91,8 +93,10 @@ int Tinit() {
initNestedTopActions(); initNestedTopActions();
ThashInit(); ThashInit();
InitiateRecovery(); setupLockManagerCallbacksNil();
// setupLockManagerCallbacksPage();
InitiateRecovery();
return 0; return 0;
} }
@ -130,6 +134,8 @@ int Tbegin() {
XactionTable[index] = LogTransBegin(xidCount_tmp); XactionTable[index] = LogTransBegin(xidCount_tmp);
if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); }
return XactionTable[index].xid; return XactionTable[index].xid;
} }
@ -142,12 +148,12 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
pthread_mutex_unlock(&transactional_2_mutex); pthread_mutex_unlock(&transactional_2_mutex);
#endif #endif
p = loadPage(rid.page); p = loadPage(xid, rid.page);
if(*page_type_ptr(p) == INDIRECT_PAGE) { if(*page_type_ptr(p) == INDIRECT_PAGE) {
releasePage(p); releasePage(p);
rid = dereferenceRID(rid); rid = dereferenceRID(xid, rid);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
/** @todo Kludge! Shouldn't special case operations in transactional2. */ /** @todo Kludge! Shouldn't special case operations in transactional2. */
} else if(*page_type_ptr(p) == ARRAY_LIST_PAGE && } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE &&
op != OPERATION_LINEAR_INSERT && op != OPERATION_LINEAR_INSERT &&
@ -156,7 +162,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
op != OPERATION_UNDO_LINEAR_DELETE ) { op != OPERATION_UNDO_LINEAR_DELETE ) {
rid = dereferenceArrayListRid(p, rid.slot); rid = dereferenceArrayListRid(p, rid.slot);
releasePage(p); releasePage(p);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
} }
e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat); e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
@ -175,7 +181,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
void alTupdate(int xid, recordid rid, const void *dat, int op) { void alTupdate(int xid, recordid rid, const void *dat, int op) {
LogEntry * e; LogEntry * e;
Page * p; Page * p;
p = loadPage(rid.page); p = loadPage(xid, rid.page);
/* if(*page_type_ptr(p) == INDIRECT_PAGE) { /* if(*page_type_ptr(p) == INDIRECT_PAGE) {
releasePage(p); releasePage(p);
@ -206,19 +212,19 @@ void alTupdate(int xid, recordid rid, const void *dat, int op) {
void TreadUnlocked(int xid, recordid rid, void * dat) { void TreadUnlocked(int xid, recordid rid, void * dat) {
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
int page_type = *page_type_ptr(p); int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) { if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) {
} else if(page_type == INDIRECT_PAGE) { } else if(page_type == INDIRECT_PAGE) {
releasePage(p); releasePage(p);
rid = dereferenceRIDUnlocked(rid); rid = dereferenceRIDUnlocked(xid, rid);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
} else if(page_type == ARRAY_LIST_PAGE) { } else if(page_type == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRidUnlocked(p, rid.slot); rid = dereferenceArrayListRidUnlocked(p, rid.slot);
releasePage(p); releasePage(p);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
} else { } else {
abort(); abort();
@ -228,19 +234,19 @@ void TreadUnlocked(int xid, recordid rid, void * dat) {
} }
void Tread(int xid, recordid rid, void * dat) { void Tread(int xid, recordid rid, void * dat) {
Page * p = loadPage(rid.page); Page * p = loadPage(xid, rid.page);
int page_type = *page_type_ptr(p); int page_type = *page_type_ptr(p);
if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) { if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) {
} else if(page_type == INDIRECT_PAGE) { } else if(page_type == INDIRECT_PAGE) {
releasePage(p); releasePage(p);
rid = dereferenceRID(rid); rid = dereferenceRID(xid, rid);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
} else if(page_type == ARRAY_LIST_PAGE) { } else if(page_type == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(p, rid.slot); rid = dereferenceArrayListRid(p, rid.slot);
releasePage(p); releasePage(p);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
} else { } else {
abort(); abort();

View file

@ -35,10 +35,10 @@ void initializePages() {
rid.page = i; rid.page = i;
rid.slot = 0; rid.slot = 0;
rid.size = sizeof(int); rid.size = sizeof(int);
p = loadPage(rid.page); p = loadPage(-1, rid.page);
assert(p->id != -1); assert(p->id != -1);
slottedPostRalloc(p, 0, rid); slottedPostRalloc(-1, p, 0, rid);
writeRecord(1, p, 1, rid, &i); writeRecord(1, p, 1, rid, &i);
@ -66,7 +66,7 @@ void * workerThread(void * p) {
rid.slot = 0; rid.slot = 0;
rid.size = sizeof(int); rid.size = sizeof(int);
p = loadPage(rid.page); p = loadPage(-1, rid.page);
readRecord(1, p, rid, &j); readRecord(1, p, rid, &j);
@ -89,7 +89,7 @@ void * workerThreadWriting(void * q) {
Page * tmp; Page * tmp;
pthread_mutex_lock(&ralloc_mutex); pthread_mutex_lock(&ralloc_mutex);
rids[i] = slottedPreRalloc(1, sizeof(int), &tmp); rids[i] = slottedPreRalloc(1, sizeof(int), &tmp);
slottedPostRalloc(tmp, 1, rids[i]); slottedPostRalloc(-1, tmp, 1, rids[i]);
releasePage(tmp); releasePage(tmp);
pthread_mutex_unlock(&ralloc_mutex); pthread_mutex_unlock(&ralloc_mutex);
@ -106,7 +106,7 @@ void * workerThreadWriting(void * q) {
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int val = (i * 10000) + offset; int val = (i * 10000) + offset;
int k; int k;
Page * p = loadPage(rids[i].page); Page * p = loadPage(-1, rids[i].page);
assert(p->id == rids[i].page); assert(p->id == rids[i].page);
@ -135,7 +135,7 @@ void * workerThreadWriting(void * q) {
Page * p; Page * p;
p = loadPage(rids[i].page); p = loadPage(-1, rids[i].page);
readRecord(1, p, rids[i], &val); readRecord(1, p, rids[i], &val);

View file

@ -101,7 +101,7 @@ START_TEST(indirectAlloc) {
fail_unless(rid.slot == RECORD_ARRAY, NULL); fail_unless(rid.slot == RECORD_ARRAY, NULL);
fail_unless(rid.size == 1, NULL); fail_unless(rid.size == 1, NULL);
Page * p = loadPage(page); Page * p = loadPage(xid, page);
int page_type = *page_type_ptr(p); int page_type = *page_type_ptr(p);
@ -122,7 +122,7 @@ START_TEST(indirectAlloc) {
fail_unless(rid.slot == RECORD_ARRAY, NULL); fail_unless(rid.slot == RECORD_ARRAY, NULL);
fail_unless(rid.size == 2000, NULL); fail_unless(rid.size == 2000, NULL);
p = loadPage(page); p = loadPage(xid, page);
page_type = *page_type_ptr(p); page_type = *page_type_ptr(p);
@ -146,7 +146,7 @@ START_TEST(indirectAlloc) {
fail_unless(rid.slot == RECORD_ARRAY, NULL); fail_unless(rid.slot == RECORD_ARRAY, NULL);
fail_unless(rid.size == 2, NULL); fail_unless(rid.size == 2, NULL);
p = loadPage(page); p = loadPage(xid, page);
page_type = *page_type_ptr(p); page_type = *page_type_ptr(p);
@ -177,7 +177,7 @@ START_TEST(indirectAccessDirect) {
page = rid.page; page = rid.page;
/* Make sure that it didn't create any indirect pages. */ /* Make sure that it didn't create any indirect pages. */
Page * p = loadPage(page); Page * p = loadPage(xid, page);
int page_type = *page_type_ptr(p); int page_type = *page_type_ptr(p);
@ -193,7 +193,7 @@ START_TEST(indirectAccessDirect) {
for(int i = 0; i < 500; i++) { for(int i = 0; i < 500; i++) {
rid.slot = i; rid.slot = i;
Tset(xid, dereferenceRID(rid), &i); Tset(xid, dereferenceRID(xid, rid), &i);
} }
Tcommit(xid); Tcommit(xid);
@ -202,7 +202,7 @@ START_TEST(indirectAccessDirect) {
for(int i = 0; i < 500; i++) { for(int i = 0; i < 500; i++) {
rid.slot = i; rid.slot = i;
int j; int j;
Tread(xid, dereferenceRID(rid), &j); Tread(xid, dereferenceRID(xid, rid), &j);
assert(j == i); assert(j == i);
} }
@ -222,7 +222,7 @@ START_TEST(indirectAccessIndirect) {
page = rid.page; page = rid.page;
/* Make sure that it didn't create any indirect pages. */ /* Make sure that it didn't create any indirect pages. */
Page * p = loadPage(page); Page * p = loadPage(xid, page);
int page_type = *page_type_ptr(p); int page_type = *page_type_ptr(p);
@ -236,7 +236,7 @@ START_TEST(indirectAccessIndirect) {
for(int i = 0; i < 500000; i++) { for(int i = 0; i < 500000; i++) {
rid.slot = i; rid.slot = i;
Tset(xid, dereferenceRID(rid), &i); Tset(xid, dereferenceRID(xid, rid), &i);
} }
Tcommit(xid); Tcommit(xid);
@ -245,7 +245,7 @@ START_TEST(indirectAccessIndirect) {
for(int i = 0; i < 500000; i++) { for(int i = 0; i < 500000; i++) {
rid.slot = i; rid.slot = i;
int j; int j;
Tread(xid, dereferenceRID(rid), &j); Tread(xid, dereferenceRID(xid, rid), &j);
assert(j == i); assert(j == i);
} }
@ -264,12 +264,12 @@ START_TEST(indirectSizeTest) {
int xid = Tbegin(); int xid = Tbegin();
recordid rid = rallocMany(xid, sizeof(int), 20); recordid rid = rallocMany(xid, sizeof(int), 20);
int count = indirectPageRecordCount(rid); int count = indirectPageRecordCount(xid, rid);
assert(count == 20); assert(count == 20);
recordid rid2 = rallocMany(xid, sizeof(int), 5000); recordid rid2 = rallocMany(xid, sizeof(int), 5000);
count = indirectPageRecordCount(rid2); count = indirectPageRecordCount(xid, rid2);
assert(count == 5000); assert(count == 5000);
Tcommit(xid); Tcommit(xid);

View file

@ -11,14 +11,60 @@
#define LOG_NAME "check_lockManager.log" #define LOG_NAME "check_lockManager.log"
/** Needs to be formatted as a floating point */ /** Needs to be formatted as a floating point */
#define NUM_RECORDS 100000000.0 #define NUM_RECORDS 5000000.0
#define THREAD_COUNT 100 #define THREAD_COUNT 100
#define RIDS_PER_THREAD 10000 #define RIDS_PER_THREAD 1000
void * workerThread(void * j) { void * pageWorkerThread(void * j) {
int xid = *(int*)j;
//startTransaction(xid);
globalLockManager.begin(xid);
recordid rid;
rid.page = 0;
rid.size = 0;
int k;
int deadlocks = 0;
for(k = 0; k < RIDS_PER_THREAD; k++) {
int m = (int) (NUM_RECORDS*random()/(RAND_MAX+1.0));
int rw = random() % 2;
if(rw) {
// readlock
if(LLADD_DEADLOCK == globalLockManager.readLockPage(xid, m)) {
// if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
} else {
// writelock
// if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
if(LLADD_DEADLOCK == globalLockManager.writeLockPage(xid, m)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++;
printf("-");
}
}
}
printf("%2d ", deadlocks); fflush(stdout);
// lockManagerCommit(xid);
globalLockManager.commit(xid);
return NULL;
}
void * ridWorkerThread(void * j) {
int xid = *(int*)j; int xid = *(int*)j;
startTransaction(xid); //startTransaction(xid);
globalLockManager.begin(xid);
recordid rid; recordid rid;
rid.page = 0; rid.page = 0;
rid.size = 0; rid.size = 0;
@ -30,16 +76,23 @@ void * workerThread(void * j) {
if(rw) { if(rw) {
// readlock // readlock
if(LLADD_DEADLOCK == globalLockManager.readLockRecord(xid, rid)) {
if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) { // if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++; deadlocks++;
printf("-");
} }
} else { } else {
// writelock // writelock
if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) { // if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
if(LLADD_DEADLOCK == globalLockManager.writeLockRecord(xid, rid)) {
k = 0;
globalLockManager.abort(xid);
deadlocks++; deadlocks++;
printf("-");
} }
} }
@ -47,21 +100,43 @@ void * workerThread(void * j) {
printf("%2d ", deadlocks); fflush(stdout); printf("%2d ", deadlocks); fflush(stdout);
lockManagerReleaseAll(xid); // lockManagerCommit(xid);
globalLockManager.commit(xid);
return NULL; return NULL;
} }
START_TEST(lockManagerTest) { START_TEST(recordidLockManagerTest) {
printf("\n");
setupLockManagerCallbacksRecord();
lockManagerInit();
pthread_t workers[THREAD_COUNT]; pthread_t workers[THREAD_COUNT];
int i; int i;
for(i = 0; i < THREAD_COUNT; i++) { for(i = 0; i < THREAD_COUNT; i++) {
int *j = malloc(sizeof(int)); int *j = malloc(sizeof(int));
*j = i; *j = i;
pthread_create(&workers[i], NULL, workerThread, j); pthread_create(&workers[i], NULL, ridWorkerThread, j);
}
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
} END_TEST
START_TEST(pageLockManagerTest) {
printf("\n");
setupLockManagerCallbacksPage();
pthread_t workers[THREAD_COUNT];
int i;
for(i = 0; i < THREAD_COUNT; i++) {
int *j = malloc(sizeof(int));
*j = i;
pthread_create(&workers[i], NULL, pageWorkerThread, j);
} }
for(i = 0; i < THREAD_COUNT; i++) { for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL); pthread_join(workers[i], NULL);
@ -76,7 +151,8 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, lockManagerTest); tcase_add_test(tc, recordidLockManagerTest);
tcase_add_test(tc, pageLockManagerTest);
/* --------------------------------------------- */ /* --------------------------------------------- */

View file

@ -74,7 +74,7 @@ START_TEST(operation_physical_do_undo) {
Page * p; Page * p;
Tinit(); Tinit();
xid = -1;
rid = slottedPreRalloc(xid, sizeof(int), &p); rid = slottedPreRalloc(xid, sizeof(int), &p);
releasePage(p); releasePage(p);
buf = 1; buf = 1;
@ -87,17 +87,17 @@ START_TEST(operation_physical_do_undo) {
DEBUG("B\n"); DEBUG("B\n");
p = loadPage(rid.page); p = loadPage(xid, rid.page);
writeRecord(xid, p, lsn, rid, &buf); writeRecord(xid, p, lsn, rid, &buf);
releasePage(p); releasePage(p);
setToTwo->LSN = 10; setToTwo->LSN = 10;
DEBUG("C\n"); DEBUG("C\n");
p = loadPage(rid.page); p = loadPage(xid, rid.page);
doUpdate(setToTwo, p); /* PAGE LSN= 10, value = 2. */ doUpdate(setToTwo, p); /* PAGE LSN= 10, value = 2. */
releasePage(p); releasePage(p);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
readRecord(xid, p, rid, &buf); readRecord(xid, p, rid, &buf);
releasePage(p); releasePage(p);
@ -107,7 +107,7 @@ START_TEST(operation_physical_do_undo) {
DEBUG("D\n"); DEBUG("D\n");
p = loadPage(rid.page); p = loadPage(xid, rid.page);
fail_unless(10 == pageReadLSN(p), "page lsn not set correctly."); fail_unless(10 == pageReadLSN(p), "page lsn not set correctly.");
@ -116,7 +116,7 @@ START_TEST(operation_physical_do_undo) {
undoUpdate(setToTwo, p, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */ undoUpdate(setToTwo, p, 8); /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
releasePage(p); releasePage(p);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
readRecord(xid, p, rid, &buf); readRecord(xid, p, rid, &buf);
releasePage(p); releasePage(p);
@ -126,7 +126,7 @@ START_TEST(operation_physical_do_undo) {
redoUpdate(setToTwo); redoUpdate(setToTwo);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
readRecord(xid, p, rid, &buf); readRecord(xid, p, rid, &buf);
releasePage(p); releasePage(p);
@ -145,7 +145,7 @@ START_TEST(operation_physical_do_undo) {
lsn = 0; lsn = 0;
buf = 1; buf = 1;
p = loadPage(rid.page); p = loadPage(xid, rid.page);
writeRecord(xid, p, lsn, rid, &buf); writeRecord(xid, p, lsn, rid, &buf);
releasePage(p); releasePage(p);
/* Trace of test: /* Trace of test:
@ -169,7 +169,7 @@ START_TEST(operation_physical_do_undo) {
DEBUG("F\n"); DEBUG("F\n");
redoUpdate(setToTwo); redoUpdate(setToTwo);
p = loadPage(rid.page); p = loadPage(xid, rid.page);
readRecord(xid, p, rid, &buf); readRecord(xid, p, rid, &buf);
assert(buf == 2); assert(buf == 2);
fail_unless(buf == 2, NULL); fail_unless(buf == 2, NULL);
@ -185,7 +185,7 @@ START_TEST(operation_physical_do_undo) {
DEBUG("H don't redo set to 2\n"); DEBUG("H don't redo set to 2\n");
redoUpdate(setToTwo); /* Fails */ redoUpdate(setToTwo); /* Fails */
p = loadPage(rid.page); p = loadPage(xid, rid.page);
readRecord(xid, p, rid, &buf); readRecord(xid, p, rid, &buf);
@ -197,7 +197,7 @@ START_TEST(operation_physical_do_undo) {
releasePage(p); releasePage(p);
redoUpdate(setToTwo); /* Succeeds */ redoUpdate(setToTwo); /* Succeeds */
p = loadPage(rid.page); p = loadPage(xid, rid.page);
readRecord(xid, p, rid, &buf); readRecord(xid, p, rid, &buf);
fail_unless(buf == 2, NULL); fail_unless(buf == 2, NULL);
@ -330,6 +330,7 @@ START_TEST(operation_prepare) {
and checks that the result is as expected. and checks that the result is as expected.
@todo Write a more thorough (threaded!) nested top action test. @todo Write a more thorough (threaded!) nested top action test.
*/ */
START_TEST(operation_nestedTopAction) { START_TEST(operation_nestedTopAction) {
@ -512,6 +513,7 @@ START_TEST(operation_set_range) {
} }
Tcommit(xid); Tcommit(xid);
Tdeinit();
} END_TEST } END_TEST

View file

@ -86,7 +86,7 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
readRecord(1, p, rid[k], (byte*)&j); readRecord(1, p, rid[k], (byte*)&j);
assert((j + 1) == i + k); assert((j + 1) == i + k);
slottedDeRalloc(p, lsn, rid[k]); slottedDeRalloc(-1, p, lsn, rid[k]);
sched_yield(); sched_yield();
} }
} }
@ -98,7 +98,7 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
rid[k] = slottedRawRalloc(p, sizeof(short)); rid[k] = slottedRawRalloc(p, sizeof(short));
i +=k; i +=k;
/* printf("Slot %d = %d\n", rid[k].slot, i); */ /* printf("Slot %d = %d\n", rid[k].slot, i); */
writeRecord(1, p, lsn, rid[k], (byte*)&i); writeRecord(-1, p, lsn, rid[k], (byte*)&i);
i -=k; i -=k;
sched_yield(); sched_yield();
} }
@ -159,14 +159,14 @@ static void* worker_thread(void * arg_ptr) {
if(! first ) { if(! first ) {
readRecord(1, p, rid, (byte*)&j); readRecord(1, p, rid, (byte*)&j);
assert((j + 1) == i); assert((j + 1) == i);
slottedDeRalloc(p, lsn, rid); slottedDeRalloc(-1, p, lsn, rid);
sched_yield(); sched_yield();
} }
first = 0; first = 0;
rid = slottedRawRalloc(p, sizeof(int)); rid = slottedRawRalloc(p, sizeof(int));
writeRecord(1, p, lsn, rid, (byte*)&i); writeRecord(-1, p, lsn, rid, (byte*)&i);
sched_yield(); sched_yield();
assert(pageReadLSN(p) <= lsn); assert(pageReadLSN(p) <= lsn);
@ -198,7 +198,8 @@ START_TEST(pageNoThreadTest)
Tinit(); Tinit();
p = loadPage(0); p = loadPage(-1, 0);
slottedPageInitialize(p); slottedPageInitialize(p);
worker_thread(p); worker_thread(p);
@ -287,7 +288,7 @@ START_TEST(pageNoThreadMultPageTest)
Tinit(); Tinit();
p = loadPage(1); p = loadPage(-1, 1);
slottedPageInitialize(p); slottedPageInitialize(p);
multiple_simultaneous_pages(p); multiple_simultaneous_pages(p);
releasePage(p); releasePage(p);
@ -315,7 +316,7 @@ START_TEST(pageThreadTest) {
Tinit(); Tinit();
fail_unless(1, NULL); fail_unless(1, NULL);
Page * p = loadPage(2); Page * p = loadPage(-1, 2);
slottedPageInitialize(p); slottedPageInitialize(p);
fail_unless(1, NULL); fail_unless(1, NULL);
@ -345,7 +346,7 @@ START_TEST(fixedPageThreadTest) {
pthread_mutex_init(&random_mutex, NULL); pthread_mutex_init(&random_mutex, NULL);
pthread_mutex_init(&lsn_mutex, NULL); pthread_mutex_init(&lsn_mutex, NULL);
Tinit(); Tinit();
Page * p = loadPage(2); Page * p = loadPage(-1, 2);
fixedPageInitialize(p, sizeof(int), 0); fixedPageInitialize(p, sizeof(int), 0);
for(i = 0; i < THREAD_COUNT; i++) { for(i = 0; i < THREAD_COUNT; i++) {
@ -368,25 +369,26 @@ START_TEST(pageCheckSlotTypeTest) {
recordid fixedRoot = TarrayListAlloc(xid, 2, 10, 10); recordid fixedRoot = TarrayListAlloc(xid, 2, 10, 10);
recordid blob = Talloc(xid, PAGE_SIZE * 2); recordid blob = Talloc(xid, PAGE_SIZE * 2);
Page * p = loadPage(slot.page); Page * p = loadPage(-1, slot.page);
assert(getRecordType(xid, p, slot) == SLOTTED_RECORD); assert(getRecordType(xid, p, slot) == SLOTTED_RECORD);
releasePage(p); releasePage(p);
/** @todo the use of the fixedRoot recordid to check getRecordType is /** @todo the use of the fixedRoot recordid to check getRecordType is
a bit questionable, but should work. */ a bit questionable, but should work. */
p = loadPage(fixedRoot.page); p = loadPage(-1, fixedRoot.page);
assert(getRecordType(xid, p, fixedRoot) == FIXED_RECORD); assert(getRecordType(xid, p, fixedRoot) == FIXED_RECORD);
releasePage(p); releasePage(p);
fixedRoot.slot = 1; fixedRoot.slot = 1;
recordid fixedEntry = dereferenceRID(fixedRoot); recordid fixedEntry = dereferenceRID(xid, fixedRoot);
fixedRoot.slot = 0; fixedRoot.slot = 0;
p = loadPage(fixedEntry.page); p = loadPage(-1, fixedEntry.page);
assert(getRecordType(xid, p, fixedEntry) == FIXED_RECORD); assert(getRecordType(xid, p, fixedEntry) == FIXED_RECORD);
releasePage(p); releasePage(p);
p = loadPage(blob.page); p = loadPage(-1, blob.page);
int type = getRecordType(xid, p, blob); int type = getRecordType(xid, p, blob);
assert(type == BLOB_RECORD); assert(type == BLOB_RECORD);
releasePage(p); releasePage(p);
@ -396,7 +398,7 @@ START_TEST(pageCheckSlotTypeTest) {
bad.slot = slot.slot + 10; bad.slot = slot.slot + 10;
bad.size = 4; bad.size = 4;
p = loadPage(bad.page); p = loadPage(xid, bad.page);
assert(getRecordType(xid, p, bad) == UNINITIALIZED_RECORD); assert(getRecordType(xid, p, bad) == UNINITIALIZED_RECORD);
bad.size = 100000; bad.size = 100000;
assert(getRecordType(xid, p, bad) == UNINITIALIZED_RECORD); assert(getRecordType(xid, p, bad) == UNINITIALIZED_RECORD);
@ -430,7 +432,7 @@ START_TEST(pageTrecordTypeTest) {
assert(TrecordType(xid, fixedRoot) == FIXED_RECORD); assert(TrecordType(xid, fixedRoot) == FIXED_RECORD);
fixedRoot.slot = 1; fixedRoot.slot = 1;
recordid fixedEntry = dereferenceRID(fixedRoot); recordid fixedEntry = dereferenceRID(xid, fixedRoot);
fixedRoot.slot = 0; fixedRoot.slot = 0;
assert(TrecordType(xid, fixedEntry) == FIXED_RECORD); assert(TrecordType(xid, fixedEntry) == FIXED_RECORD);