diff --git a/lladd/bufferManager.h b/lladd/bufferManager.h
index 990287e..88ab509 100644
--- a/lladd/bufferManager.h
+++ b/lladd/bufferManager.h
@@ -100,7 +100,7 @@ typedef struct Page_s Page;
  * @param pageid ID of the page you want to load
  * @return fully formed Page type
  */
-Page * loadPage(int pageid);
+Page * loadPage(int xid, int pageid);
 
 /** loadPage aquires a lock when it is called, effectively pinning it
diff --git a/lladd/lockManager.h b/lladd/lockManager.h
index 2995155..9ba461f 100644
--- a/lladd/lockManager.h
+++ b/lladd/lockManager.h
@@ -1,9 +1,28 @@
 #include
+typedef struct {
+  void (*init)();
+  int (*readLockPage)  (int xid, int page);
+  int (*writeLockPage) (int xid, int page);
+  int (*unlockPage)    (int xid, int page);
+  int (*readLockRecord) (int xid, recordid rid);
+  int (*writeLockRecord)(int xid, recordid rid);
+  int (*unlockRecord)   (int xid, recordid rid);
+  int (*begin)  (int xid);
+  int (*commit) (int xid);
+  int (*abort)  (int xid);
+} LockManagerSetup;
+
+extern LockManagerSetup globalLockManager;
+
 void lockManagerInit();
 int lockManagerReadLockRecord(int xid, recordid rid);
 int lockManagerWriteLockRecord(int xid, recordid rid);
 int lockManagerUnlockRecord(int xid, recordid rid);
-int lockManagerReleaseAll(int xid);
+int lockManagerCommit(int xid);
+
+void setupLockManagerCallbacksRecord();
+void setupLockManagerCallbacksPage();
+void setupLockManagerCallbacksNil();
diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c
index 5fbb2da..558d35c 100644
--- a/src/lladd/bufferManager.c
+++ b/src/lladd/bufferManager.c
@@ -51,7 +51,7 @@ terms specified in this license.
 #include
 #include
-
+#include
 #include "page.h"
 #include "blobManager.h"
 #include
@@ -133,9 +133,9 @@ void releasePage (Page * p) {
 }
 
 int bufTransCommit(int xid, lsn_t lsn) {
-  commitBlobs(xid);
   pageCommit(xid);
+  if(globalLockManager.commit) { globalLockManager.commit(xid);}
 
   return 0;
 }
@@ -144,6 +144,7 @@ int bufTransAbort(int xid, lsn_t lsn) {
   abortBlobs(xid);  /* abortBlobs doesn't write any log entries, so it doesn't need the lsn.
                     */
   pageAbort(xid);
+  if(globalLockManager.abort) { globalLockManager.abort(xid);}
 
   return 0;
 }
@@ -151,6 +152,7 @@ int bufTransAbort(int xid, lsn_t lsn) {
 
 Page * getPage(int pageid, int locktype) {
   Page * ret;
   int spin = 0;
+
   pthread_mutex_lock(&loadPagePtr_mutex);
   ret = pblHtLookup(activePages, &pageid, sizeof(int));
@@ -265,7 +267,9 @@ Page * getPage(int pageid, int locktype) {
   return ret;
 }
 
-Page *loadPage(int pageid) {
+Page *loadPage(int xid, int pageid) {
+  if(globalLockManager.readLockPage) { globalLockManager.readLockPage(xid, pageid); }
+
   Page * ret = getPage(pageid, RO);
   return ret;
 }
diff --git a/src/lladd/lockManager.c b/src/lladd/lockManager.c
index f0302e8..35093b3 100644
--- a/src/lladd/lockManager.c
+++ b/src/lladd/lockManager.c
@@ -7,7 +7,7 @@
 #include
 #include
 #include
-
+#include
 #include
@@ -19,9 +19,9 @@
 static pthread_mutex_t mutexes[MUTEX_COUNT];
 
 static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-static pthread_mutex_t * getMutex(recordid rid) {
-  return &mutexes[hash(&rid, sizeof(recordid), MUTEX_BITS, MUTEX_EXT)];
+static pthread_mutex_t rid_table_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t * getMutex(byte * dat, int datLen) {
+  return &mutexes[hash(dat, datLen, MUTEX_BITS, MUTEX_EXT)];
 }
 
 static pblHashTable_t * xidLockTable;
@@ -36,7 +36,7 @@ typedef struct {
   int active;
 } lock;
 
-void lockManagerInit() {
+void lockManagerInitHashed() {
   int i = 0;
   for(i = 0; i < MUTEX_COUNT; i++) {
     pthread_mutex_init(&mutexes[i], NULL);
@@ -45,16 +45,19 @@
   ridLockTable = pblHtCreate();
 }
 
-/** @todo startTransaction needs a mutex!! */
-void startTransaction(int xid) {
-  pthread_mutex_lock(&xid_table_mutex);
-
+pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) {
   pblHashTable_t * xidLocks = pblHtCreate();
   pblHtInsert(xidLockTable, &xid, sizeof(int), xidLocks);
+  return xidLocks;
+}
+int lockManagerBeginTransaction(int xid) {
+  pthread_mutex_lock(&xid_table_mutex);
+  lockManagerBeginTransactionUnlocked(xid);
   pthread_mutex_unlock(&xid_table_mutex);
+  return 0;
 }
 
-lock* createLock(recordid rid) {
+lock* createLock(byte * dat, int datLen) {
   lock * ret = malloc(sizeof(lock));
 
   if(!ret) { return NULL; }
@@ -62,43 +65,50 @@ lock* createLock(recordid rid) {
   //  pthread_mutex_init(&ret->mut, NULL);
   pthread_cond_init(&ret->writeOK, NULL);
   pthread_cond_init(&ret->readOK, NULL);
+  ret->active = 0;
   ret->readers = 0;
   ret->writers = 0;
   ret->waiting = 0;
-
-  pblHtInsert(ridLockTable, &rid, sizeof(recordid), ret);
+
+  pblHtInsert(ridLockTable, dat, datLen, ret);
   return ret;
 }
 
-void destroyLock(recordid rid, lock * l) {
+void destroyLock(byte * dat, int datLen, lock * l) {
   pthread_cond_destroy(&l->writeOK);
   pthread_cond_destroy(&l->readOK);
   free (l);
-  pblHtRemove(ridLockTable, &rid, sizeof(recordid));
+  pblHtRemove(ridLockTable, dat, datLen);
 }
 
 #define LM_READLOCK 1
 #define LM_WRITELOCK 2
 
-int lockManagerReadLockRecord(int xid, recordid rid) {
-
+int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
+  if(xid == -1) { return 0; }
   pthread_mutex_lock(&xid_table_mutex);
 
   pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
-  if((int)pblHtLookup(xidLocks, &rid, sizeof(recordid)) >= LM_READLOCK) {
+  if(!xidLocks) {
+    xidLocks = lockManagerBeginTransactionUnlocked(xid);
+  }
+  int currentLockLevel = (int)pblHtLookup(xidLocks, dat, datLen);
+  //  printf("xid %d read lock (%d)\n", xid, currentLockLevel);
+  if(currentLockLevel >= LM_READLOCK) {
    pthread_mutex_unlock(&xid_table_mutex);
    return 0;
  }
+  assert(!currentLockLevel);
  pthread_mutex_unlock(&xid_table_mutex);
 
-  pthread_mutex_t * mut = getMutex(rid);
+  pthread_mutex_t * mut = getMutex(dat, datLen);
 
  pthread_mutex_lock(mut);
-
-  lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid));
+  pthread_mutex_lock(&rid_table_mutex);
+  lock * ridLock = pblHtLookup(ridLockTable, dat, datLen);
 
  if(!ridLock) {
-    ridLock = createLock(rid);
+    ridLock = createLock(dat, datLen);
  }
-
+  pthread_mutex_unlock(&rid_table_mutex);
  ridLock->active++;
 
  if(ridLock->writers || ridLock->waiting) {
@@ -117,22 +127,36 @@ int lockManagerReadLockRecord(int xid, recordid rid) {
      if(wait_ret == ETIMEDOUT) {
        ridLock->active--;
        pthread_mutex_unlock(mut);
+
+        //  printf("Deadlock!\n"); fflush(stdout);
+        //  abort();
        return LLADD_DEADLOCK;
      }
    } while(ridLock->writers);
  }
-  ridLock->readers++;
+  if(currentLockLevel < LM_READLOCK) {
+    ridLock->readers++;
+    pblHtRemove(xidLocks, dat, datLen);
+    pblHtInsert(xidLocks, dat, datLen, (void*)LM_READLOCK);
+  }
  ridLock->active--;
  pthread_mutex_unlock(mut);
-  pblHtInsert(xidLocks, &rid, sizeof(recordid), (void*)LM_READLOCK);
  return 0;
 }
 
-int lockManagerWriteLockRecord(int xid, recordid rid) {
+int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
+
+  if(xid == -1) { return 0; }
 
  pthread_mutex_lock(&xid_table_mutex);
 
  pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
 
+  if(!xidLocks) {
+    xidLocks = lockManagerBeginTransactionUnlocked(xid);
+  }
+
+  int currentLockLevel = (int)pblHtLookup(xidLocks, dat, datLen);
+
+  //  printf("xid %d write lock (%d)\n", xid, currentLockLevel);
 
-  int currentLockLevel = (int)pblHtLookup(xidLocks, &rid, sizeof(recordid));
  int me = 0;
  pthread_mutex_unlock(&xid_table_mutex);
@@ -142,13 +166,15 @@ int lockManagerWriteLockRecord(int xid, recordid rid) {
    me = 1;
  }
 
-  pthread_mutex_t * mut = getMutex(rid);
+  pthread_mutex_t * mut = getMutex(dat, datLen);
 
  pthread_mutex_lock(mut);
-  lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid));
+  pthread_mutex_lock(&rid_table_mutex);
+  lock * ridLock = pblHtLookup(ridLockTable, dat, datLen);
  if(!ridLock) {
-    ridLock = createLock(rid);
+    ridLock = createLock(dat, datLen);
  }
+  pthread_mutex_unlock(&rid_table_mutex);
 
  ridLock->active++;
  ridLock->waiting++;
@@ -169,6 +195,8 @@ int lockManagerWriteLockRecord(int xid, recordid rid) {
        ridLock->waiting--;
        ridLock->active--;
        pthread_mutex_unlock(mut);
+        //  printf("Deadlock!\n"); fflush(stdout);
+        //  abort();
        return LLADD_DEADLOCK;
      }
    }
@@ -176,33 +204,48 @@ int lockManagerWriteLockRecord(int xid, recordid rid) {
  ridLock->waiting--;
  if(currentLockLevel == 0) {
    ridLock->readers++;
+    ridLock->writers++;
+  } else if (currentLockLevel == LM_READLOCK) {
+    ridLock->writers++;
+    pblHtRemove(xidLocks, dat, datLen);
  }
-  ridLock->writers++;
+  if(currentLockLevel != LM_WRITELOCK) {
+    pblHtInsert(xidLocks, dat, datLen, (void*)LM_WRITELOCK);
+  }
+
  ridLock->active--;
  pthread_mutex_unlock(mut);
-  pblHtInsert(xidLocks, &rid, sizeof(recordid), (void*)LM_WRITELOCK);
  return 0;
 }
 
-int lockManagerUnlockRecord(int xid, recordid rid) {
+int lockManagerUnlockHashed(int xid, byte * dat, int datLen) {
+
+
+  if(xid == -1) { return 0; }
+
+  //  printf("xid %d unlock\n", xid);
+
  pthread_mutex_lock(&xid_table_mutex);
 
  pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
 
+  if(!xidLocks) {
+    xidLocks = lockManagerBeginTransactionUnlocked(xid);
+  }
 
-  int currentLevel = (int)pblHtLookup(xidLocks, &rid, sizeof(recordid));
+  int currentLevel = (int)pblHtLookup(xidLocks, dat, datLen);
 
  if(currentLevel) {
-    pblHtRemove(xidLocks, &rid, sizeof(recordid));
+    pblHtRemove(xidLocks, dat, datLen);
  }
 
  pthread_mutex_unlock(&xid_table_mutex);
 
-  pthread_mutex_t * mut = getMutex(rid);
+  pthread_mutex_t * mut = getMutex(dat, datLen);
  pthread_mutex_lock(mut);
-  lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid));
+  pthread_mutex_lock(&rid_table_mutex);
+  lock * ridLock = pblHtLookup(ridLockTable, dat, datLen);
  assert(ridLock);
  ridLock->active++;
-
+  pthread_mutex_unlock(&rid_table_mutex);
  if(currentLevel == LM_WRITELOCK) {
    ridLock->writers--;
    ridLock->readers--;
@@ -220,7 +263,10 @@ int lockManagerUnlockRecord(int xid, recordid rid) {
  ridLock->active--;
 
  if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) {
-    destroyLock(rid, ridLock);
+    //  printf("destroyed lock");
+    destroyLock(dat, datLen, ridLock);
+  } else {
+    //  printf("(%d %d %d %d)", ridLock->active, ridLock->waiting, ridLock->readers, ridLock->writers);
  }
 
  pthread_mutex_unlock(mut);
@@ -228,24 +274,102 @@ int lockManagerUnlockRecord(int xid, recordid rid) {
  return 0;
 }
 
-int lockManagerReleaseAll(int xid) {
-
+int lockManagerCommitHashed(int xid, int datLen) {
+  if(xid == -1) { return 0; }
  pthread_mutex_lock(&xid_table_mutex);
 
  pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int));
 
+  if(!xidLocks) {
+    xidLocks = lockManagerBeginTransactionUnlocked(xid);
+  }
+
  pthread_mutex_unlock(&xid_table_mutex);
 
  void * data;
  int ret = 0;
 
+  byte * dat = malloc(datLen);
+
  for(data = pblHtFirst(xidLocks); data; data = pblHtNext(xidLocks)) {
-    recordid rid = *(recordid*)pblHtCurrentKey(xidLocks);
-    int tmpret = lockManagerUnlockRecord(xid, rid);
+    memcpy(dat, pblHtCurrentKey(xidLocks), datLen);
+    int tmpret = lockManagerUnlockHashed(xid, dat, datLen);
    // Pass any error(s) up to the user.
    // (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR)
    if(tmpret) {
      ret = tmpret;
    }
-    pblHtRemove(xidLocks, &rid, sizeof(recordid));
+    pblHtRemove(xidLocks, dat, datLen);
  }
+  free(dat);
 
  return ret;
 }
+
+int lockManagerReadLockRecord(int xid, recordid rid) {
+  return lockManagerReadLockHashed(xid, (byte*)&rid, sizeof(recordid));
+}
+int lockManagerWriteLockRecord(int xid, recordid rid) {
+  return lockManagerWriteLockHashed(xid, (byte*)&rid, sizeof(recordid));
+}
+int lockManagerUnlockRecord(int xid, recordid rid) {
+  return lockManagerUnlockHashed(xid, (byte*)&rid, sizeof(recordid));
+}
+int lockManagerCommitRecords(int xid) {
+  return lockManagerCommitHashed(xid, sizeof(recordid));
+}
+
+int lockManagerReadLockPage(int xid, int p) {
+  return lockManagerReadLockHashed(xid, (byte*)&p, sizeof(int));
+}
+int lockManagerWriteLockPage(int xid, int p) {
+  return lockManagerWriteLockHashed(xid, (byte*)&p, sizeof(int));
+}
+int lockManagerUnlockPage(int xid, int p) {
+  return lockManagerUnlockHashed(xid, (byte*)&p, sizeof(int));
+}
+int lockManagerCommitPages(int xid) {
+  return lockManagerCommitHashed(xid, sizeof(int));
+}
+
+LockManagerSetup globalLockManager;
+
+void setupLockManagerCallbacksPage() {
+  globalLockManager.init            = &lockManagerInitHashed;
+  globalLockManager.readLockPage    = &lockManagerReadLockPage;
+  globalLockManager.writeLockPage   = &lockManagerWriteLockPage;
+  globalLockManager.unlockPage      = &lockManagerUnlockPage;
+  globalLockManager.readLockRecord  = NULL;
+  globalLockManager.writeLockRecord = NULL;
+  globalLockManager.unlockRecord    = NULL;
+  globalLockManager.commit          = &lockManagerCommitPages;
+  globalLockManager.abort           = &lockManagerCommitPages;
+  globalLockManager.begin           = &lockManagerBeginTransaction;
+
+  globalLockManager.init();
+}
+
+void setupLockManagerCallbacksRecord () {
+  globalLockManager.init            = &lockManagerInitHashed;
+  globalLockManager.readLockPage    = NULL;
+  globalLockManager.writeLockPage   = NULL;
+  globalLockManager.unlockPage      = NULL;
+  globalLockManager.readLockRecord  = &lockManagerReadLockRecord;
+  globalLockManager.writeLockRecord = &lockManagerWriteLockRecord;
+  globalLockManager.unlockRecord    = &lockManagerUnlockRecord;
+  globalLockManager.commit          = &lockManagerCommitRecords;
+  globalLockManager.abort           = &lockManagerCommitRecords;
+  globalLockManager.begin           = &lockManagerBeginTransaction;
+  globalLockManager.init();
+}
+
+
+void setupLockManagerCallbacksNil () {
+  globalLockManager.init            = NULL;
+  globalLockManager.readLockPage    = NULL;
+  globalLockManager.writeLockPage   = NULL;
+  globalLockManager.unlockPage      = NULL;
+  globalLockManager.readLockRecord  = NULL;
+  globalLockManager.writeLockRecord = NULL;
+  globalLockManager.unlockRecord    = NULL;
+  globalLockManager.commit          = NULL;
+  globalLockManager.abort           = NULL;
+  globalLockManager.begin           = NULL;
+}
+
diff --git a/src/lladd/operations.c b/src/lladd/operations.c
index 116344a..409d6b1 100644
--- a/src/lladd/operations.c
+++ b/src/lladd/operations.c
@@ -65,7 +65,7 @@ void redoUpdate(const LogEntry * e) {
   if(e->type == UPDATELOG) {
     /* lsn_t pageLSN = readLSN(e->contents.update.rid.page); */
     recordid rid = e->contents.update.rid;
-    Page * p = loadPage(rid.page);
+    Page * p = loadPage(e->xid, rid.page);
     lsn_t pageLSN = pageReadLSN(p);
 
     if(e->LSN > pageLSN) {
@@ -79,7 +79,7 @@ void redoUpdate(const LogEntry * e) {
   } else if(e->type == CLRLOG) {
     LogEntry * f = readLSNEntry(e->contents.clr.thisUpdateLSN);
     recordid rid = f->contents.update.rid;
-    Page * p = loadPage(rid.page);
+    Page * p = loadPage(e->xid, rid.page);
 
     assert(rid.page == e->contents.update.rid.page); /* @todo Should this always hold? */
 
@@ -127,7 +127,7 @@ void undoUpdate(const LogEntry * e, Page * p, lsn_t clr_lsn) {
       assert(p);
       DEBUG("OPERATION %d Whole page physical undo, %ld {%d}\n", undo, e->LSN, e->contents.update.rid.page);
       memcpy(p->memAddr, getUpdatePreImage(e), PAGE_SIZE);
-      pageWriteLSN(p, clr_lsn);
+      pageWriteLSN(e->xid, p, clr_lsn);
 
     } else {
       /* @see doUpdate() */
diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c
index 1045b9d..13538e1 100644
--- a/src/lladd/operations/alloc.c
+++ b/src/lladd/operations/alloc.c
@@ -44,7 +44,7 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat)
   if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) {
     allocBlob(xid, p, lsn, rid);
   } else {
-    slottedPostRalloc(p, lsn, rid);
+    slottedPostRalloc(xid, p, lsn, rid);
   }
 
   return 0;
@@ -53,7 +53,7 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat)
 /** @todo Currently, we leak empty pages on dealloc. */
 static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
   assert(rid.page == p->id);
-  slottedDeRalloc(p, lsn, rid);
+  slottedDeRalloc(xid, p, lsn, rid);
   return 0;
 }
 
@@ -64,7 +64,7 @@ static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat
     rid.size = sizeof(blob_record_t);
   }
 
-  slottedPostRalloc(p, lsn, rid);
+  slottedPostRalloc(xid, p, lsn, rid);
   /** @todo dat should be the pointer to the space in the blob store. */
   writeRecord(xid, p, lsn, rid, dat);
 
@@ -162,7 +162,7 @@ recordid TallocFromPage(int xid, long page, long size) {
 void Tdealloc(int xid, recordid rid) {
   void * preimage = malloc(rid.size);
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   readRecord(xid, p, rid, preimage);
   /** @todo race in Tdealloc; do we care, or is this something that the log manager should cope with?
    */
   Tupdate(xid, rid, preimage, OPERATION_DEALLOC);
@@ -171,7 +171,7 @@ void Tdealloc(int xid, recordid rid) {
 }
 
 int TrecordType(int xid, recordid rid) {
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   int ret = getRecordType(xid, p, rid);
   releasePage(p);
   return ret;
@@ -179,14 +179,14 @@ int TrecordType(int xid, recordid rid) {
 int TrecordSize(int xid, recordid rid) {
   int ret;
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   ret = getRecordSize(xid, p, rid);
   releasePage(p);
   return ret;
 }
 
 int TrecordsInPage(int xid, int pageid) {
-  Page * p = loadPage(pageid);
+  Page * p = loadPage(xid, pageid);
   readlock(p->rwlatch, 187);
   int ret = *numslots_ptr(p);
   unlock(p->rwlatch);
diff --git a/src/lladd/operations/arrayList.c b/src/lladd/operations/arrayList.c
index c955a0f..a3cd174 100644
--- a/src/lladd/operations/arrayList.c
+++ b/src/lladd/operations/arrayList.c
@@ -96,7 +96,7 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
   *page_type_ptr(p) = ARRAY_LIST_PAGE;
 
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
 
   recordid ret;
   ret.page = firstPage;
@@ -118,7 +118,7 @@ Operation getArrayListAlloc() {
 /*----------------------------------------------------------------------------*/
 
 int TarrayListExtend(int xid, recordid rid, int slots) {
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   TarrayListParameters tlp = pageToTLP(p);
 
   int lastCurrentBlock;
@@ -181,7 +181,7 @@ int TarrayListExtend(int xid, recordid rid, int slots) {
 }
 /** @todo: TarrayListInstantExtend, is a hacked-up cut and paste version of TarrayListExtend */
 int TarrayListInstantExtend(int xid, recordid rid, int slots) {
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   TarrayListParameters tlp = pageToTLP(p);
 
   int lastCurrentBlock;
@@ -249,13 +249,13 @@ static int operateInitFixed(int xid, Page * p, lsn_t lsn, recordid rid, const vo
   fixedPageInitialize(p, rid.size, recordsPerPage(rid.size));
 
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
 static int operateUnInitPage(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
   *page_type_ptr(p) = UNINITIALIZED_PAGE;
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
diff --git a/src/lladd/operations/linearHash.c b/src/lladd/operations/linearHash.c
index ae2c988..e191e8e 100644
--- a/src/lladd/operations/linearHash.c
+++ b/src/lladd/operations/linearHash.c
@@ -82,7 +82,7 @@ static int operateUndoDelete(int xid, Page * p, lsn_t lsn, recordid rid, const v
          argBytes + keySize, valSize);
   return 0;
 }
-static int noop (int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { pageWriteLSN(p, lsn); return 0; }
+static int noop (int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { pageWriteLSN(xid, p, lsn); return 0; }
 
 Operation getLinearInsert() {
   Operation o = {
@@ -316,7 +316,8 @@ void instant_update_hash_header(int xid, recordid hash, int i, int next_split) {
 static void recover_split(int xid, recordid hashRid, int i, int next_split, int keySize, int valSize) {
   // This function would be simple, except for the ridiculous mount
   // of state that it must maintain.  See above for a description of what it does.
-  recordid * headerRidB;
+  recordid headerRidBstack;
+  recordid * headerRidB = &headerRidBstack;
   hashRid.slot = 1;
   Tread(xid, hashRid, headerRidB);
   hashRid.slot = 0;
diff --git a/src/lladd/operations/noop.c b/src/lladd/operations/noop.c
index a8912fd..1a71d97 100644
--- a/src/lladd/operations/noop.c
+++ b/src/lladd/operations/noop.c
@@ -52,7 +52,7 @@ static int operate(int xid, Page *p, lsn_t lsn, recordid rid, const void *dat)
   /* If p is null, then this is a logical no-op that spans pages, so do nothing.
      Otherwise, write the LSN to the appropriate page (to keep recovery happy)
      and return */
-  if(p) pageWriteLSN(p, lsn);
+  if(p) pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
diff --git a/src/lladd/operations/pageOperations.c b/src/lladd/operations/pageOperations.c
index 015cc91..2bd9301 100644
--- a/src/lladd/operations/pageOperations.c
+++ b/src/lladd/operations/pageOperations.c
@@ -34,7 +34,7 @@ int __pageDealloc(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
  */
 int __pageSet(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
   memcpy(p->memAddr, d, PAGE_SIZE);
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
@@ -50,7 +50,7 @@ int __update_freepage(int xid, Page * p, lsn_t lsn, recordid r, const void * d)
     fflush(NULL); */
   * headerFreepage_ptr(p) = t->after;
   freepage = t->after;
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
@@ -64,7 +64,7 @@ int __update_freespace_inverse(int xid, Page * p, lsn_t lsn, recordid r, const v
   * headerFreepage_ptr(p) = t->before;
   freepage = t->before;
 #endif
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 #ifdef REUSE_PAGES
@@ -102,18 +102,18 @@ int __free_page(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
   memset(p->memAddr, 0, PAGE_SIZE);
   *page_type_ptr(p) = LLADD_FREE_PAGE;
   *nextfreepage_ptr(p) = *successor;
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
 int __alloc_freed(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
   memset(p->memAddr, 0, PAGE_SIZE);
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   return 0;
 }
 
 int TpageGet(int xid, int pageid, byte *memAddr) {
-  Page * q = loadPage(pageid);
+  Page * q = loadPage(xid, pageid);
   memcpy(memAddr, q->memAddr, PAGE_SIZE);
   releasePage(q);
   return 0;
@@ -139,7 +139,8 @@ void pageOperationsInit() {
     assert(!posix_memalign((void **)&(p.memAddr), PAGE_SIZE, PAGE_SIZE));
     p.id = 0;*/
 
-  Page * p = loadPage(0);
+  Page * p = loadPage(-1, 0);
+  /** Release lock on page zero. */
 
   //  pageRead(&p);
 
diff --git a/src/lladd/operations/set.c b/src/lladd/operations/set.c
index c772b68..b3605f6 100644
--- a/src/lladd/operations/set.c
+++ b/src/lladd/operations/set.c
@@ -106,7 +106,7 @@ void TsetRange(int xid, recordid rid, int offset, int length, const void * dat)
   // Copy new value into log structure
   memcpy(range + 1, dat, length);
 
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   // No further locking is necessary here; readRecord protects the
   // page layout, but attempts at concurrent modification have undefined
   // results.  (See page.c)
diff --git a/src/lladd/page.c b/src/lladd/page.c
index f9e0165..6f47027 100644
--- a/src/lladd/page.c
+++ b/src/lladd/page.c
@@ -83,6 +83,7 @@ terms specified in this license.
 #include
 #include
 #include "blobManager.h"
+#include
 #include "pageFile.h"
 
 #include "page/slotted.h"
@@ -102,9 +103,11 @@ static pthread_mutex_t pageMallocMutex;
 /** We need one dummy page for locking purposes, so this array has one extra page in it.
 */
 Page pool[MAX_BUFFER_SIZE+1];
 
-void pageWriteLSN(Page * page, lsn_t lsn) {
+void pageWriteLSN(int xid, Page * page, lsn_t lsn) {
   /* unlocked since we're only called by a function that holds the writelock. */
   /*  *(long *)(page->memAddr + START_OF_LSN) = page->LSN; */
+  if(globalLockManager.writeLockPage) { globalLockManager.writeLockPage(xid, page->id); }
+
   if(page->LSN < lsn) {
     page->LSN = lsn;
     *lsn_ptr(page) = page->LSN;
@@ -263,7 +266,7 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
 
   writelock(p->rwlatch, 225);  /* Need a writelock so that we can update the lsn. */
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   unlock(p->rwlatch);
 
 }
@@ -378,7 +381,7 @@ void writeRecordUnlocked(int xid, Page * p, lsn_t lsn, recordid rid, const void
   assert( (p->id == rid.page) && (p->memAddr != NULL) );
 
   writelock(p->rwlatch, 225);  /* Need a writelock so that we can update the lsn. */
-  pageWriteLSN(p, lsn);
+  pageWriteLSN(xid, p, lsn);
   unlock(p->rwlatch);
 
 }
diff --git a/src/lladd/page.h b/src/lladd/page.h
index d050771..c73b4b6 100644
--- a/src/lladd/page.h
+++ b/src/lladd/page.h
@@ -221,7 +221,7 @@ void pageDeInit();
  * @param page You must have a writelock on page before calling this function.
  * @param lsn The new lsn of the page.  If the new lsn is less than the page's current lsn, then the page's lsn will not be changed.
  */
-void pageWriteLSN(Page * page, lsn_t lsn);
+void pageWriteLSN(int xid, Page * page, lsn_t lsn);
 
 /**
  * assumes that the page is already loaded in memory.  It takes
diff --git a/src/lladd/page/indirect.c b/src/lladd/page/indirect.c
index 610e775..c1c1d4a 100644
--- a/src/lladd/page/indirect.c
+++ b/src/lladd/page/indirect.c
@@ -14,8 +14,8 @@ void indirectInitialize(Page * p, int height) {
   memset(p->memAddr, INVALID_SLOT, ((int)level_ptr(p)) - ((int)p->memAddr));
 }
 /** @todo locking for dereferenceRID? */
-recordid dereferenceRID(recordid rid) {
-  Page * this = loadPage(rid.page);
+recordid dereferenceRID(int xid, recordid rid) {
+  Page * this = loadPage(xid, rid.page);
   int offset = 0;
   int max_slot;
   while(*page_type_ptr(this) == INDIRECT_PAGE) {
@@ -32,7 +32,7 @@ recordid dereferenceRID(recordid rid) {
     int nextPage = *page_ptr(this, i);
 
     releasePage(this);
-    this = loadPage(nextPage);
+    this = loadPage(xid, nextPage);
   }
 
   rid.page = this->id;
@@ -168,8 +168,8 @@ recordid __rallocMany(int xid, int parentPage, int recordSize, int recordCount)
   return rid;
 }
 
-unsigned int indirectPageRecordCount(recordid rid) {
-  Page * p = loadPage(rid.page);
+unsigned int indirectPageRecordCount(int xid, recordid rid) {
+  Page * p = loadPage(xid, rid.page);
   int i = 0;
   unsigned int ret;
   if(*page_type_ptr(p) == INDIRECT_PAGE) {
diff --git a/src/lladd/page/indirect.h b/src/lladd/page/indirect.h
index 54e2f68..0bda21b 100644
--- a/src/lladd/page/indirect.h
+++ b/src/lladd/page/indirect.h
@@ -43,11 +43,11 @@ BEGIN_C_DECLS
    Translates a recordid that points to an indirect block into the physical location of the record.
 */
-recordid dereferenceRID(recordid rid);
-#define dereferenceRIDUnlocked(x) dereferenceRID((x))
+recordid dereferenceRID(int xid, recordid rid);
+#define dereferenceRIDUnlocked(y, x) dereferenceRID((y), (x))
 void indirectInitialize(Page * p, int height);
 recordid rallocMany(/*int parentPage, lsn_t lsn,*/int xid, int recordSize, int recordCount);
-unsigned int indirectPageRecordCount(recordid rid);
+unsigned int indirectPageRecordCount(int xid, recordid rid);
 
 END_C_DECLS
diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c
index 5a3fd20..bb11d1e 100644
--- a/src/lladd/page/slotted.c
+++ b/src/lladd/page/slotted.c
@@ -175,18 +175,18 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
 
   if(lastFreepage == -1) {
     lastFreepage = TpageAlloc(xid);
-    *pp = loadPage(lastFreepage);
+    *pp = loadPage(xid, lastFreepage);
     assert(*page_type_ptr(*pp) == UNINITIALIZED_PAGE);
     slottedPageInitialize(*pp);
   } else {
-    *pp = loadPage(lastFreepage);
+    *pp = loadPage(xid, lastFreepage);
   }
 
   if(slottedFreespace(*pp) < size ) {
     releasePage(*pp);
     lastFreepage = TpageAlloc(xid);
-    *pp = loadPage(lastFreepage);
+    *pp = loadPage(xid, lastFreepage);
     slottedPageInitialize(*pp);
   }
 
@@ -209,7 +209,7 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
     size = sizeof(blob_record_t);
   }
 
-  *pp = loadPage(page);
+  *pp = loadPage(xid, page);
 
   if(slottedFreespace(*pp) < size) {
     releasePage(*pp);
@@ -302,7 +302,7 @@ static void __really_do_ralloc(Page * page, recordid rid) {
 
 }
 
-recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) {
+recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
 
   writelock(page->rwlatch, 376);
 
@@ -348,14 +348,14 @@ recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid) {
 
   }
 
-  pageWriteLSN(page, lsn);
+  pageWriteLSN(xid, page, lsn);
 
   writeunlock(page->rwlatch);
 
   return rid;
 }
 
-void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) {
+void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
 
   readlock(page->rwlatch, 443);
 
@@ -364,7 +364,7 @@ void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid) {
   *freelist_ptr(page) = rid.slot;
   /* *slot_length_ptr(page, rid.slot) = 0; */
 
-  pageWriteLSN(page, lsn);
+  pageWriteLSN(xid, page, lsn);
 
   unlock(page->rwlatch);
 }
diff --git a/src/lladd/page/slotted.h b/src/lladd/page/slotted.h
index be8f835..26ab206 100644
--- a/src/lladd/page/slotted.h
+++ b/src/lladd/page/slotted.h
@@ -113,13 +113,13 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page**p);
  * page for this record (though it is possible that we need to compact
  * the page)
  */
-recordid slottedPostRalloc(Page * page, lsn_t lsn, recordid rid);
+recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
 
 /**
  * Mark the space used by a record for reclaimation.
  *
  * @param rid the recordid to be freed.
  */
-void slottedDeRalloc(Page * page, lsn_t lsn, recordid rid);
+void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid);
 
 void slottedPageInit();
 void slottedPageDeinit();
diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c
index 8068f2c..3c78382 100644
--- a/src/lladd/transactional2.c
+++ b/src/lladd/transactional2.c
@@ -6,6 +6,8 @@
 #include
 #include "logger/logWriter.h"
 #include
+#include
+
 #include "page.h"
 #include
@@ -91,8 +93,10 @@ int Tinit() {
   initNestedTopActions();
   ThashInit();
 
-  InitiateRecovery();
+  setupLockManagerCallbacksNil();
+  //  setupLockManagerCallbacksPage();
 
+  InitiateRecovery();
   return 0;
 }
@@ -130,6 +134,8 @@ int Tbegin() {
 
   XactionTable[index] = LogTransBegin(xidCount_tmp);
 
+  if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); }
+
   return XactionTable[index].xid;
 }
 
@@ -142,12 +148,12 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
   pthread_mutex_unlock(&transactional_2_mutex);
 #endif
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
 
   if(*page_type_ptr(p) == INDIRECT_PAGE) {
     releasePage(p);
-    rid = dereferenceRID(rid);
-    p = loadPage(rid.page);
+    rid = dereferenceRID(xid, rid);
+    p = loadPage(xid, rid.page);
     /** @todo Kludge! Shouldn't special case operations in transactional2. */
   } else if(*page_type_ptr(p) == ARRAY_LIST_PAGE &&
            op != OPERATION_LINEAR_INSERT &&
@@ -156,7 +162,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
           op != OPERATION_UNDO_LINEAR_DELETE ) {
     rid = dereferenceArrayListRid(p, rid.slot);
     releasePage(p);
-    p = loadPage(rid.page);
+    p = loadPage(xid, rid.page);
   }
 
   e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], p, rid, op, dat);
@@ -175,7 +181,7 @@ void Tupdate(int xid, recordid rid, const void *dat, int op) {
 void alTupdate(int xid, recordid rid, const void *dat, int op) {
   LogEntry * e;
   Page * p;
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
 
   /*  if(*page_type_ptr(p) == INDIRECT_PAGE) {
     releasePage(p);
@@ -206,19 +212,19 @@ void alTupdate(int xid, recordid rid, const void *dat, int op) {
 
 void TreadUnlocked(int xid, recordid rid, void * dat) {
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   int page_type = *page_type_ptr(p);
   if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) {
 
   } else if(page_type == INDIRECT_PAGE) {
     releasePage(p);
-    rid = dereferenceRIDUnlocked(rid);
-    p = loadPage(rid.page);
+    rid = dereferenceRIDUnlocked(xid, rid);
+    p = loadPage(xid, rid.page);
 
   } else if(page_type == ARRAY_LIST_PAGE) {
     rid = dereferenceArrayListRidUnlocked(p, rid.slot);
     releasePage(p);
-    p = loadPage(rid.page);
+    p = loadPage(xid, rid.page);
 
   } else {
     abort();
@@ -228,19 +234,19 @@ void TreadUnlocked(int xid, recordid rid, void * dat) {
 }
 
 void Tread(int xid, recordid rid, void * dat) {
-  Page * p = loadPage(rid.page);
+  Page * p = loadPage(xid, rid.page);
   int page_type = *page_type_ptr(p);
   if(page_type == SLOTTED_PAGE || page_type == FIXED_PAGE || !page_type ) {
 
   } else if(page_type == INDIRECT_PAGE) {
     releasePage(p);
-    rid = dereferenceRID(rid);
-    p = loadPage(rid.page);
+    rid = dereferenceRID(xid, rid);
+    p = loadPage(xid, rid.page);
 
   } else if(page_type == ARRAY_LIST_PAGE) {
     rid = dereferenceArrayListRid(p, rid.slot);
     releasePage(p);
-    p = loadPage(rid.page);
+    p = loadPage(xid, rid.page);
 
   } else {
     abort();
diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c
index 203c90a..ddf3afd 100644
--- a/test/lladd/check_bufferManager.c
+++ b/test/lladd/check_bufferManager.c
@@ -35,10 +35,10 @@ void initializePages() {
     rid.page = i;
     rid.slot = 0;
     rid.size = sizeof(int);
-    p = loadPage(rid.page);
+    p = loadPage(-1, rid.page);
     assert(p->id != -1);
-    slottedPostRalloc(p, 0, rid);
+    slottedPostRalloc(-1, p, 0, rid);
 
     writeRecord(1, p, 1, rid, &i);
 
@@ -66,7 +66,7 @@ void * workerThread(void * p) {
     rid.slot = 0;
     rid.size = sizeof(int);
 
-    p = loadPage(rid.page);
+    p = loadPage(-1, rid.page);
 
     readRecord(1, p, rid, &j);
 
@@ -89,7 +89,7 @@ void * workerThreadWriting(void * q) {
     Page * tmp;
     pthread_mutex_lock(&ralloc_mutex);
     rids[i] = slottedPreRalloc(1, sizeof(int), &tmp);
-    slottedPostRalloc(tmp, 1, rids[i]);
+    slottedPostRalloc(-1, tmp, 1, rids[i]);
     releasePage(tmp);
     pthread_mutex_unlock(&ralloc_mutex);
 
@@ -106,7 +106,7 @@ void * workerThreadWriting(void * q) {
   for(int i = 0; i < RECORDS_PER_THREAD; i++) {
     int val = (i * 10000) + offset;
     int k;
-    Page * p = loadPage(rids[i].page);
+    Page * p = loadPage(-1, rids[i].page);
 
     assert(p->id == rids[i].page);
 
@@ -135,7 +135,7 @@ void * workerThreadWriting(void * q) {
 
     Page * p;
 
-    p = loadPage(rids[i].page);
+    p = loadPage(-1, rids[i].page);
 
     readRecord(1, p, rids[i], &val);
 
diff --git a/test/lladd/check_indirect.c b/test/lladd/check_indirect.c
index ac9b584..a492887 100644
--- a/test/lladd/check_indirect.c
+++ b/test/lladd/check_indirect.c
@@ -101,7 +101,7 @@ START_TEST(indirectAlloc) {
   fail_unless(rid.slot == RECORD_ARRAY, NULL);
   fail_unless(rid.size == 1, NULL);
 
-  Page * p = loadPage(page);
+  Page * p = loadPage(xid, page);
 
   int page_type = *page_type_ptr(p);
 
@@ -122,7 +122,7 @@ START_TEST(indirectAlloc) {
   fail_unless(rid.slot == RECORD_ARRAY, NULL);
   fail_unless(rid.size == 2000, NULL);
 
-  p = loadPage(page);
+  p = loadPage(xid, page);
 
   page_type = *page_type_ptr(p);
 
@@ -146,7 +146,7 @@ START_TEST(indirectAlloc) {
   fail_unless(rid.slot == RECORD_ARRAY, NULL);
   fail_unless(rid.size == 2, NULL);
 
-  p = loadPage(page);
+  p = loadPage(xid, page);
 
   page_type = *page_type_ptr(p);
 
@@ -177,7 +177,7 @@ START_TEST(indirectAccessDirect) {
   page = rid.page;
 
   /* Make sure that it didn't create any indirect pages. */
-  Page * p = loadPage(page);
+  Page * p = loadPage(xid, page);
 
   int page_type = *page_type_ptr(p);
 
@@ -193,7 +193,7 @@ START_TEST(indirectAccessDirect) {
 
   for(int i = 0; i < 500; i++) {
     rid.slot = i;
-    Tset(xid, dereferenceRID(rid), &i);
+    Tset(xid, dereferenceRID(xid, rid), &i);
   }
 
   Tcommit(xid);
@@ -202,7 +202,7 @@ START_TEST(indirectAccessDirect) {
   for(int i = 0; i < 500; i++) {
     rid.slot = i;
     int j;
-    Tread(xid, dereferenceRID(rid), &j);
+    Tread(xid, dereferenceRID(xid, rid), &j);
     assert(j == i);
   }
 
@@ -222,7 +222,7 @@ START_TEST(indirectAccessIndirect) {
   page = rid.page;
 
   /* Make sure that it didn't create any indirect pages. */
-  Page * p = loadPage(page);
+  Page * p = loadPage(xid, page);
 
   int page_type = *page_type_ptr(p);
 
@@ -236,7 +236,7 @@ START_TEST(indirectAccessIndirect) {
 
   for(int i = 0; i < 500000; i++) {
     rid.slot = i;
-    Tset(xid, dereferenceRID(rid), &i);
+    Tset(xid, dereferenceRID(xid, rid), &i);
   }
 
   Tcommit(xid);
@@ -245,7 +245,7 @@ START_TEST(indirectAccessIndirect) {
   for(int i = 0; i < 500000; i++) {
     rid.slot = i;
     int j;
-    Tread(xid, dereferenceRID(rid), &j);
+    Tread(xid, dereferenceRID(xid, rid), &j);
     assert(j == i);
   }
 
@@ -264,12 +264,12 @@ START_TEST(indirectSizeTest) {
   int xid = Tbegin();
   recordid rid = rallocMany(xid, sizeof(int), 20);
 
-  int count = indirectPageRecordCount(rid);
+  int count = indirectPageRecordCount(xid, rid);
 
   assert(count == 20);
 
   recordid rid2 = rallocMany(xid, sizeof(int), 5000);
-  count = indirectPageRecordCount(rid2);
+  count = indirectPageRecordCount(xid, rid2);
 
   assert(count == 5000);
 
   Tcommit(xid);
diff --git a/test/lladd/check_lockManager.c b/test/lladd/check_lockManager.c
index abeeda6..46b5820 100644
--- a/test/lladd/check_lockManager.c
+++ b/test/lladd/check_lockManager.c
@@ -11,14 +11,60 @@
 #define LOG_NAME "check_lockManager.log"
 
 /** Needs to be formatted as a floating point */
-#define NUM_RECORDS 100000000.0
+#define NUM_RECORDS 5000000.0
 #define THREAD_COUNT 100
-#define RIDS_PER_THREAD 10000
+#define RIDS_PER_THREAD 1000
 
-void * workerThread(void * j) {
+void * pageWorkerThread(void * j) {
+  int xid = *(int*)j;
+  //startTransaction(xid);
+  globalLockManager.begin(xid);
+  recordid rid;
+  rid.page = 0;
+  rid.size = 0;
+  int k;
+  int deadlocks = 0;
+  for(k = 0; k < RIDS_PER_THREAD; k++) {
+    int m = (int) (NUM_RECORDS*random()/(RAND_MAX+1.0));
+    int rw = random() % 2;
+
+    if(rw) {
+      // readlock
+      if(LLADD_DEADLOCK == globalLockManager.readLockPage(xid, m)) {
+      // if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
+        k = 0;
+        globalLockManager.abort(xid);
+        deadlocks++;
+        printf("-");
+      }
+
+
+    } else {
+      // writelock
+      // if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
+      if(LLADD_DEADLOCK == globalLockManager.writeLockPage(xid, m)) {
+        k = 0;
+        globalLockManager.abort(xid);
+        deadlocks++;
+        printf("-");
+      }
+
+    }
+  }
+
+  printf("%2d ", deadlocks); fflush(stdout);
+
+  // lockManagerCommit(xid);
+  globalLockManager.commit(xid);
+
+  return NULL;
+
+}
+void * ridWorkerThread(void * j) {
   int xid = *(int*)j;
-  startTransaction(xid);
+  //startTransaction(xid);
+  globalLockManager.begin(xid);
   recordid rid;
   rid.page = 0;
   rid.size = 0;
@@ -30,16 +76,23 @@
 
     if(rw) {
       // readlock
-
-      if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
+      if(LLADD_DEADLOCK == globalLockManager.readLockRecord(xid, rid)) {
+      // if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) {
+        k = 0;
+        globalLockManager.abort(xid);
        deadlocks++;
+        printf("-");
      }
 
    } else {
      // writelock
-      if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
+      // if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) {
+      if(LLADD_DEADLOCK == globalLockManager.writeLockRecord(xid, rid)) {
+        k = 0;
+        globalLockManager.abort(xid);
        deadlocks++;
+        printf("-");
      }
 
    }
@@ -47,21 +100,43 @@
 
   printf("%2d ", deadlocks); fflush(stdout);
 
-  lockManagerReleaseAll(xid);
+  // lockManagerCommit(xid);
+  globalLockManager.commit(xid);
 
   return NULL;
 }
 
-START_TEST(lockManagerTest) {
+START_TEST(recordidLockManagerTest) {
+  printf("\n");
+
+
+  setupLockManagerCallbacksRecord();
 
-  lockManagerInit();
   pthread_t workers[THREAD_COUNT];
   int i;
   for(i = 0; i < THREAD_COUNT; i++) {
     int *j = malloc(sizeof(int));
     *j = i;
-    pthread_create(&workers[i], NULL, workerThread, j);
+    pthread_create(&workers[i], NULL, ridWorkerThread, j);
+  }
+  for(i = 0; i < THREAD_COUNT; i++) {
+    pthread_join(workers[i], NULL);
+  }
+} END_TEST
+
+START_TEST(pageLockManagerTest) {
+  printf("\n");
+
+
+  setupLockManagerCallbacksPage();
+
+  pthread_t workers[THREAD_COUNT];
+  int i;
+  for(i = 0; i < THREAD_COUNT; i++) {
+    int *j = malloc(sizeof(int));
+    *j = i;
+    pthread_create(&workers[i], NULL, pageWorkerThread, j);
   }
   for(i = 0; i < THREAD_COUNT; i++) {
     pthread_join(workers[i], NULL);
@@ -76,8 +151,9 @@ Suite * check_suite(void) {
 
   /* Sub tests are added, one per line, here */
 
-  tcase_add_test(tc, lockManagerTest);
-
+  tcase_add_test(tc, recordidLockManagerTest);
+  tcase_add_test(tc, pageLockManagerTest);
+
   /* --------------------------------------------- */
 
   tcase_add_checked_fixture(tc, setup, teardown);
diff --git a/test/lladd/check_operations.c b/test/lladd/check_operations.c
index 826f971..53075df 100644
--- a/test/lladd/check_operations.c
+++ b/test/lladd/check_operations.c
@@ -74,7 +74,7 @@ START_TEST(operation_physical_do_undo) {
   Page * p;
 
   Tinit();
-
+  xid = -1;
   rid = slottedPreRalloc(xid, sizeof(int), &p);
   releasePage(p);
   buf = 1;
@@ -87,17 +87,17 @@ START_TEST(operation_physical_do_undo) {
 
   DEBUG("B\n");
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   writeRecord(xid, p, lsn, rid, &buf);
   releasePage(p);
   setToTwo->LSN = 10;
 
   DEBUG("C\n");
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   doUpdate(setToTwo, p);  /* PAGE LSN= 10, value = 2. */
   releasePage(p);
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   readRecord(xid, p, rid, &buf);
   releasePage(p);
 
@@ -107,7 +107,7 @@ START_TEST(operation_physical_do_undo) {
 
   DEBUG("D\n");
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
 
   fail_unless(10 == pageReadLSN(p), "page lsn not set correctly.");
 
@@ -116,7 +116,7 @@ START_TEST(operation_physical_do_undo) {
   undoUpdate(setToTwo, p, 8);  /* Should succeed, CLR LSN is too low, but undoUpdate only checks the log entry. */
   releasePage(p);
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   readRecord(xid, p, rid, &buf);
   releasePage(p);
 
@@ -126,7 +126,7 @@ START_TEST(operation_physical_do_undo) {
 
   redoUpdate(setToTwo);
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   readRecord(xid, p, rid, &buf);
   releasePage(p);
 
@@ -145,7 +145,7 @@ START_TEST(operation_physical_do_undo) {
   lsn = 0;
   buf = 1;
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   writeRecord(xid, p, lsn, rid, &buf);
   releasePage(p);
   /* Trace of test:
@@ -169,7 +169,7 @@ START_TEST(operation_physical_do_undo) {
   DEBUG("F\n");
 
   redoUpdate(setToTwo);
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   readRecord(xid, p, rid, &buf);
   assert(buf == 2);
   fail_unless(buf == 2, NULL);
@@ -185,7 +185,7 @@ START_TEST(operation_physical_do_undo) {
   DEBUG("H don't redo set to 2\n");
 
   redoUpdate(setToTwo);    /* Fails */
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
 
   readRecord(xid, p, rid, &buf);
 
@@ -197,7 +197,7 @@ START_TEST(operation_physical_do_undo) {
   releasePage(p);
   redoUpdate(setToTwo);    /* Succeeds */
 
-  p = loadPage(rid.page);
+  p = loadPage(xid, rid.page);
   readRecord(xid, p, rid, &buf);
 
   fail_unless(buf == 2, NULL);
@@ -330,6 +330,7 @@ START_TEST(operation_prepare) {
     and checks that the result is as expected.
 
     @todo Write a more thorough (threaded!) nested top action test.
+
 */
 START_TEST(operation_nestedTopAction) {
 
@@ -512,7 +513,8 @@ START_TEST(operation_set_range) {
   }
 
   Tcommit(xid);
-
+  Tdeinit();
+
 } END_TEST
 
 START_TEST(operation_alloc_test) {
diff --git a/test/lladd/check_page.c b/test/lladd/check_page.c
index 01d7327..46f1819 100644
--- a/test/lladd/check_page.c
+++ b/test/lladd/check_page.c
@@ -86,7 +86,7 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
       readRecord(1, p, rid[k], (byte*)&j);
 
       assert((j + 1) == i + k);
-      slottedDeRalloc(p, lsn, rid[k]);
+      slottedDeRalloc(-1, p, lsn, rid[k]);
       sched_yield();
     }
   }
@@ -98,7 +98,7 @@ static void * multiple_simultaneous_pages ( void * arg_ptr) {
       rid[k] = slottedRawRalloc(p, sizeof(short));
       i +=k;
       /* printf("Slot %d = %d\n", rid[k].slot, i); */
-      writeRecord(1, p, lsn, rid[k], (byte*)&i);
+      writeRecord(-1, p, lsn, rid[k], (byte*)&i);
       i -=k;
       sched_yield();
     }
@@ -159,14 +159,14 @@ static void* worker_thread(void * arg_ptr) {
     if(! first ) {
       readRecord(1, p, rid, (byte*)&j);
       assert((j + 1) == i);
-      slottedDeRalloc(p, lsn, rid);
+      slottedDeRalloc(-1, p, lsn, rid);
       sched_yield();
     }
 
     first = 0;
 
     rid = slottedRawRalloc(p, sizeof(int));
-    writeRecord(1, p, lsn, rid, (byte*)&i);
+    writeRecord(-1, p, lsn, rid, (byte*)&i);
     sched_yield();
 
     assert(pageReadLSN(p) <= lsn);
@@ -198,7 +198,8 @@ START_TEST(pageNoThreadTest)
 
   Tinit();
 
-  p = loadPage(0);
+  p = loadPage(-1, 0);
+
   slottedPageInitialize(p);
 
   worker_thread(p);
@@ -287,7 +288,7 @@ START_TEST(pageNoThreadMultPageTest)
 
   Tinit();
 
-  p = loadPage(1);
+  p = loadPage(-1, 1);
   slottedPageInitialize(p);
   multiple_simultaneous_pages(p);
   releasePage(p);
@@ -315,7 +316,7 @@ START_TEST(pageThreadTest) {
   Tinit();
   fail_unless(1, NULL);
 
-  Page * p = loadPage(2);
+  Page * p = loadPage(-1, 2);
   slottedPageInitialize(p);
   fail_unless(1, NULL);
 
@@ -345,7 +346,7 @@ START_TEST(fixedPageThreadTest) {
   pthread_mutex_init(&random_mutex, NULL);
   pthread_mutex_init(&lsn_mutex, NULL);
   Tinit();
-  Page * p = loadPage(2);
+  Page * p = loadPage(-1, 2);
   fixedPageInitialize(p, sizeof(int), 0);
 
   for(i = 0; i < THREAD_COUNT; i++) {
@@ -368,25 +369,26 @@ START_TEST(pageCheckSlotTypeTest) {
   recordid fixedRoot = TarrayListAlloc(xid, 2, 10, 10);
   recordid blob = Talloc(xid, PAGE_SIZE * 2);
 
-  Page * p = loadPage(slot.page);
+  Page * p = loadPage(-1, slot.page);
   assert(getRecordType(xid, p, slot) == SLOTTED_RECORD);
   releasePage(p);
 
   /** @todo the use of the fixedRoot recordid to check getRecordType is a bit questionable, but should work. */
-  p = loadPage(fixedRoot.page);
+  p = loadPage(-1, fixedRoot.page);
+
   assert(getRecordType(xid, p, fixedRoot) == FIXED_RECORD);
   releasePage(p);
 
   fixedRoot.slot = 1;
-  recordid fixedEntry = dereferenceRID(fixedRoot);
+  recordid fixedEntry = dereferenceRID(xid, fixedRoot);
   fixedRoot.slot = 0;
 
-  p = loadPage(fixedEntry.page);
+  p = loadPage(-1, fixedEntry.page);
   assert(getRecordType(xid, p, fixedEntry) == FIXED_RECORD);
   releasePage(p);
 
-  p = loadPage(blob.page);
+  p = loadPage(-1, blob.page);
   int type = getRecordType(xid, p, blob);
   assert(type == BLOB_RECORD);
   releasePage(p);
@@ -396,7 +398,7 @@ START_TEST(pageCheckSlotTypeTest) {
   bad.slot = slot.slot + 10;
   bad.size = 4;
 
-  p = loadPage(bad.page);
+  p = loadPage(xid, bad.page);
   assert(getRecordType(xid, p, bad) == UNINITIALIZED_RECORD);
   bad.size = 100000;
   assert(getRecordType(xid, p, bad) == UNINITIALIZED_RECORD);
@@ -430,7 +432,7 @@ START_TEST(pageTrecordTypeTest) {
   assert(TrecordType(xid, fixedRoot) == FIXED_RECORD);
 
   fixedRoot.slot = 1;
-  recordid fixedEntry = dereferenceRID(fixedRoot);
+  recordid fixedEntry = dereferenceRID(xid, fixedRoot);
   fixedRoot.slot = 0;
 
   assert(TrecordType(xid, fixedEntry) == FIXED_RECORD);
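A note on intended use (commentary following the patch, not part of it): the sketch below is modeled on ridWorkerThread in check_lockManager.c above. The include path, the function name, and the retry policy are illustrative assumptions; globalLockManager, LLADD_DEADLOCK, and the setup functions are the ones this patch introduces.

  #include <lladd/lockManager.h>   /* assumed include path for the declarations above */

  /* Illustrative caller of the new callback table, modeled on ridWorkerThread.
   * setupLockManagerCallbacksRecord() is assumed to have been called once,
   * before any worker threads start. */
  static void updateOneRecord(int xid, recordid rid) {
    globalLockManager.begin(xid);

    /* A lock timeout is reported as LLADD_DEADLOCK; recover the way the test
     * does: drop every lock this xid holds, then try again. */
    while(globalLockManager.writeLockRecord(xid, rid) == LLADD_DEADLOCK) {
      globalLockManager.abort(xid);
    }

    /* ... perform the update protected by the write lock ... */

    globalLockManager.commit(xid);   /* releases all locks still held by xid */
  }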
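Call sites inside the engine never assume a policy is installed: every use of the table is guarded, e.g. if(globalLockManager.writeLockPage) { ... }, so the Nil configuration installed by Tinit() turns locking into a no-op. A hypothetical pair of wrappers (not in the patch) that would centralize that convention:

  #include <lladd/lockManager.h>   /* assumed include path */

  /* Hypothetical helpers, not part of the patch: forward to the installed
   * policy if there is one, otherwise succeed as a no-op. */
  static inline int lmReadLockPage(int xid, int page) {
    return globalLockManager.readLockPage  ? globalLockManager.readLockPage(xid, page)  : 0;
  }
  static inline int lmWriteLockPage(int xid, int page) {
    return globalLockManager.writeLockPage ? globalLockManager.writeLockPage(xid, page) : 0;
  }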