From 0ce77903fb68d1ba6a532c1fc0e0fa0cd5e5121a Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Tue, 20 Jul 2004 03:40:57 +0000 Subject: [PATCH] Continuing work on multi-threading. r/w access to buffer manager getting close, but still buggy. --- lladd/common.h | 4 +- src/lladd/bufferManager.c | 49 ++++++++++---- src/lladd/common.c | 33 +++++++++- src/lladd/operations/alloc.c | 6 ++ src/lladd/page.c | 5 +- src/lladd/page.h | 4 +- src/lladd/pageCache.c | 11 ++-- src/lladd/pageFile.c | 15 +++-- src/lladd/transactional2.c | 48 ++++++++++++-- test/lladd/check_bufferManager.c | 103 +++++++++++++++++++++++++++--- test/lladd/check_transactional2.c | 81 +++++++++++++++++++++++ 11 files changed, 319 insertions(+), 40 deletions(-) diff --git a/lladd/common.h b/lladd/common.h index 65b7d3b..20429cc 100644 --- a/lladd/common.h +++ b/lladd/common.h @@ -99,8 +99,8 @@ extern int errno; #define lsn_t long -/*#define DEBUGGING - #define PROFILE_LATCHES*/ +/*#define DEBUGGING */ +#define PROFILE_LATCHES #ifdef DEBUGGING /** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */ diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 9f0399b..50c3ffc 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -68,6 +68,8 @@ terms specified in this license. */ static pthread_mutex_t lastFreepage_mutex; +pthread_mutex_t add_pending_mutex; + static unsigned int lastFreepage = 0; /** @@ -77,7 +79,7 @@ static unsigned int lastFreepage = 0; */ Page * loadPage(int pageid); - +pthread_cond_t addPendingOK; int bufInit() { @@ -89,7 +91,9 @@ int bufInit() { lastFreepage = 0; pthread_mutex_init(&lastFreepage_mutex , NULL); - + pthread_cond_init(&addPendingOK, NULL); + pthread_mutex_init(&add_pending_mutex, NULL); + return 0; } @@ -121,13 +125,13 @@ Page * loadPage (int pageid) { Page * lastRallocPage = 0; - +/** @todo ralloc ignores it's xid parameter; change the interface? */ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) { recordid ret; Page * p; - DEBUG("Rallocing record of size %ld\n", (long int)size); + /* DEBUG("Rallocing record of size %ld\n", (long int)size); */ assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT); @@ -140,7 +144,7 @@ recordid ralloc(int xid, /*lsn_t lsn,*/ long size) { unlock(p->loadlatch); pthread_mutex_unlock(&lastFreepage_mutex); - DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); + /* DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size); */ return ret; } @@ -233,18 +237,35 @@ void setSlotType(int pageid, int slot, int type) { */ void addPendingEvent(int pageid){ - Page * p = loadPage(pageid); + Page * p; - pthread_mutex_lock(&(p->pending_mutex)); + p = loadPage(pageid); - assert(!(p->waiting)); + pthread_mutex_lock(&add_pending_mutex); + + while(p->waiting) { + + pthread_mutex_unlock(&add_pending_mutex); + + unlock(p->loadlatch); + DEBUG("B"); + pthread_mutex_lock(&add_pending_mutex); + pthread_cond_wait(&addPendingOK, &add_pending_mutex); + pthread_mutex_unlock(&add_pending_mutex); + + p = loadPage(pageid); + + pthread_mutex_lock(&add_pending_mutex); + + } p->pending++; - pthread_mutex_unlock(&(p->pending_mutex)); + pthread_mutex_unlock(&add_pending_mutex); unlock(p->loadlatch); + } /** @@ -262,12 +283,14 @@ void addPendingEvent(int pageid){ */ void removePendingEvent(int pageid) { - Page * p = loadPage(pageid); + Page * p; - pthread_mutex_lock(&(p->pending_mutex)); + p = loadPage(pageid); + pthread_mutex_lock(&(add_pending_mutex)); p->pending--; + assert(p->id == pageid); assert(p->pending >= 0); if(p->waiting && !p->pending) { @@ -275,9 +298,11 @@ void removePendingEvent(int pageid) { pthread_cond_signal(&(p->noMorePending)); } - pthread_mutex_unlock(&(p->pending_mutex)); + pthread_mutex_unlock(&(add_pending_mutex)); unlock(p->loadlatch); + + } diff --git a/src/lladd/common.c b/src/lladd/common.c index b086efc..50434eb 100644 --- a/src/lladd/common.c +++ b/src/lladd/common.c @@ -114,7 +114,38 @@ int __lladd_pthread_mutex_destroy(lladd_pthread_mutex_t *mutex) { */ int __lladd_pthread_cond_wait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, char * file, int line, char * cond_name, char * mutex_name) { - return pthread_cond_wait(cond, &mutex->mutex); + int ret; + char * location; + int location_length; + + profile_tuple * tup = pblHtLookup(mutex->lockpoints, mutex->last_acquired_at, strlen(mutex->last_acquired_at)+1); + + released_lock(tup); + released_lock(&(mutex->tup)); + + free(mutex->last_acquired_at); + + ret = pthread_cond_wait(cond, &mutex->mutex); + + location_length = asprintf(&location, "%s %d", file, line); + + tup = pblHtLookup(mutex->lockpoints, location, location_length+1); + + mutex->last_acquired_at = location; + + if(!tup) { + tup = malloc(sizeof(profile_tuple)); + + init_tuple(tup); + + pblHtInsert(mutex->lockpoints, location, location_length+1, tup); + + } + + acquired_lock(&(mutex->tup), 0); + acquired_lock(tup, 0); + + return ret; } int __lladd_pthread_cond_timedwait(pthread_cond_t *cond, lladd_pthread_mutex_t *mutex, void *abstime, diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index 6766ee2..093ce1a 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -7,6 +7,8 @@ #include #include "../blobManager.h" /** + @file + Implementation of Talloc() as an operation This is a bit strange compared to other operations, as it happens @@ -20,6 +22,10 @@ space alloced during the crash is leaked. This doesn't seem to be too big of a deal, but it should be fixed someday. A more serious problem results from crashes during blob allocation. + + @ingroup OPERATIONS + + $Id$ */ diff --git a/src/lladd/page.c b/src/lladd/page.c index b113bc2..c3697a9 100644 --- a/src/lladd/page.c +++ b/src/lladd/page.c @@ -600,7 +600,7 @@ Page *pageAlloc(int id) { page->rwlatch = initlock(); page->loadlatch = initlock(); - pthread_mutex_init(&page->pending_mutex, NULL); + /* pthread_mutex_init(&page->pending_mutex, NULL);*/ pthread_cond_init(&page->noMorePending, NULL); page->memAddr = malloc(PAGE_SIZE); @@ -614,7 +614,8 @@ Page *pageAlloc(int id) { pthread_mutex_unlock(&pageAllocMutex); - + page->pending = 0; + page->waiting = 0; return page; } diff --git a/src/lladd/page.h b/src/lladd/page.h index 3d18339..a5c5123 100644 --- a/src/lladd/page.h +++ b/src/lladd/page.h @@ -125,8 +125,6 @@ typedef struct Page_s { this properly, and there are no read-only functions for the pending field. */ - pthread_mutex_t pending_mutex; /* pthread_mutex_t */ - pthread_cond_t noMorePending; /* pthread_cond_t */ int waiting; @@ -154,6 +152,8 @@ typedef struct Page_s { int pending; } Page; +extern pthread_cond_t addPendingOK; + /** * initializes all the important variables needed in all the * functions dealing with pages. diff --git a/src/lladd/pageCache.c b/src/lladd/pageCache.c index 5c047b8..7766cb4 100644 --- a/src/lladd/pageCache.c +++ b/src/lladd/pageCache.c @@ -213,9 +213,10 @@ Page * getPage(int pageid, int locktype) { ret = pblHtLookup(activePages, &pageid, sizeof(int)); - // Unfortunately, this is a heuristic, as a race condition exists. - // (Until we obtain a readlock on ret, we have no way of knowing if - // we've gotten the correct page.) + /* Unfortunately, this is a heuristic, as a race condition exists. + (Until we obtain a readlock on ret, we have no way of knowing if + we've gotten the correct page.) */ + if(ret) { cacheHitOnPage(ret); assert(ret->id == -1 || ret->id == pageid); @@ -227,7 +228,7 @@ Page * getPage(int pageid, int locktype) { ret = dummy_page; } - writelock(ret->loadlatch, 217); + readlock(ret->loadlatch, 217); while(ret->id != pageid) { /* Either we got a stale mapping from the HT, or no mapping at all. */ @@ -264,7 +265,7 @@ Page * getPage(int pageid, int locktype) { pthread_mutex_unlock(&loadPagePtr_mutex); - writelock(ret->loadlatch, 217); + readlock(ret->loadlatch, 217); } diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index 2e435cf..825f6be 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -12,6 +12,10 @@ #include "logger/logWriter.h" static FILE * stable = NULL; +/** Defined in bufferManager.c */ +extern pthread_mutex_t add_pending_mutex; + + /** This function blocks until there are no events pending for this page. @@ -21,15 +25,16 @@ static FILE * stable = NULL; void finalize(Page * p) { - pthread_mutex_lock(&(p->pending_mutex)); + pthread_mutex_lock(&(add_pending_mutex)); p->waiting++; while(p->pending) { - - pthread_cond_wait(&(p->noMorePending), &(p->pending_mutex)); + DEBUG("A"); + pthread_cond_wait(&(p->noMorePending), &(add_pending_mutex)); } - pthread_mutex_unlock(&(p->pending_mutex)); + pthread_mutex_unlock(&(add_pending_mutex)); + pthread_cond_broadcast(&addPendingOK); return; } @@ -90,6 +95,8 @@ void pageWrite(Page * ret) { long pageoffset = ret->id * PAGE_SIZE; long offset ; + assert(ret->pending == 0); + if(flushedLSN() < pageReadLSN(ret)) { DEBUG("pageWrite is calling syncLog()!\n"); syncLog(); diff --git a/src/lladd/transactional2.c b/src/lladd/transactional2.c index 774da92..3932b83 100644 --- a/src/lladd/transactional2.c +++ b/src/lladd/transactional2.c @@ -1,6 +1,6 @@ #include #include - +#include "latches.h" #include #include @@ -14,6 +14,16 @@ TransactionLog XactionTable[MAX_TRANSACTIONS]; int numActiveXactions = 0; int xidCount = 0; + +/** + Locking for transactional2.c works as follows: + + numActiveXactions, xidCount are protected, XactionTable is not. + This implies that we do not support multi-threaded transactions, + at least for now. +*/ +pthread_mutex_t transactional_2_mutex; + #define INVALID_XTABLE_XID -1 /** Needed for debugging -- sometimes we don't want to run all of Tinit() */ @@ -33,6 +43,8 @@ void setupOperationsTable() { int Tinit() { + pthread_mutex_init(&transactional_2_mutex, NULL); + setupOperationsTable(); /* pageInit(); */ @@ -42,6 +54,7 @@ int Tinit() { InitiateRecovery(); + return 0; } @@ -49,6 +62,9 @@ int Tinit() { int Tbegin() { int i, index = 0; + int xidCount_tmp; + + pthread_mutex_lock(&transactional_2_mutex); if( numActiveXactions == MAX_TRANSACTIONS ) return EXCEED_MAX_TRANSACTIONS; @@ -63,16 +79,26 @@ int Tbegin() { } } - assert( i < MAX_TRANSACTIONS ); + xidCount_tmp = xidCount; + /* Don't want to block while we're logging... */ + pthread_mutex_unlock(&transactional_2_mutex); - XactionTable[index] = LogTransBegin(xidCount); + assert( i < MAX_TRANSACTIONS ); + + XactionTable[index] = LogTransBegin(xidCount_tmp); return XactionTable[index].xid; } void Tupdate(int xid, recordid rid, const void *dat, int op) { LogEntry * e; + +#ifdef DEBUGGING + pthread_mutex_lock(&transactional_2_mutex); assert(numActiveXactions <= MAX_TRANSACTIONS); + pthread_mutex_unlock(&transactional_2_mutex); +#endif + e = LogUpdate(&XactionTable[xid % MAX_TRANSACTIONS], rid, op, dat); assert(XactionTable[xid % MAX_TRANSACTIONS].prevLSN == e->LSN); @@ -89,12 +115,20 @@ void Tread(int xid, recordid rid, void * dat) { int Tcommit(int xid) { lsn_t lsn; +#ifdef DEBUGGING + pthread_mutex_lock(&transactional_2_mutex); assert(numActiveXactions <= MAX_TRANSACTIONS); + pthread_mutex_unlock(&transactional_2_mutex); +#endif + lsn = LogTransCommit(&XactionTable[xid % MAX_TRANSACTIONS]); bufTransCommit(xid, lsn); /* unlocks pages */ XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; + pthread_mutex_lock(&transactional_2_mutex); numActiveXactions--; assert( numActiveXactions >= 0 ); + pthread_mutex_unlock(&transactional_2_mutex); + return 0; } @@ -107,9 +141,11 @@ int Tabort(int xid) { bufTransAbort(xid, lsn); XactionTable[xid%MAX_TRANSACTIONS].xid = INVALID_XTABLE_XID; - numActiveXactions--; + pthread_mutex_lock(&transactional_2_mutex); + numActiveXactions--; assert( numActiveXactions >= 0 ); + pthread_mutex_unlock(&transactional_2_mutex); return 0; } @@ -141,10 +177,14 @@ void Trevive(int xid, long lsn) { } else { XactionTable[index].xid = xid; XactionTable[index].prevLSN = lsn; + pthread_mutex_lock(&transactional_2_mutex); numActiveXactions++; + pthread_mutex_unlock(&transactional_2_mutex); } } void TsetXIDCount(int xid) { + pthread_mutex_lock(&transactional_2_mutex); xidCount = xid; + pthread_mutex_unlock(&transactional_2_mutex); } diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c index 1af86ca..9d8fe9c 100644 --- a/test/lladd/check_bufferManager.c +++ b/test/lladd/check_bufferManager.c @@ -17,45 +17,104 @@ #define LOG_NAME "check_bufferMananger.log" #define NUM_PAGES 1000 -#define THREAD_COUNT 5 -#define READS_PER_THREAD 50000 +#define THREAD_COUNT 25 +#define READS_PER_THREAD 10000 +#define RECORDS_PER_THREAD 10000 +#define RECORD_THREAD_COUNT 25 void initializePages() { int i; + printf("Initialization starting\n"); fflush(NULL); + for(i = 0 ; i < NUM_PAGES; i++) { + Page * p; recordid rid; rid.page = i; rid.slot = 0; rid.size = sizeof(int); + p = loadPage(rid.page); + assert(p->id != -1); + pageSlotRalloc(p, 0, rid); + /* addPendingEvent(rid.page); */ writeRecord(1, 1, rid, &i); + /* removePendingEvent(rid.page); */ + assert(p->pending == 0); + unlock(p->loadlatch); } + printf("Initialization complete.\n"); fflush(NULL); + } void * workerThread(void * p) { int i; + for(i = 0 ; i < READS_PER_THREAD; i++) { recordid rid; int j; int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0)); - if(! (i % 5000) ) { - printf("%d", i / 5000); fflush(NULL); + if(! (i % 500) ) { + printf("%d", i / 500); fflush(NULL); } rid.page = k; rid.slot = 0; rid.size = sizeof(int); + addPendingEvent(rid.page); readRecord(1, rid, &j); + assert(rid.page == k); + removePendingEvent(rid.page); assert(k == j); } return NULL; } +void * workerThreadWriting(void * p) { + + int offset = *(int*)p; + recordid rids[RECORDS_PER_THREAD]; + for(int i = 0 ; i < RECORDS_PER_THREAD; i++) { + rids[i] = ralloc(1, sizeof(int)); + } + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int val = i + offset; + int oldpage = rids[i].page; + addPendingEvent(rids[i].page); + writeRecord(1, 0, rids[i], &val); + assert(oldpage == rids[i].page); + removePendingEvent(rids[i].page); + + if(! (i % 1000) ) { + printf("W%d", i / 1000); fflush(NULL); + } + } + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int val; + + addPendingEvent(rids[i].page); + readRecord(1, rids[i], &val); + + if(! (i % 1000) ) { + printf("R%d", i / 1000); fflush(NULL); + } + + + assert(val == i+offset); + + removePendingEvent(rids[i].page); + + } + + return NULL; +} + + + START_TEST(pageSingleThreadTest) { Tinit(); @@ -101,15 +160,43 @@ START_TEST(pageLoadTest) { Tdeinit(); } END_TEST +START_TEST(pageSingleThreadWriterTest) { + int i = 100; + + Tinit(); + + workerThreadWriting(&i); + + Tdeinit(); +}END_TEST + +START_TEST(pageThreadedWritersTest) { + pthread_t workers[RECORD_THREAD_COUNT]; + int i; + + Tinit(); + + for(i = 0; i < RECORD_THREAD_COUNT; i++) { + pthread_create(&workers[i], NULL, workerThreadWriting, &i); + } + for(i = 0; i < RECORD_THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + } + + Tdeinit(); +}END_TEST + Suite * check_suite(void) { - Suite *s = suite_create("logWriter"); + Suite *s = suite_create("bufferManager"); /* Begin a new test */ - TCase *tc = tcase_create("writeNew"); + TCase *tc = tcase_create("multithreaded"); /* Sub tests are added, one per line, here */ - /*tcase_add_test(tc, pageSingleThreadTest); */ - tcase_add_test(tc, pageLoadTest); + tcase_add_test(tc, pageSingleThreadTest); + tcase_add_test(tc, pageLoadTest); + tcase_add_test(tc, pageSingleThreadWriterTest); + tcase_add_test(tc, pageThreadedWritersTest); /* --------------------------------------------- */ diff --git a/test/lladd/check_transactional2.c b/test/lladd/check_transactional2.c index 3825dc8..67f029b 100644 --- a/test/lladd/check_transactional2.c +++ b/test/lladd/check_transactional2.c @@ -39,12 +39,63 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ #include +#include +#include <../../src/lladd/latches.h> #include #include #include #include "../check_includes.h" #define LOG_NAME "check_transactional2.log" +#define THREAD_COUNT 25 +#define RECORDS_PER_THREAD 10000 + +/** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */ +void * writingWorkerThread ( void * v ) { + int offset = * (int *) v; + recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid)); + int xid = Tbegin(); + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + rids[i] = Talloc(xid, sizeof(int)); + if(! (i %100)) { + printf("A%d", i/100);fflush(NULL); + } + + } + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int tmp = i + offset; + Tset(xid, rids[i], &tmp); + if(! (i %100)) { + printf("W%d", i/100); fflush(NULL); + } + + + } + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int j; + Tread(xid, rids[i], &j); + assert(i + offset == j); + if(! (i %100)) { + printf("R%d", i/100);fflush(NULL); + } + } + + Tcommit(xid); + + xid = Tbegin(); + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int j; + Tread(xid, rids[i], &j); + assert(i + offset == j); + } + return NULL; +} + + + /** Assuming that the Tset() operation is implemented correctly, checks that doUpdate, redoUpdate and undoUpdate are working correctly, for @@ -142,6 +193,33 @@ START_TEST(transactional_blobSmokeTest) { } END_TEST +START_TEST(transactional_nothreads_commit) { + int five = 5; + Tinit(); + writingWorkerThread(&five); + Tdeinit(); +} END_TEST + +START_TEST(transactional_threads_commit) { + pthread_t workers[THREAD_COUNT]; + int i; + + Tinit(); + + for(i = 0; i < THREAD_COUNT; i++) { + int arg = i + 100; + pthread_create(&workers[i], NULL, writingWorkerThread, &arg); + + } + for(i = 0; i < THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + + + } + + Tdeinit(); +} END_TEST + /** Add suite declarations here */ @@ -153,6 +231,9 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ tcase_add_test(tc, transactional_smokeTest); tcase_add_test(tc, transactional_blobSmokeTest); + tcase_add_test(tc, transactional_nothreads_commit); + tcase_add_test(tc, transactional_threads_commit); + /** @todo still need to make blobs reentrant! */ /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown); suite_add_tcase(s, tc);