diff --git a/lladd/hash.h b/lladd/hash.h index a67d519..db9bbfa 100644 --- a/lladd/hash.h +++ b/lladd/hash.h @@ -3,7 +3,6 @@ #ifndef __HASH_H #define __HASH_H -/** @todo replace() powl in hash with something more efficient, if hash() becomes a bottleneck. */ unsigned int max_bucket(unsigned char tableBits, unsigned long nextExtension); unsigned int hash(const void * val, long val_length, unsigned char tableBits, unsigned long nextExtension); #define twoToThe(x) (1 << (x)) diff --git a/lladd/lockManager.h b/lladd/lockManager.h new file mode 100644 index 0000000..2995155 --- /dev/null +++ b/lladd/lockManager.h @@ -0,0 +1,9 @@ +#include + +void lockManagerInit(); + +int lockManagerReadLockRecord(int xid, recordid rid); +int lockManagerWriteLockRecord(int xid, recordid rid); + +int lockManagerUnlockRecord(int xid, recordid rid); +int lockManagerReleaseAll(int xid); diff --git a/src/libdfa/rw.c b/src/libdfa/rw.c index c03f6c1..f98d8b6 100644 --- a/src/libdfa/rw.c +++ b/src/libdfa/rw.c @@ -67,7 +67,7 @@ void writelock (rwl *lock, int d) /* printf ("writer %d blocked. %d readers, %d writers, %d waiting\n", d, lock->readers, lock->writers, lock->waiting); */ pthread_cond_wait (lock->writeOK, lock->mut); /* printf ("writer %d unblocked.\n", d); */ - } + } lock->waiting--; lock->writers++; pthread_mutex_unlock (lock->mut); @@ -111,44 +111,13 @@ void unlock(rwl * lock) { pthread_mutex_unlock (lock->mut); } -/*void readunlock(rwl *lock) { - writeunlock(lock); - }*/ void readunlock(rwl * lock) { unlock(lock); } void writeunlock(rwl * lock) { unlock(lock); } -/* - void readunlock (rwl *lock) - { - pthread_mutex_lock (lock->mut); - lock->readers--; - pthread_cond_signal (lock->writeOK); - / * Don't need to broadcast, since only one writer can run at - once. * / - - / * pthread_cond_broadcast (lock->writeOK); * / - - pthread_mutex_unlock (lock->mut); - / * printf("readunlock done\n"); * / - } - - void writeunlock (rwl *lock) - { - / * printf("writeunlock done\n"); - fflush(NULL); * / - - pthread_mutex_lock (lock->mut); - lock->writers--; - / * Need this as well (in case there's another writer, which is blocking the all of the readers. * / - pthread_cond_signal (lock->writeOK); - pthread_cond_broadcast (lock->readOK); - pthread_mutex_unlock (lock->mut); - } -*/ void deletelock (rwl *lock) { pthread_mutex_destroy (lock->mut); diff --git a/src/lladd/Makefile.am b/src/lladd/Makefile.am index 08b0d00..8338752 100644 --- a/src/lladd/Makefile.am +++ b/src/lladd/Makefile.am @@ -5,11 +5,12 @@ lib_LIBRARIES=liblladd.a # removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c liblladd_a_SOURCES=crc32.c common.c stats.c io.c bufferManager.c linkedlist.c operations.c \ pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c \ - logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \ + lockManager.c \ + logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c \ + page/slotted.c page/header.c page/fixed.c \ operations/pageOperations.c page/indirect.c operations/decrement.c \ operations/increment.c operations/prepare.c operations/set.c \ operations/alloc.c operations/noop.c operations/instantSet.c \ - page/slotted.c page/header.c page/fixed.c \ operations/arrayList.c hash.c operations/linearHash.c \ operations/naiveLinearHash.c operations/nestedTopActions.c \ operations/linearHashNTA.c operations/linkedListNTA.c \ diff --git a/src/lladd/lockManager.c b/src/lladd/lockManager.c new file mode 100644 index 0000000..f0302e8 --- /dev/null +++ b/src/lladd/lockManager.c @@ -0,0 +1,251 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +#define MUTEX_COUNT 32 +// These next two correspond to MUTEX count, and are the appropriate values to pass into hash(). +#define MUTEX_BITS 5 +#define MUTEX_EXT 32 + +static pthread_mutex_t mutexes[MUTEX_COUNT]; + +static pthread_mutex_t xid_table_mutex = PTHREAD_MUTEX_INITIALIZER; + +static pthread_mutex_t * getMutex(recordid rid) { + return &mutexes[hash(&rid, sizeof(recordid), MUTEX_BITS, MUTEX_EXT)]; +} + +static pblHashTable_t * xidLockTable; +static pblHashTable_t * ridLockTable; + +typedef struct { + pthread_cond_t writeOK; + pthread_cond_t readOK; + int readers; + int writers; + int waiting; + int active; +} lock; + +void lockManagerInit() { + int i = 0; + for(i = 0; i < MUTEX_COUNT; i++) { + pthread_mutex_init(&mutexes[i], NULL); + } + xidLockTable = pblHtCreate(); + ridLockTable = pblHtCreate(); + +} +/** @todo startTransaction needs a mutex!! */ +void startTransaction(int xid) { + pthread_mutex_lock(&xid_table_mutex); + + pblHashTable_t * xidLocks = pblHtCreate(); + pblHtInsert(xidLockTable, &xid, sizeof(int), xidLocks); + pthread_mutex_unlock(&xid_table_mutex); +} + +lock* createLock(recordid rid) { + lock * ret = malloc(sizeof(lock)); + + if(!ret) { return NULL; } + + // pthread_mutex_init(&ret->mut, NULL); + pthread_cond_init(&ret->writeOK, NULL); + pthread_cond_init(&ret->readOK, NULL); + ret->readers = 0; + ret->writers = 0; + ret->waiting = 0; + + pblHtInsert(ridLockTable, &rid, sizeof(recordid), ret); + return ret; +} + +void destroyLock(recordid rid, lock * l) { + pthread_cond_destroy(&l->writeOK); + pthread_cond_destroy(&l->readOK); + free (l); + pblHtRemove(ridLockTable, &rid, sizeof(recordid)); +} + +#define LM_READLOCK 1 +#define LM_WRITELOCK 2 + +int lockManagerReadLockRecord(int xid, recordid rid) { + + pthread_mutex_lock(&xid_table_mutex); + pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); + if((int)pblHtLookup(xidLocks, &rid, sizeof(recordid)) >= LM_READLOCK) { + pthread_mutex_unlock(&xid_table_mutex); + return 0; + } + pthread_mutex_unlock(&xid_table_mutex); + pthread_mutex_t * mut = getMutex(rid); + + pthread_mutex_lock(mut); + + lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid)); + + if(!ridLock) { + ridLock = createLock(rid); + } + + ridLock->active++; + + if(ridLock->writers || ridLock->waiting) { + struct timeval tv; + int tod_ret = gettimeofday (&tv, NULL); + tv.tv_sec++; // Wait up to one second to obtain a lock before detecting deadlock. + struct timespec ts; + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000; + if(tod_ret != 0) { + perror("Could not get time of day"); + return LLADD_INTERNAL_ERROR; + } + do { + int wait_ret = pthread_cond_timedwait(&ridLock->readOK, mut, &ts); + if(wait_ret == ETIMEDOUT) { + ridLock->active--; + pthread_mutex_unlock(mut); + return LLADD_DEADLOCK; + } + } while(ridLock->writers); + } + ridLock->readers++; + ridLock->active--; + pthread_mutex_unlock(mut); + pblHtInsert(xidLocks, &rid, sizeof(recordid), (void*)LM_READLOCK); + return 0; +} +int lockManagerWriteLockRecord(int xid, recordid rid) { + pthread_mutex_lock(&xid_table_mutex); + pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); + + + int currentLockLevel = (int)pblHtLookup(xidLocks, &rid, sizeof(recordid)); + int me = 0; + pthread_mutex_unlock(&xid_table_mutex); + + if(currentLockLevel >= LM_WRITELOCK) { + return 0; + } else if(currentLockLevel == LM_READLOCK) { + me = 1; + } + + pthread_mutex_t * mut = getMutex(rid); + + pthread_mutex_lock(mut); + lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid)); + if(!ridLock) { + ridLock = createLock(rid); + } + + ridLock->active++; + ridLock->waiting++; + if(ridLock->writers || (ridLock->readers - me)) { + struct timeval tv; + int tod_ret = gettimeofday(&tv, NULL); + tv.tv_sec++; + struct timespec ts; + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000; + if(tod_ret != 0) { + perror("Could not get time of day"); + return LLADD_INTERNAL_ERROR; + } + while(ridLock->writers || (ridLock->readers - me)) { + int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts); + if(lockret == ETIMEDOUT) { + ridLock->waiting--; + ridLock->active--; + pthread_mutex_unlock(mut); + return LLADD_DEADLOCK; + } + } + } + ridLock->waiting--; + if(currentLockLevel == 0) { + ridLock->readers++; + } + ridLock->writers++; + ridLock->active--; + pthread_mutex_unlock(mut); + pblHtInsert(xidLocks, &rid, sizeof(recordid), (void*)LM_WRITELOCK); + return 0; +} + +int lockManagerUnlockRecord(int xid, recordid rid) { + pthread_mutex_lock(&xid_table_mutex); + + pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); + + + int currentLevel = (int)pblHtLookup(xidLocks, &rid, sizeof(recordid)); + + if(currentLevel) { + pblHtRemove(xidLocks, &rid, sizeof(recordid)); + } + + pthread_mutex_unlock(&xid_table_mutex); + pthread_mutex_t * mut = getMutex(rid); + pthread_mutex_lock(mut); + lock * ridLock = pblHtLookup(ridLockTable, &rid, sizeof(recordid)); + assert(ridLock); + ridLock->active++; + + if(currentLevel == LM_WRITELOCK) { + ridLock->writers--; + ridLock->readers--; + } else if(currentLevel == LM_READLOCK) { + ridLock->readers--; + } else if(currentLevel == 0) { + assert(0); // Someone tried to release a lock they didn't own! + } else { + fprintf(stderr, "Unknown lock type encountered!"); + ridLock->active--; + pthread_mutex_unlock(mut); + return LLADD_INTERNAL_ERROR; + } + + ridLock->active--; + + if(!(ridLock->active || ridLock->waiting || ridLock->readers || ridLock->writers)) { + destroyLock(rid, ridLock); + } + + pthread_mutex_unlock(mut); + + return 0; +} + +int lockManagerReleaseAll(int xid) { + + pthread_mutex_lock(&xid_table_mutex); + + pblHashTable_t * xidLocks = pblHtLookup(xidLockTable, &xid, sizeof(int)); + + pthread_mutex_unlock(&xid_table_mutex); + void * data; + int ret = 0; + for(data = pblHtFirst(xidLocks); data; data = pblHtNext(xidLocks)) { + recordid rid = *(recordid*)pblHtCurrentKey(xidLocks); + int tmpret = lockManagerUnlockRecord(xid, rid); + // Pass any error(s) up to the user. + // (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR) + if(tmpret) { + ret = tmpret; + } + pblHtRemove(xidLocks, &rid, sizeof(recordid)); + } + return ret; +} diff --git a/src/lladd/page.h b/src/lladd/page.h index 7e48052..d050771 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -136,8 +136,7 @@ struct Page_s { /** @todo The Page.LSN field seems extraneous. Why do we need it? */ long LSN; byte *memAddr; - /** @todo dirty pages currently aren't marked dirty! */ - int dirty; + byte dirty; /** The next item in the replacement policy's queue */ struct Page_s *next; /** The previous item in the replacement policy's queue. */ @@ -201,6 +200,7 @@ struct Page_s { */ rwl * loadlatch; + }; /** diff --git a/test/lladd/Makefile.am b/test/lladd/Makefile.am index a438e2c..ae7f07a 100644 --- a/test/lladd/Makefile.am +++ b/test/lladd/Makefile.am @@ -1,12 +1,12 @@ INCLUDES = @CHECK_CFLAGS@ if HAVE_CHECK ## Had to disable check_lht because lht needs to be rewritten. -TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_logicalLinearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList +TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_logicalLinearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList check_lockManager #check_lladdhash else TESTS = endif noinst_PROGRAMS = $(TESTS) LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence -CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log +CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log check_lockManager.log AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/test/lladd/check_lockManager.c b/test/lladd/check_lockManager.c new file mode 100644 index 0000000..abeeda6 --- /dev/null +++ b/test/lladd/check_lockManager.c @@ -0,0 +1,90 @@ +#include +#include +#include +#include +#include + +#include +#include +#include "../check_includes.h" +#include +#define LOG_NAME "check_lockManager.log" + +/** Needs to be formatted as a floating point */ +#define NUM_RECORDS 100000000.0 +#define THREAD_COUNT 100 +#define RIDS_PER_THREAD 10000 + +void * workerThread(void * j) { + + int xid = *(int*)j; + startTransaction(xid); + recordid rid; + rid.page = 0; + rid.size = 0; + int k; + int deadlocks = 0; + for(k = 0; k < RIDS_PER_THREAD; k++) { + rid.slot = (int) (NUM_RECORDS*random()/(RAND_MAX+1.0)); + int rw = random() % 2; + + if(rw) { + // readlock + + if(LLADD_DEADLOCK == lockManagerReadLockRecord(xid, rid)) { + deadlocks++; + } + + + } else { + // writelock + if(LLADD_DEADLOCK == lockManagerWriteLockRecord(xid, rid)) { + deadlocks++; + } + + } + } + + printf("%2d ", deadlocks); fflush(stdout); + + lockManagerReleaseAll(xid); + + return NULL; + +} + +START_TEST(lockManagerTest) { + + lockManagerInit(); + pthread_t workers[THREAD_COUNT]; + int i; + for(i = 0; i < THREAD_COUNT; i++) { + int *j = malloc(sizeof(int)); + *j = i; + pthread_create(&workers[i], NULL, workerThread, j); + } + for(i = 0; i < THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + } + +} END_TEST + +Suite * check_suite(void) { + Suite *s = suite_create("lockManager"); + /* Begin a new test */ + TCase *tc = tcase_create("multithreaded"); + + /* Sub tests are added, one per line, here */ + + tcase_add_test(tc, lockManagerTest); + + /* --------------------------------------------- */ + + tcase_add_checked_fixture(tc, setup, teardown); + + + suite_add_tcase(s, tc); + return s; +} + +#include "../check_setup.h"