From 8cb59f0c23a922bdcfc95264df81041fe451243d Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 26 Jul 2004 20:37:04 +0000 Subject: [PATCH] Blobs are now re-entrant. --- lladd/common.h | 2 +- src/lladd/blobManager.c | 352 +++++++++++++++++------------- src/lladd/blobManager.h | 6 +- src/lladd/bufferManager.c | 4 +- src/lladd/common.c | 2 +- src/lladd/operations/alloc.c | 2 +- src/lladd/pageCache.c | 18 +- test/lladd/check_bufferManager.c | 27 +-- test/lladd/check_page.c | 3 +- test/lladd/check_transactional2.c | 258 +++++++++++++++++++--- 10 files changed, 474 insertions(+), 200 deletions(-) diff --git a/lladd/common.h b/lladd/common.h index 0323fc3..1ab7bd1 100644 --- a/lladd/common.h +++ b/lladd/common.h @@ -100,7 +100,7 @@ extern int errno; /*#define DEBUGGING */ -#define PROFILE_LATCHES +/*#define PROFILE_LATCHES */ #ifdef DEBUGGING /** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */ diff --git a/src/lladd/blobManager.c b/src/lladd/blobManager.c index 284b45d..e004455 100644 --- a/src/lladd/blobManager.c +++ b/src/lladd/blobManager.c @@ -8,12 +8,16 @@ #include #include +#include "latches.h" + #include "blobManager.h" #include #include +pthread_mutex_t blob_hash_mutex; + static FILE * blobf0 = NULL, * blobf1 = NULL; /** This is a hash of hash tables. The outer hash maps from xid to @@ -24,148 +28,42 @@ static pblHashTable_t * dirtyBlobs; /** Plays a nasty trick on bufferManager to force it to read and write blob_record_t items for us. Relies upon bufferManager (and page.c's) trust in the rid.size field... */ -static void readRawRecord(int xid, recordid rid, void * buf, int size) { +static void readRawRecord(int xid, Page * p, recordid rid, void * buf, int size) { recordid blob_rec_rid = rid; blob_rec_rid.size = size; - /*readRecord(xid, blob_rec_rid, buf);*/ - Tread(xid, blob_rec_rid, buf); + readRecord(xid, p, blob_rec_rid, buf); + /* Tread(xid, blob_rec_rid, buf); */ } -static void writeRawRecord(int xid, recordid rid, const void * buf, int size) { +static void writeRawRecord(int xid, Page * p, recordid rid, lsn_t lsn, const void * buf, int size) { recordid blob_rec_rid = rid; blob_rec_rid.size = size; - Tset(xid, blob_rec_rid, buf); + writeRecord(xid, p, lsn, blob_rec_rid, buf); + /** Tset(xid, blob_rec_rid, buf); @todo how should we write the log entry? */ } - - - -/* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */ -void openBlobStore() { - - /* the r+ mode opens an existing file read /write */ - if( ! (blobf0 = fopen(BLOB0_FILE, "r+"))) { /* file may not exist */ - /* the w+ mode truncates, creates, and opens read / write */ - if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); } - } - - DEBUG("blobf0 opened.\n"); - - if( ! (blobf1 = fopen(BLOB1_FILE, "r+"))) { /* file may not exist */ - if(!(blobf1 = fopen(BLOB1_FILE, "w+"))) { perror("Couldn't open or create blob 1 file"); abort(); } - } - - DEBUG("blobf1 opened.\n"); - - dirtyBlobs = pblHtCreate(); -} - -/** Discards all changes to dirty blobs, and closes the blob store. - - @todo memory leak: Will leak memory if there are any outstanding - xacts that have written to blobs. Should explicitly abort them - instead of just invalidating the dirtyBlobs hash. - - (If the you fix the above todo, don't forget to fix - bufferManager's simulateBufferManagerCrash.) -*/ -void closeBlobStore() { - int ret = fclose(blobf0); - assert(!ret); - ret = fclose(blobf1); - assert(!ret); - blobf0 = NULL; - blobf1 = NULL; - - pblHtDelete(dirtyBlobs); -} - -recordid preAllocBlob(int xid, long blobSize) { - long fileSize = myFseek(blobf1, 0, SEEK_END); - blob_record_t blob_rec; - - /* Allocate space for the blob entry. */ - - DEBUG("Allocing blob (size %ld)\n", blobSize); - - assert(blobSize > 0); /* Don't support zero length blobs right now... */ - - /* First in buffer manager. */ - - recordid rid = Talloc(xid, sizeof(blob_record_t)); - - Page * p = loadPage(rid.page); /** @todo blob's are almost surely broken! */ - - /** Finally, fix up the fields in the record that points to the blob. - The rest of this also should go into alloc.c - */ - - blob_rec.fd = 0; - blob_rec.size = blobSize; - blob_rec.offset = fileSize; - - pageSetSlotType(p, rid.slot, BLOB_SLOT); - rid.size = BLOB_SLOT; - - releasePage(p); - - - /* Tset() needs to know to 'do the right thing' here, since we've - changed the size it has recorded for this record, and - writeRawRecord makes sure that that is the case. */ - writeRawRecord (xid, rid, &blob_rec, sizeof(blob_record_t)); - - rid.size = blob_rec.size; - - - return rid; - -} - -void allocBlob(int xid, lsn_t lsn, recordid rid) { - - long fileSize = myFseek(blobf1, 0, SEEK_END); - blob_record_t blob_rec; - char zero = 0; - - /* Allocate space for the blob entry. */ - - DEBUG("post Allocing blob (size %ld)\n", rid.size); - - assert(rid.size > 0); /* Don't support zero length blobs right now... */ - - /* First in buffer manager. */ - - /* Read in record to get the correct offset, size for the blob*/ - - /** @todo blobs deadlock... */ - readRawRecord(xid, rid, &blob_rec, sizeof(blob_record_t)); - - myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET); - myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET); - - if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); } - if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); } - -} - static lsn_t * tripleHashLookup(int xid, recordid rid) { + lsn_t * ret; + pthread_mutex_lock(&blob_hash_mutex); pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid)); if(xidHash == NULL) { - return NULL; + ret = NULL; } else { pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int)); if(pageXidHash == NULL) { - return NULL; + ret = NULL; + } else { + ret = pblHtLookup(pageXidHash, &rid, sizeof(recordid)); } - return pblHtLookup(pageXidHash, &rid, sizeof(recordid)); } + pthread_mutex_unlock(&blob_hash_mutex); + return ret; } static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) { pblHashTable_t * xidHash; pblHashTable_t * pageXidHash; lsn_t * copy; - + pthread_mutex_lock(&blob_hash_mutex); xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); /* Freed in doubleHashRemove */ if(xidHash == NULL) { @@ -184,7 +82,10 @@ static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) { *copy = newLSN; pblHtInsert(pageXidHash, &rid, sizeof(recordid), copy); + + pthread_mutex_unlock(&blob_hash_mutex); } + /* static void tripleHashRemove(int xid, recordid rid) { pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); @@ -214,7 +115,143 @@ static void tripleHashRemove(int xid, recordid rid) { } }*/ -void readBlob(int xid, recordid rid, void * buf) { +/* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */ +void openBlobStore() { + + /* the r+ mode opens an existing file read /write */ + if( ! (blobf0 = fopen(BLOB0_FILE, "r+"))) { /* file may not exist */ + /* the w+ mode truncates, creates, and opens read / write */ + if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); } + } + + DEBUG("blobf0 opened.\n"); + + if( ! (blobf1 = fopen(BLOB1_FILE, "r+"))) { /* file may not exist */ + if(!(blobf1 = fopen(BLOB1_FILE, "w+"))) { perror("Couldn't open or create blob 1 file"); abort(); } + } + + DEBUG("blobf1 opened.\n"); + + dirtyBlobs = pblHtCreate(); + pthread_mutex_init(&blob_hash_mutex, NULL); +} + +/** Discards all changes to dirty blobs, and closes the blob store. + + @todo memory leak: Will leak memory if there are any outstanding + xacts that have written to blobs. Should explicitly abort them + instead of just invalidating the dirtyBlobs hash. + + (If the you fix the above todo, don't forget to fix + bufferManager's simulateBufferManagerCrash.) +*/ +void closeBlobStore() { + int ret = fclose(blobf0); + assert(!ret); + ret = fclose(blobf1); + assert(!ret); + blobf0 = NULL; + blobf1 = NULL; + + pblHtDelete(dirtyBlobs); + + pthread_mutex_destroy(&blob_hash_mutex); +} + +/** + blob allocation: + + generate rid + log rid (+ alloc) + alloc space in store + log rid write + perform rid write + write space allocation to store + + (It used to be:) + + allocate space in store + generate rid + write space in store + log rid + write rid alloc + log rid + + The trick here is to make sure that the write to the record + happens after the record's allocation has been logged. + +*/ + +recordid preAllocBlob(int xid, long blobSize) { + + /* Allocate space for the blob entry. */ + + DEBUG("Allocing blob (size %ld)\n", blobSize); + + assert(blobSize > 0); /* Don't support zero length blobs right now... */ + + /* First in buffer manager. */ + + recordid rid = Talloc(xid, sizeof(blob_record_t)); + + rid.size = blobSize; + + return rid; + +} + +void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) { + + long fileSize; + blob_record_t blob_rec; + + char zero = 0; + + DEBUG("post Allocing blob (size %ld)\n", rid.size); + + /** Finally, fix up the fields in the record that points to the blob. + The rest of this also should go into alloc.c + */ + blob_rec.fd = 0; + blob_rec.size = rid.size; + flockfile(blobf1); + flockfile(blobf0); + fileSize = myFseek(blobf1, 0, SEEK_END); + blob_rec.offset = fileSize; + + pageSetSlotType(p, rid.slot, BLOB_SLOT); + rid.size = BLOB_SLOT; + + /* Tset() needs to know to 'do the right thing' here, since we've + changed the size it has recorded for this record, and + writeRawRecord makes sure that that is the case. */ + writeRawRecord (xid, p, rid, lsn, &blob_rec, sizeof(blob_record_t)); + /* releasePage(p); */ + rid.size = blob_rec.size; + + /* Allocate space for the blob entry. */ + + assert(rid.size > 0); /* Don't support zero length blobs right now... */ + + /* First in buffer manager. */ + + /* Read in record to get the correct offset, size for the blob*/ + + /* * @ todo blobs deadlock... */ + /* readRawRecord(xid, p, rid, &blob_rec, sizeof(blob_record_t)); */ + + myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET); + myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET); + + if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); } + if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); } + + funlockfile(blobf0); + funlockfile(blobf1); + +} + +void readBlob(int xid, Page * p, recordid rid, void * buf) { /* We don't care if the blob is dirty, since the record from the buffer manager will reflect that if it is.. */ @@ -225,15 +262,19 @@ void readBlob(int xid, recordid rid, void * buf) { assert(buf); - readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); + readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t)); fd = rec.fd ? blobf1 : blobf0; + + DEBUG("reading blob at offset %d, size %ld, buffer %x\n", rec.offset, rec.size, (unsigned int) buf); + + flockfile(fd); + offset = myFseek(fd, (long int) rec.offset, SEEK_SET); - DEBUG("reading blob at offset %d (%ld), size %ld, buffer %x\n", rec.offset, offset, rec.size, (unsigned int) buf); - assert(rec.offset == offset); + if(1 != fread(buf, rec.size, 1, fd)) { if(feof(fd)) { printf("Unexpected eof!\n"); fflush(NULL); abort(); } @@ -241,12 +282,13 @@ void readBlob(int xid, recordid rid, void * buf) { } + funlockfile(fd); } /** @todo dirtyBlobs should contain the highest LSN that wrote to the current version of the dirty blob, and the lsn field should be checked to be sure that it increases monotonically. */ -void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { +void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) { /* First, determine if the blob is dirty. */ lsn_t * dirty = tripleHashLookup(xid, rid); @@ -260,7 +302,7 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { /* Tread() raw record */ - readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); + readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t)); assert(rec.size == rid.size); @@ -278,16 +320,20 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { rec.fd = rec.fd ? 0 : 1; /* Tset() raw record */ - writeRawRecord(xid, rid, &rec, sizeof(blob_record_t)); + writeRawRecord(xid, p, rid, lsn, &rec, sizeof(blob_record_t)); } fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */ + DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size); + + flockfile(fd); + offset = myFseek(fd, rec.offset, SEEK_SET); - DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size); assert(offset == rec.offset); readcount = fwrite(buf, rec.size, 1, fd); + funlockfile(fd); assert(1 == readcount); /* No need to update the raw blob record. */ @@ -297,9 +343,12 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { files when it's called (are there any dirty blobs associated with this transaction? */ void commitBlobs(int xid) { - + flockfile(blobf0); + flockfile(blobf1); fdatasync(fileno(blobf0)); fdatasync(fileno(blobf1)); + funlockfile(blobf0); + funlockfile(blobf1); abortBlobs(xid); } @@ -329,27 +378,32 @@ void abortBlobs(int xid) { easier. */ + pthread_mutex_lock(&blob_hash_mutex); + pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); pblHashTable_t * this_bucket; - if(!rid_buckets) { return; } /* No dirty blobs for this xid.. */ - - for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) { - lsn_t * rid_lsn; - int page_number; - - /* All right, this_bucket contains all of the rids for this page. */ - - for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) { - recordid * rid = pblHtCurrentKey(this_bucket); - page_number = rid->page; - pblHtRemove(this_bucket, rid, sizeof(recordid)); - free(rid_lsn); - } - - pblHtRemove(rid_buckets, &page_number, sizeof(int)); - pblHtDelete(this_bucket); - } - pblHtDelete(rid_buckets); + if(rid_buckets) { /* Otherwise, there are no dirty blobs for this xid.. */ + for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) { + lsn_t * rid_lsn; + int page_number; + + /* All right, this_bucket contains all of the rids for this page. */ + + for(rid_lsn = pblHtFirst(this_bucket); rid_lsn; rid_lsn = pblHtNext(this_bucket)) { + recordid * rid = pblHtCurrentKey(this_bucket); + page_number = rid->page; + pblHtRemove(this_bucket, rid, sizeof(recordid)); + free(rid_lsn); + } + + pblHtRemove(rid_buckets, &page_number, sizeof(int)); + pblHtDelete(this_bucket); + } + pblHtDelete(rid_buckets); + } + + pthread_mutex_unlock(&blob_hash_mutex); + } diff --git a/src/lladd/blobManager.h b/src/lladd/blobManager.h index 3c8f610..0bb7822 100644 --- a/src/lladd/blobManager.h +++ b/src/lladd/blobManager.h @@ -48,13 +48,13 @@ BEGIN_C_DECLS /** Read the blob from the recordid rid into buf. */ -void readBlob(int xid, recordid rid, void * buf); +void readBlob(int xid, Page * p, recordid rid, void * buf); /** Write the contents of buf to the blob in recordid rid. */ -void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf); +void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf); /** @@ -88,7 +88,7 @@ recordid preAllocBlob(int xid, long blobsize); */ -void allocBlob(int xid, lsn_t lsn, recordid rid); +void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid); void openBlobStore(); void closeBlobStore(); diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 4518179..96a67ff 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -143,7 +143,7 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) { if(rid.size > BLOB_THRESHOLD_SIZE) { /* DEBUG("Writing blob.\n"); */ - writeBlob(xid, lsn, rid, dat); + writeBlob(xid, p, lsn, rid, dat); } else { /* DEBUG("Writing record.\n"); */ @@ -165,7 +165,7 @@ void readRecord(int xid, Page * p, recordid rid, void *buf) { /* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n", xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ /* @todo should readblob take a page pointer? */ - readBlob(xid, rid, buf); + readBlob(xid, p, rid, buf); } else { assert(rid.page == p->id); /* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n", diff --git a/src/lladd/common.c b/src/lladd/common.c index 5e992a9..6d56329 100644 --- a/src/lladd/common.c +++ b/src/lladd/common.c @@ -45,7 +45,7 @@ int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int li blockCount ++; pthread_yield(); - if(blockCount >= 10000 && ! (blockCount % 10000)) { + if(blockCount >= 30000 && ! (blockCount % 30000)) { printf("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at); fflush(NULL); } diff --git a/src/lladd/operations/alloc.c b/src/lladd/operations/alloc.c index 4e55eab..f6e4d9c 100644 --- a/src/lladd/operations/alloc.c +++ b/src/lladd/operations/alloc.c @@ -32,7 +32,7 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { if(rid.size >= BLOB_THRESHOLD_SIZE) { - allocBlob(xid, lsn, rid); + allocBlob(xid, p, lsn, rid); } else { /* Page * loadedPage = loadPage(rid.page); */ /* pageSlotRalloc(loadedPage, lsn, rid); */ diff --git a/src/lladd/pageCache.c b/src/lladd/pageCache.c index e0118e1..f20eb61 100644 --- a/src/lladd/pageCache.c +++ b/src/lladd/pageCache.c @@ -152,7 +152,11 @@ Page * getPage(int pageid, int locktype) { ret = pblHtLookup(activePages, &pageid, sizeof(int)); if(ret) { - readlock(ret->loadlatch, 217); + if(locktype == RW) { + writelock(ret->loadlatch, 217); + } else { + readlock(ret->loadlatch, 217); + } //writelock(ret->loadlatch, 217); } @@ -165,7 +169,11 @@ Page * getPage(int pageid, int locktype) { if(ret) { // writelock(ret->loadlatch, 217); - readlock(ret->loadlatch, 217); + if(locktype == RW) { + writelock(ret->loadlatch, 217); + } else { + readlock(ret->loadlatch, 217); + } } spin++; if(spin > 10000) { @@ -244,7 +252,11 @@ Page * getPage(int pageid, int locktype) { /* downgradelock(ret->loadlatch); */ // writelock(ret->loadlatch, 217); - readlock(ret->loadlatch, 217); + if(locktype == RW) { + writelock(ret->loadlatch, 217); + } else { + readlock(ret->loadlatch, 217); + } if(ret->id != pageid) { unlock(ret->loadlatch); printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n"); diff --git a/test/lladd/check_bufferManager.c b/test/lladd/check_bufferManager.c index 46bb42a..4843c51 100644 --- a/test/lladd/check_bufferManager.c +++ b/test/lladd/check_bufferManager.c @@ -17,9 +17,10 @@ #define LOG_NAME "check_bufferMananger.log" #define NUM_PAGES 1000 -#define THREAD_COUNT 25 -#define READS_PER_THREAD 10000 -#define RECORDS_PER_THREAD 10000 +#define THREAD_COUNT 10 +#define READS_PER_THREAD 100 +#define RECORDS_PER_THREAD 100 + #define RECORD_THREAD_COUNT 25 void initializePages() { @@ -60,8 +61,8 @@ void * workerThread(void * p) { int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0)); Page * p; - if(! (i % 500) ) { - printf("%d", i / 500); fflush(NULL); + if(! (i % 50) ) { + printf("%d", i / 50); fflush(NULL); } rid.page = k; @@ -94,8 +95,8 @@ void * workerThreadWriting(void * q) { /* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */ /* fflush(NULL); */ - if(! (i % 1000) ) { - printf("A%d", i / 1000); fflush(NULL); + if(! (i % 100) ) { + printf("A%d", i / 100); fflush(NULL); } @@ -122,8 +123,8 @@ void * workerThreadWriting(void * q) { assert(p->id == rids[i].page); releasePage(p); - if(! (i % 1000) ) { - printf("W%d", i / 1000); fflush(NULL); + if(! (i % 100) ) { + printf("W%d", i / 100); fflush(NULL); } /* sched_yield(); */ @@ -139,8 +140,8 @@ void * workerThreadWriting(void * q) { releasePage(p); - if(! (i % 1000) ) { - printf("R%d", i / 1000); fflush(NULL); + if(! (i % 100) ) { + printf("R%d", i / 100); fflush(NULL); } @@ -236,9 +237,9 @@ Suite * check_suite(void) { /* Sub tests are added, one per line, here */ - /* tcase_add_test(tc, pageSingleThreadTest); + tcase_add_test(tc, pageSingleThreadTest); tcase_add_test(tc, pageLoadTest); - tcase_add_test(tc, pageSingleThreadWriterTest); */ + tcase_add_test(tc, pageSingleThreadWriterTest); tcase_add_test(tc, pageThreadedWritersTest); /* --------------------------------------------- */ diff --git a/test/lladd/check_page.c b/test/lladd/check_page.c index ede1e4c..f7c6cdf 100644 --- a/test/lladd/check_page.c +++ b/test/lladd/check_page.c @@ -141,10 +141,11 @@ END_TEST */ START_TEST(pageThreadTest) { -#define THREAD_COUNT 50 +#define THREAD_COUNT 10 pthread_t workers[THREAD_COUNT]; int i; pthread_mutex_init(&random_mutex, NULL); + pthread_mutex_init(&lsn_mutex, NULL); fail_unless(1, NULL); Tinit(); diff --git a/test/lladd/check_transactional2.c b/test/lladd/check_transactional2.c index 695b0e0..abdc4b9 100644 --- a/test/lladd/check_transactional2.c +++ b/test/lladd/check_transactional2.c @@ -47,8 +47,142 @@ terms specified in this license. #include #include "../check_includes.h" #define LOG_NAME "check_transactional2.log" -#define THREAD_COUNT 25 -#define RECORDS_PER_THREAD 10000 +#define THREAD_COUNT 10 +#define RECORDS_PER_THREAD 500 + +void arraySet(int * array, int val) { + int i; + for(i = 0; i < 1024; i++) { + array[i] = val; + } +} + +int arrayCmp(int * array, int * array2) { + int i; + for(i = 0; i < 1024; i++) { + if(array[i] != array2[i]) { + return 0; + } + } + return 1; +} + +/** Allocate a bunch of blobs, set them, read them, commit them, and read it again, chang them, abort, and read again. */ +void * writingAbortingBlobWorkerThread ( void * v ) { + int offset = * (int *) v; + recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid)); + int xid = Tbegin(); + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, 1024 * sizeof(int)); + if(! (i %100)) { + printf("A%d", i/100);fflush(NULL); + } + } + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int tmp[1024];/* = i + offset; */ + arraySet(tmp, i+ offset); + Tset(xid, rids[i], tmp); + if(! (i %100)) { + printf("W%d", i/100); fflush(NULL); + } + } + + Tcommit(xid); + xid = Tbegin(); + + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int j[1024]; + int k[1024]; + arraySet(k, i+offset); + Tread(xid, rids[i], j); + assert(arrayCmp(j,k));/*i + offset == j); */ + if(! (i %100)) { + printf("R%d", i/100);fflush(NULL); + } + arraySet(k, -1); + Tset(xid, rids[i], k);/*(void*)&minusOne); */ + } + + Tabort(xid); + + xid = Tbegin(); + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int j[1024]; + int k[1024]; + arraySet(k, i+offset); + Tread(xid, rids[i], j); + assert(arrayCmp(j,k)); + if(! (i %100)) { + printf("S%d", i/100);fflush(NULL); + } + } + + Tcommit(xid); + + return NULL; +} + + + + +/** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */ +void * writingAbortingWorkerThread ( void * v ) { + int offset = * (int *) v; + recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid)); + int xid = Tbegin(); + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, sizeof(int)); + if(! (i %100)) { + printf("A%d", i/100);fflush(NULL); + } + } + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int tmp = i + offset; + + Tset(xid, rids[i], &tmp); + if(! (i %100)) { + printf("W%d", i/100); fflush(NULL); + } + } + + Tcommit(xid); + xid = Tbegin(); + + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int j; + int minusOne = -1; + + Tread(xid, rids[i], &j); + assert(i + offset == j); + if(! (i %100)) { + printf("R%d", i/100);fflush(NULL); + } + Tset(xid, rids[i], (void*)&minusOne); + } + + Tabort(xid); + + xid = Tbegin(); + + for(int i = 0; i < RECORDS_PER_THREAD; i++) { + int j; + Tread(xid, rids[i], &j); + assert(i + offset == j); + if(! (i %100)) { + printf("S%d", i/100);fflush(NULL); + } + } + + Tcommit(xid); + + return NULL; +} + /** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */ void * writingWorkerThread ( void * v ) { @@ -57,58 +191,47 @@ void * writingWorkerThread ( void * v ) { int xid = Tbegin(); for(int i = 0; i < RECORDS_PER_THREAD; i++) { rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, sizeof(int)); - if(! (i %1000)) { - printf("A%d", i/1000);fflush(NULL); + if(! (i %100)) { + printf("A%d", i/100);fflush(NULL); } } for(int i = 0; i < RECORDS_PER_THREAD; i++) { int tmp = i + offset; - /* Page * p = loadPage(rids[i].page); - - writeRecord(1, p, 0, rids[i], &tmp); - - releasePage(p); */ Tset(xid, rids[i], &tmp); - if(! (i %1000)) { - printf("W%d", i/1000); fflush(NULL); + if(! (i %100)) { + printf("W%d", i/100); fflush(NULL); } } for(int i = 0; i < RECORDS_PER_THREAD; i++) { int j; - /* Page * p = loadPage(rids[i].page); - - readRecord(1, p, rids[i], &j); - - releasePage(p); */ - Tread(xid, rids[i], &j); assert(i + offset == j); - if(! (i %1000)) { - printf("R%d", i/1000);fflush(NULL); + if(! (i %100)) { + printf("R%d", i/100);fflush(NULL); } } Tcommit(xid); - /* Tcommit(xid); - xid = Tbegin(); for(int i = 0; i < RECORDS_PER_THREAD; i++) { int j; Tread(xid, rids[i], &j); assert(i + offset == j); - }*/ + } + + Tcommit(xid); + return NULL; } - - /** + @test Assuming that the Tset() operation is implemented correctly, checks that doUpdate, redoUpdate and undoUpdate are working correctly, for operations that use physical logging. @@ -150,6 +273,7 @@ START_TEST(transactional_smokeTest) { END_TEST /** + @test Just like transactional_smokeTest, but check blobs instead. */ START_TEST(transactional_blobSmokeTest) { @@ -205,6 +329,10 @@ START_TEST(transactional_blobSmokeTest) { } END_TEST +/** + @test + Make sure that the single threaded version of transactional_threads_commit passes. +*/ START_TEST(transactional_nothreads_commit) { int five = 5; Tinit(); @@ -212,6 +340,10 @@ START_TEST(transactional_nothreads_commit) { Tdeinit(); } END_TEST +/** + @test + Test LLADD in a multi-threaded envrionment, where every transaction commits. +*/ START_TEST(transactional_threads_commit) { pthread_t workers[THREAD_COUNT]; int i; @@ -232,6 +364,76 @@ START_TEST(transactional_threads_commit) { Tdeinit(); } END_TEST +/** + @test + Make sure that the single threaded version of transactional_threads_abort passes. +*/ +START_TEST(transactional_nothreads_abort) { + int five = 5; + Tinit(); + writingAbortingWorkerThread(&five); + Tdeinit(); +} END_TEST + +/** + @test + Test LLADD in a multi-threaded envrionment, with a mix of transaction commits and aborts. +*/ +START_TEST(transactional_threads_abort) { + pthread_t workers[THREAD_COUNT]; + int i; + + Tinit(); + + for(i = 0; i < THREAD_COUNT; i++) { + int arg = i + 100; + pthread_create(&workers[i], NULL, writingAbortingWorkerThread, &arg); + + } + for(i = 0; i < THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + + + } + + Tdeinit(); +} END_TEST + +/** + @test + Make sure that the single threaded version of transactional_threads_abort passes. +*/ +START_TEST(transactional_blobs_nothreads_abort) { + int five = 5; + Tinit(); + writingAbortingBlobWorkerThread(&five); + Tdeinit(); +} END_TEST + +/** + @test + Test LLADD in a multi-threaded envrionment, with a mix of transaction commits and aborts. +*/ +START_TEST(transactional_blobs_threads_abort) { + pthread_t workers[THREAD_COUNT]; + int i; + + Tinit(); + + for(i = 0; i < THREAD_COUNT; i++) { + int arg = i + 100; + pthread_create(&workers[i], NULL, writingAbortingBlobWorkerThread, &arg); + + } + for(i = 0; i < THREAD_COUNT; i++) { + pthread_join(workers[i], NULL); + + + } + + Tdeinit(); +} END_TEST + /** Add suite declarations here */ @@ -241,10 +443,14 @@ Suite * check_suite(void) { TCase *tc = tcase_create("transactional_smokeTest"); /* Sub tests are added, one per line, here */ - tcase_add_test(tc, transactional_smokeTest); - /* tcase_add_test(tc, transactional_blobSmokeTest); */ + /* tcase_add_test(tc, transactional_smokeTest); + tcase_add_test(tc, transactional_blobSmokeTest); tcase_add_test(tc, transactional_nothreads_commit); tcase_add_test(tc, transactional_threads_commit); + tcase_add_test(tc, transactional_nothreads_abort); + tcase_add_test(tc, transactional_threads_abort); */ + tcase_add_test(tc, transactional_blobs_nothreads_abort); + tcase_add_test(tc, transactional_blobs_threads_abort); /** @todo still need to make blobs reentrant! */ /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown);