Blobs are now re-entrant.

This commit is contained in:
Sears Russell 2004-07-26 20:37:04 +00:00
parent 422198418a
commit 8cb59f0c23
10 changed files with 474 additions and 200 deletions

View file

@ -100,7 +100,7 @@ extern int errno;
/*#define DEBUGGING */ /*#define DEBUGGING */
#define PROFILE_LATCHES /*#define PROFILE_LATCHES */
#ifdef DEBUGGING #ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */ /** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */

View file

@ -8,12 +8,16 @@
#include <lladd/bufferManager.h> #include <lladd/bufferManager.h>
#include <lladd/constants.h> #include <lladd/constants.h>
#include "latches.h"
#include "blobManager.h" #include "blobManager.h"
#include <pbl/pbl.h> #include <pbl/pbl.h>
#include <stdio.h> #include <stdio.h>
pthread_mutex_t blob_hash_mutex;
static FILE * blobf0 = NULL, * blobf1 = NULL; static FILE * blobf0 = NULL, * blobf1 = NULL;
/** /**
This is a hash of hash tables. The outer hash maps from xid to This is a hash of hash tables. The outer hash maps from xid to
@ -24,148 +28,42 @@ static pblHashTable_t * dirtyBlobs;
/** Plays a nasty trick on bufferManager to force it to read and write /** Plays a nasty trick on bufferManager to force it to read and write
blob_record_t items for us. Relies upon bufferManager (and blob_record_t items for us. Relies upon bufferManager (and
page.c's) trust in the rid.size field... */ page.c's) trust in the rid.size field... */
static void readRawRecord(int xid, recordid rid, void * buf, int size) { static void readRawRecord(int xid, Page * p, recordid rid, void * buf, int size) {
recordid blob_rec_rid = rid; recordid blob_rec_rid = rid;
blob_rec_rid.size = size; blob_rec_rid.size = size;
/*readRecord(xid, blob_rec_rid, buf);*/ readRecord(xid, p, blob_rec_rid, buf);
Tread(xid, blob_rec_rid, buf); /* Tread(xid, blob_rec_rid, buf); */
} }
static void writeRawRecord(int xid, recordid rid, const void * buf, int size) { static void writeRawRecord(int xid, Page * p, recordid rid, lsn_t lsn, const void * buf, int size) {
recordid blob_rec_rid = rid; recordid blob_rec_rid = rid;
blob_rec_rid.size = size; blob_rec_rid.size = size;
Tset(xid, blob_rec_rid, buf); writeRecord(xid, p, lsn, blob_rec_rid, buf);
/** Tset(xid, blob_rec_rid, buf); @todo how should we write the log entry? */
} }
/* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */
void openBlobStore() {
/* the r+ mode opens an existing file read /write */
if( ! (blobf0 = fopen(BLOB0_FILE, "r+"))) { /* file may not exist */
/* the w+ mode truncates, creates, and opens read / write */
if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); }
}
DEBUG("blobf0 opened.\n");
if( ! (blobf1 = fopen(BLOB1_FILE, "r+"))) { /* file may not exist */
if(!(blobf1 = fopen(BLOB1_FILE, "w+"))) { perror("Couldn't open or create blob 1 file"); abort(); }
}
DEBUG("blobf1 opened.\n");
dirtyBlobs = pblHtCreate();
}
/** Discards all changes to dirty blobs, and closes the blob store.
@todo memory leak: Will leak memory if there are any outstanding
xacts that have written to blobs. Should explicitly abort them
instead of just invalidating the dirtyBlobs hash.
(If the you fix the above todo, don't forget to fix
bufferManager's simulateBufferManagerCrash.)
*/
void closeBlobStore() {
int ret = fclose(blobf0);
assert(!ret);
ret = fclose(blobf1);
assert(!ret);
blobf0 = NULL;
blobf1 = NULL;
pblHtDelete(dirtyBlobs);
}
recordid preAllocBlob(int xid, long blobSize) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;
/* Allocate space for the blob entry. */
DEBUG("Allocing blob (size %ld)\n", blobSize);
assert(blobSize > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
recordid rid = Talloc(xid, sizeof(blob_record_t));
Page * p = loadPage(rid.page); /** @todo blob's are almost surely broken! */
/** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c
*/
blob_rec.fd = 0;
blob_rec.size = blobSize;
blob_rec.offset = fileSize;
pageSetSlotType(p, rid.slot, BLOB_SLOT);
rid.size = BLOB_SLOT;
releasePage(p);
/* Tset() needs to know to 'do the right thing' here, since we've
changed the size it has recorded for this record, and
writeRawRecord makes sure that that is the case. */
writeRawRecord (xid, rid, &blob_rec, sizeof(blob_record_t));
rid.size = blob_rec.size;
return rid;
}
void allocBlob(int xid, lsn_t lsn, recordid rid) {
long fileSize = myFseek(blobf1, 0, SEEK_END);
blob_record_t blob_rec;
char zero = 0;
/* Allocate space for the blob entry. */
DEBUG("post Allocing blob (size %ld)\n", rid.size);
assert(rid.size > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
/* Read in record to get the correct offset, size for the blob*/
/** @todo blobs deadlock... */
readRawRecord(xid, rid, &blob_rec, sizeof(blob_record_t));
myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET);
myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET);
if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); }
}
static lsn_t * tripleHashLookup(int xid, recordid rid) { static lsn_t * tripleHashLookup(int xid, recordid rid) {
lsn_t * ret;
pthread_mutex_lock(&blob_hash_mutex);
pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid)); pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(xid));
if(xidHash == NULL) { if(xidHash == NULL) {
return NULL; ret = NULL;
} else { } else {
pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int)); pblHashTable_t * pageXidHash = pblHtLookup(xidHash, &(rid.page), sizeof(int));
if(pageXidHash == NULL) { if(pageXidHash == NULL) {
return NULL; ret = NULL;
} else {
ret = pblHtLookup(pageXidHash, &rid, sizeof(recordid));
} }
return pblHtLookup(pageXidHash, &rid, sizeof(recordid));
} }
pthread_mutex_unlock(&blob_hash_mutex);
return ret;
} }
static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) { static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) {
pblHashTable_t * xidHash; pblHashTable_t * xidHash;
pblHashTable_t * pageXidHash; pblHashTable_t * pageXidHash;
lsn_t * copy; lsn_t * copy;
pthread_mutex_lock(&blob_hash_mutex);
xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); /* Freed in doubleHashRemove */ xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); /* Freed in doubleHashRemove */
if(xidHash == NULL) { if(xidHash == NULL) {
@ -184,7 +82,10 @@ static void tripleHashInsert(int xid, recordid rid, lsn_t newLSN) {
*copy = newLSN; *copy = newLSN;
pblHtInsert(pageXidHash, &rid, sizeof(recordid), copy); pblHtInsert(pageXidHash, &rid, sizeof(recordid), copy);
pthread_mutex_unlock(&blob_hash_mutex);
} }
/* /*
static void tripleHashRemove(int xid, recordid rid) { static void tripleHashRemove(int xid, recordid rid) {
pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); pblHashTable_t * xidHash = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
@ -214,7 +115,143 @@ static void tripleHashRemove(int xid, recordid rid) {
} }
}*/ }*/
void readBlob(int xid, recordid rid, void * buf) { /* moved verbatim from bufferManger.c, then hacked up to use FILE * instead of ints. */
void openBlobStore() {
/* the r+ mode opens an existing file read /write */
if( ! (blobf0 = fopen(BLOB0_FILE, "r+"))) { /* file may not exist */
/* the w+ mode truncates, creates, and opens read / write */
if(!(blobf0 = fopen(BLOB0_FILE, "w+"))) { perror("Couldn't open or create blob 0 file"); abort(); }
}
DEBUG("blobf0 opened.\n");
if( ! (blobf1 = fopen(BLOB1_FILE, "r+"))) { /* file may not exist */
if(!(blobf1 = fopen(BLOB1_FILE, "w+"))) { perror("Couldn't open or create blob 1 file"); abort(); }
}
DEBUG("blobf1 opened.\n");
dirtyBlobs = pblHtCreate();
pthread_mutex_init(&blob_hash_mutex, NULL);
}
/** Discards all changes to dirty blobs, and closes the blob store.
@todo memory leak: Will leak memory if there are any outstanding
xacts that have written to blobs. Should explicitly abort them
instead of just invalidating the dirtyBlobs hash.
(If the you fix the above todo, don't forget to fix
bufferManager's simulateBufferManagerCrash.)
*/
void closeBlobStore() {
int ret = fclose(blobf0);
assert(!ret);
ret = fclose(blobf1);
assert(!ret);
blobf0 = NULL;
blobf1 = NULL;
pblHtDelete(dirtyBlobs);
pthread_mutex_destroy(&blob_hash_mutex);
}
/**
blob allocation:
generate rid
log rid (+ alloc)
alloc space in store
log rid write
perform rid write
write space allocation to store
(It used to be:)
allocate space in store
generate rid
write space in store
log rid
write rid alloc
log rid
The trick here is to make sure that the write to the record
happens after the record's allocation has been logged.
*/
recordid preAllocBlob(int xid, long blobSize) {
/* Allocate space for the blob entry. */
DEBUG("Allocing blob (size %ld)\n", blobSize);
assert(blobSize > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
recordid rid = Talloc(xid, sizeof(blob_record_t));
rid.size = blobSize;
return rid;
}
void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid) {
long fileSize;
blob_record_t blob_rec;
char zero = 0;
DEBUG("post Allocing blob (size %ld)\n", rid.size);
/** Finally, fix up the fields in the record that points to the blob.
The rest of this also should go into alloc.c
*/
blob_rec.fd = 0;
blob_rec.size = rid.size;
flockfile(blobf1);
flockfile(blobf0);
fileSize = myFseek(blobf1, 0, SEEK_END);
blob_rec.offset = fileSize;
pageSetSlotType(p, rid.slot, BLOB_SLOT);
rid.size = BLOB_SLOT;
/* Tset() needs to know to 'do the right thing' here, since we've
changed the size it has recorded for this record, and
writeRawRecord makes sure that that is the case. */
writeRawRecord (xid, p, rid, lsn, &blob_rec, sizeof(blob_record_t));
/* releasePage(p); */
rid.size = blob_rec.size;
/* Allocate space for the blob entry. */
assert(rid.size > 0); /* Don't support zero length blobs right now... */
/* First in buffer manager. */
/* Read in record to get the correct offset, size for the blob*/
/* * @ todo blobs deadlock... */
/* readRawRecord(xid, p, rid, &blob_rec, sizeof(blob_record_t)); */
myFseek(blobf0, fileSize + rid.size - 1, SEEK_SET);
myFseek(blobf1, fileSize + rid.size - 1, SEEK_SET);
if(1 != fwrite(&zero, sizeof(char), 1, blobf0)) { perror(NULL); abort(); }
if(1 != fwrite(&zero, sizeof(char), 1, blobf1)) { perror(NULL); abort(); }
funlockfile(blobf0);
funlockfile(blobf1);
}
void readBlob(int xid, Page * p, recordid rid, void * buf) {
/* We don't care if the blob is dirty, since the record from the /* We don't care if the blob is dirty, since the record from the
buffer manager will reflect that if it is.. */ buffer manager will reflect that if it is.. */
@ -225,15 +262,19 @@ void readBlob(int xid, recordid rid, void * buf) {
assert(buf); assert(buf);
readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));
fd = rec.fd ? blobf1 : blobf0; fd = rec.fd ? blobf1 : blobf0;
DEBUG("reading blob at offset %d, size %ld, buffer %x\n", rec.offset, rec.size, (unsigned int) buf);
flockfile(fd);
offset = myFseek(fd, (long int) rec.offset, SEEK_SET); offset = myFseek(fd, (long int) rec.offset, SEEK_SET);
DEBUG("reading blob at offset %d (%ld), size %ld, buffer %x\n", rec.offset, offset, rec.size, (unsigned int) buf);
assert(rec.offset == offset); assert(rec.offset == offset);
if(1 != fread(buf, rec.size, 1, fd)) { if(1 != fread(buf, rec.size, 1, fd)) {
if(feof(fd)) { printf("Unexpected eof!\n"); fflush(NULL); abort(); } if(feof(fd)) { printf("Unexpected eof!\n"); fflush(NULL); abort(); }
@ -241,12 +282,13 @@ void readBlob(int xid, recordid rid, void * buf) {
} }
funlockfile(fd);
} }
/** @todo dirtyBlobs should contain the highest LSN that wrote to the /** @todo dirtyBlobs should contain the highest LSN that wrote to the
current version of the dirty blob, and the lsn field should be current version of the dirty blob, and the lsn field should be
checked to be sure that it increases monotonically. */ checked to be sure that it increases monotonically. */
void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) { void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
/* First, determine if the blob is dirty. */ /* First, determine if the blob is dirty. */
lsn_t * dirty = tripleHashLookup(xid, rid); lsn_t * dirty = tripleHashLookup(xid, rid);
@ -260,7 +302,7 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
/* Tread() raw record */ /* Tread() raw record */
readRawRecord(xid, rid, &rec, sizeof(blob_record_t)); readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));
assert(rec.size == rid.size); assert(rec.size == rid.size);
@ -278,16 +320,20 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
rec.fd = rec.fd ? 0 : 1; rec.fd = rec.fd ? 0 : 1;
/* Tset() raw record */ /* Tset() raw record */
writeRawRecord(xid, rid, &rec, sizeof(blob_record_t)); writeRawRecord(xid, p, rid, lsn, &rec, sizeof(blob_record_t));
} }
fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */ fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */
DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size);
flockfile(fd);
offset = myFseek(fd, rec.offset, SEEK_SET); offset = myFseek(fd, rec.offset, SEEK_SET);
DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size);
assert(offset == rec.offset); assert(offset == rec.offset);
readcount = fwrite(buf, rec.size, 1, fd); readcount = fwrite(buf, rec.size, 1, fd);
funlockfile(fd);
assert(1 == readcount); assert(1 == readcount);
/* No need to update the raw blob record. */ /* No need to update the raw blob record. */
@ -297,9 +343,12 @@ void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf) {
files when it's called (are there any dirty blobs associated with files when it's called (are there any dirty blobs associated with
this transaction? */ this transaction? */
void commitBlobs(int xid) { void commitBlobs(int xid) {
flockfile(blobf0);
flockfile(blobf1);
fdatasync(fileno(blobf0)); fdatasync(fileno(blobf0));
fdatasync(fileno(blobf1)); fdatasync(fileno(blobf1));
funlockfile(blobf0);
funlockfile(blobf1);
abortBlobs(xid); abortBlobs(xid);
} }
@ -329,10 +378,12 @@ void abortBlobs(int xid) {
easier. easier.
*/ */
pthread_mutex_lock(&blob_hash_mutex);
pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int)); pblHashTable_t * rid_buckets = pblHtLookup(dirtyBlobs, &xid, sizeof(int));
pblHashTable_t * this_bucket; pblHashTable_t * this_bucket;
if(!rid_buckets) { return; } /* No dirty blobs for this xid.. */ if(rid_buckets) { /* Otherwise, there are no dirty blobs for this xid.. */
for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) { for(this_bucket = pblHtFirst(rid_buckets); this_bucket; this_bucket = pblHtNext(rid_buckets)) {
lsn_t * rid_lsn; lsn_t * rid_lsn;
@ -351,5 +402,8 @@ void abortBlobs(int xid) {
pblHtDelete(this_bucket); pblHtDelete(this_bucket);
} }
pblHtDelete(rid_buckets); pblHtDelete(rid_buckets);
}
pthread_mutex_unlock(&blob_hash_mutex);
} }

View file

@ -48,13 +48,13 @@ BEGIN_C_DECLS
/** /**
Read the blob from the recordid rid into buf. Read the blob from the recordid rid into buf.
*/ */
void readBlob(int xid, recordid rid, void * buf); void readBlob(int xid, Page * p, recordid rid, void * buf);
/** /**
Write the contents of buf to the blob in recordid rid. Write the contents of buf to the blob in recordid rid.
*/ */
void writeBlob(int xid, lsn_t lsn, recordid rid, const void * buf); void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf);
/** /**
@ -88,7 +88,7 @@ recordid preAllocBlob(int xid, long blobsize);
*/ */
void allocBlob(int xid, lsn_t lsn, recordid rid); void allocBlob(int xid, Page * p, lsn_t lsn, recordid rid);
void openBlobStore(); void openBlobStore();
void closeBlobStore(); void closeBlobStore();

View file

@ -143,7 +143,7 @@ void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
if(rid.size > BLOB_THRESHOLD_SIZE) { if(rid.size > BLOB_THRESHOLD_SIZE) {
/* DEBUG("Writing blob.\n"); */ /* DEBUG("Writing blob.\n"); */
writeBlob(xid, lsn, rid, dat); writeBlob(xid, p, lsn, rid, dat);
} else { } else {
/* DEBUG("Writing record.\n"); */ /* DEBUG("Writing record.\n"); */
@ -165,7 +165,7 @@ void readRecord(int xid, Page * p, recordid rid, void *buf) {
/* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n", /* DEBUG("Reading blob. xid = %d rid = { %d %d %ld } buf = %x\n",
xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */ xid, rid.page, rid.slot, rid.size, (unsigned int)buf); */
/* @todo should readblob take a page pointer? */ /* @todo should readblob take a page pointer? */
readBlob(xid, rid, buf); readBlob(xid, p, rid, buf);
} else { } else {
assert(rid.page == p->id); assert(rid.page == p->id);
/* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n", /* DEBUG("Reading record xid = %d rid = { %d %d %ld } buf = %x\n",

View file

@ -45,7 +45,7 @@ int __lladd_pthread_mutex_lock(lladd_pthread_mutex_t *mutex, char * file, int li
blockCount ++; blockCount ++;
pthread_yield(); pthread_yield();
if(blockCount >= 10000 && ! (blockCount % 10000)) { if(blockCount >= 30000 && ! (blockCount % 30000)) {
printf("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at); printf("Spinning at %s:%d, %ld times. Held by: %s\n", file, line, blockCount, mutex->last_acquired_at);
fflush(NULL); fflush(NULL);
} }

View file

@ -32,7 +32,7 @@
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE) { if(rid.size >= BLOB_THRESHOLD_SIZE) {
allocBlob(xid, lsn, rid); allocBlob(xid, p, lsn, rid);
} else { } else {
/* Page * loadedPage = loadPage(rid.page); */ /* Page * loadedPage = loadPage(rid.page); */
/* pageSlotRalloc(loadedPage, lsn, rid); */ /* pageSlotRalloc(loadedPage, lsn, rid); */

View file

@ -152,7 +152,11 @@ Page * getPage(int pageid, int locktype) {
ret = pblHtLookup(activePages, &pageid, sizeof(int)); ret = pblHtLookup(activePages, &pageid, sizeof(int));
if(ret) { if(ret) {
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217); readlock(ret->loadlatch, 217);
}
//writelock(ret->loadlatch, 217); //writelock(ret->loadlatch, 217);
} }
@ -165,8 +169,12 @@ Page * getPage(int pageid, int locktype) {
if(ret) { if(ret) {
// writelock(ret->loadlatch, 217); // writelock(ret->loadlatch, 217);
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217); readlock(ret->loadlatch, 217);
} }
}
spin++; spin++;
if(spin > 10000) { if(spin > 10000) {
printf("GetPage is stuck!"); printf("GetPage is stuck!");
@ -244,7 +252,11 @@ Page * getPage(int pageid, int locktype) {
/* downgradelock(ret->loadlatch); */ /* downgradelock(ret->loadlatch); */
// writelock(ret->loadlatch, 217); // writelock(ret->loadlatch, 217);
if(locktype == RW) {
writelock(ret->loadlatch, 217);
} else {
readlock(ret->loadlatch, 217); readlock(ret->loadlatch, 217);
}
if(ret->id != pageid) { if(ret->id != pageid) {
unlock(ret->loadlatch); unlock(ret->loadlatch);
printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n"); printf("pageCache.c: Thrashing detected. Strongly consider increasing LLADD's buffer pool size!\n");

View file

@ -17,9 +17,10 @@
#define LOG_NAME "check_bufferMananger.log" #define LOG_NAME "check_bufferMananger.log"
#define NUM_PAGES 1000 #define NUM_PAGES 1000
#define THREAD_COUNT 25 #define THREAD_COUNT 10
#define READS_PER_THREAD 10000 #define READS_PER_THREAD 100
#define RECORDS_PER_THREAD 10000 #define RECORDS_PER_THREAD 100
#define RECORD_THREAD_COUNT 25 #define RECORD_THREAD_COUNT 25
void initializePages() { void initializePages() {
@ -60,8 +61,8 @@ void * workerThread(void * p) {
int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0)); int k = (int) (((double)NUM_PAGES)*rand()/(RAND_MAX+1.0));
Page * p; Page * p;
if(! (i % 500) ) { if(! (i % 50) ) {
printf("%d", i / 500); fflush(NULL); printf("%d", i / 50); fflush(NULL);
} }
rid.page = k; rid.page = k;
@ -94,8 +95,8 @@ void * workerThreadWriting(void * q) {
/* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */ /* printf("\nRID:\t%d,%d\n", rids[i].page, rids[i].slot); */
/* fflush(NULL); */ /* fflush(NULL); */
if(! (i % 1000) ) { if(! (i % 100) ) {
printf("A%d", i / 1000); fflush(NULL); printf("A%d", i / 100); fflush(NULL);
} }
@ -122,8 +123,8 @@ void * workerThreadWriting(void * q) {
assert(p->id == rids[i].page); assert(p->id == rids[i].page);
releasePage(p); releasePage(p);
if(! (i % 1000) ) { if(! (i % 100) ) {
printf("W%d", i / 1000); fflush(NULL); printf("W%d", i / 100); fflush(NULL);
} }
/* sched_yield(); */ /* sched_yield(); */
@ -139,8 +140,8 @@ void * workerThreadWriting(void * q) {
releasePage(p); releasePage(p);
if(! (i % 1000) ) { if(! (i % 100) ) {
printf("R%d", i / 1000); fflush(NULL); printf("R%d", i / 100); fflush(NULL);
} }
@ -236,9 +237,9 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
/* tcase_add_test(tc, pageSingleThreadTest); tcase_add_test(tc, pageSingleThreadTest);
tcase_add_test(tc, pageLoadTest); tcase_add_test(tc, pageLoadTest);
tcase_add_test(tc, pageSingleThreadWriterTest); */ tcase_add_test(tc, pageSingleThreadWriterTest);
tcase_add_test(tc, pageThreadedWritersTest); tcase_add_test(tc, pageThreadedWritersTest);
/* --------------------------------------------- */ /* --------------------------------------------- */

View file

@ -141,10 +141,11 @@ END_TEST
*/ */
START_TEST(pageThreadTest) { START_TEST(pageThreadTest) {
#define THREAD_COUNT 50 #define THREAD_COUNT 10
pthread_t workers[THREAD_COUNT]; pthread_t workers[THREAD_COUNT];
int i; int i;
pthread_mutex_init(&random_mutex, NULL); pthread_mutex_init(&random_mutex, NULL);
pthread_mutex_init(&lsn_mutex, NULL);
fail_unless(1, NULL); fail_unless(1, NULL);
Tinit(); Tinit();

View file

@ -47,8 +47,142 @@ terms specified in this license.
#include <lladd/transactional.h> #include <lladd/transactional.h>
#include "../check_includes.h" #include "../check_includes.h"
#define LOG_NAME "check_transactional2.log" #define LOG_NAME "check_transactional2.log"
#define THREAD_COUNT 25 #define THREAD_COUNT 10
#define RECORDS_PER_THREAD 10000 #define RECORDS_PER_THREAD 500
void arraySet(int * array, int val) {
int i;
for(i = 0; i < 1024; i++) {
array[i] = val;
}
}
int arrayCmp(int * array, int * array2) {
int i;
for(i = 0; i < 1024; i++) {
if(array[i] != array2[i]) {
return 0;
}
}
return 1;
}
/** Allocate a bunch of blobs, set them, read them, commit them, and read it again, chang them, abort, and read again. */
void * writingAbortingBlobWorkerThread ( void * v ) {
int offset = * (int *) v;
recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid));
int xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, 1024 * sizeof(int));
if(! (i %100)) {
printf("A%d", i/100);fflush(NULL);
}
}
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int tmp[1024];/* = i + offset; */
arraySet(tmp, i+ offset);
Tset(xid, rids[i], tmp);
if(! (i %100)) {
printf("W%d", i/100); fflush(NULL);
}
}
Tcommit(xid);
xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j[1024];
int k[1024];
arraySet(k, i+offset);
Tread(xid, rids[i], j);
assert(arrayCmp(j,k));/*i + offset == j); */
if(! (i %100)) {
printf("R%d", i/100);fflush(NULL);
}
arraySet(k, -1);
Tset(xid, rids[i], k);/*(void*)&minusOne); */
}
Tabort(xid);
xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j[1024];
int k[1024];
arraySet(k, i+offset);
Tread(xid, rids[i], j);
assert(arrayCmp(j,k));
if(! (i %100)) {
printf("S%d", i/100);fflush(NULL);
}
}
Tcommit(xid);
return NULL;
}
/** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */
void * writingAbortingWorkerThread ( void * v ) {
int offset = * (int *) v;
recordid * rids = malloc(RECORDS_PER_THREAD * sizeof(recordid));
int xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, sizeof(int));
if(! (i %100)) {
printf("A%d", i/100);fflush(NULL);
}
}
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int tmp = i + offset;
Tset(xid, rids[i], &tmp);
if(! (i %100)) {
printf("W%d", i/100); fflush(NULL);
}
}
Tcommit(xid);
xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j;
int minusOne = -1;
Tread(xid, rids[i], &j);
assert(i + offset == j);
if(! (i %100)) {
printf("R%d", i/100);fflush(NULL);
}
Tset(xid, rids[i], (void*)&minusOne);
}
Tabort(xid);
xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j;
Tread(xid, rids[i], &j);
assert(i + offset == j);
if(! (i %100)) {
printf("S%d", i/100);fflush(NULL);
}
}
Tcommit(xid);
return NULL;
}
/** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */ /** Allocate a bunch of stuff, set it, read it, commit it, and read it again. */
void * writingWorkerThread ( void * v ) { void * writingWorkerThread ( void * v ) {
@ -57,58 +191,47 @@ void * writingWorkerThread ( void * v ) {
int xid = Tbegin(); int xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, sizeof(int)); rids[i] = /* ralloc(xid, sizeof(int)); */ Talloc(xid, sizeof(int));
if(! (i %1000)) { if(! (i %100)) {
printf("A%d", i/1000);fflush(NULL); printf("A%d", i/100);fflush(NULL);
} }
} }
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int tmp = i + offset; int tmp = i + offset;
/* Page * p = loadPage(rids[i].page);
writeRecord(1, p, 0, rids[i], &tmp);
releasePage(p); */
Tset(xid, rids[i], &tmp); Tset(xid, rids[i], &tmp);
if(! (i %1000)) { if(! (i %100)) {
printf("W%d", i/1000); fflush(NULL); printf("W%d", i/100); fflush(NULL);
} }
} }
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j; int j;
/* Page * p = loadPage(rids[i].page);
readRecord(1, p, rids[i], &j);
releasePage(p); */
Tread(xid, rids[i], &j); Tread(xid, rids[i], &j);
assert(i + offset == j); assert(i + offset == j);
if(! (i %1000)) { if(! (i %100)) {
printf("R%d", i/1000);fflush(NULL); printf("R%d", i/100);fflush(NULL);
} }
} }
Tcommit(xid); Tcommit(xid);
/* Tcommit(xid);
xid = Tbegin(); xid = Tbegin();
for(int i = 0; i < RECORDS_PER_THREAD; i++) { for(int i = 0; i < RECORDS_PER_THREAD; i++) {
int j; int j;
Tread(xid, rids[i], &j); Tread(xid, rids[i], &j);
assert(i + offset == j); assert(i + offset == j);
}*/ }
Tcommit(xid);
return NULL; return NULL;
} }
/** /**
@test
Assuming that the Tset() operation is implemented correctly, checks Assuming that the Tset() operation is implemented correctly, checks
that doUpdate, redoUpdate and undoUpdate are working correctly, for that doUpdate, redoUpdate and undoUpdate are working correctly, for
operations that use physical logging. operations that use physical logging.
@ -150,6 +273,7 @@ START_TEST(transactional_smokeTest) {
END_TEST END_TEST
/** /**
@test
Just like transactional_smokeTest, but check blobs instead. Just like transactional_smokeTest, but check blobs instead.
*/ */
START_TEST(transactional_blobSmokeTest) { START_TEST(transactional_blobSmokeTest) {
@ -205,6 +329,10 @@ START_TEST(transactional_blobSmokeTest) {
} }
END_TEST END_TEST
/**
@test
Make sure that the single threaded version of transactional_threads_commit passes.
*/
START_TEST(transactional_nothreads_commit) { START_TEST(transactional_nothreads_commit) {
int five = 5; int five = 5;
Tinit(); Tinit();
@ -212,6 +340,10 @@ START_TEST(transactional_nothreads_commit) {
Tdeinit(); Tdeinit();
} END_TEST } END_TEST
/**
@test
Test LLADD in a multi-threaded envrionment, where every transaction commits.
*/
START_TEST(transactional_threads_commit) { START_TEST(transactional_threads_commit) {
pthread_t workers[THREAD_COUNT]; pthread_t workers[THREAD_COUNT];
int i; int i;
@ -232,6 +364,76 @@ START_TEST(transactional_threads_commit) {
Tdeinit(); Tdeinit();
} END_TEST } END_TEST
/**
@test
Make sure that the single threaded version of transactional_threads_abort passes.
*/
START_TEST(transactional_nothreads_abort) {
int five = 5;
Tinit();
writingAbortingWorkerThread(&five);
Tdeinit();
} END_TEST
/**
@test
Test LLADD in a multi-threaded envrionment, with a mix of transaction commits and aborts.
*/
START_TEST(transactional_threads_abort) {
pthread_t workers[THREAD_COUNT];
int i;
Tinit();
for(i = 0; i < THREAD_COUNT; i++) {
int arg = i + 100;
pthread_create(&workers[i], NULL, writingAbortingWorkerThread, &arg);
}
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
Tdeinit();
} END_TEST
/**
@test
Make sure that the single threaded version of transactional_threads_abort passes.
*/
START_TEST(transactional_blobs_nothreads_abort) {
int five = 5;
Tinit();
writingAbortingBlobWorkerThread(&five);
Tdeinit();
} END_TEST
/**
@test
Test LLADD in a multi-threaded envrionment, with a mix of transaction commits and aborts.
*/
START_TEST(transactional_blobs_threads_abort) {
pthread_t workers[THREAD_COUNT];
int i;
Tinit();
for(i = 0; i < THREAD_COUNT; i++) {
int arg = i + 100;
pthread_create(&workers[i], NULL, writingAbortingBlobWorkerThread, &arg);
}
for(i = 0; i < THREAD_COUNT; i++) {
pthread_join(workers[i], NULL);
}
Tdeinit();
} END_TEST
/** /**
Add suite declarations here Add suite declarations here
*/ */
@ -241,10 +443,14 @@ Suite * check_suite(void) {
TCase *tc = tcase_create("transactional_smokeTest"); TCase *tc = tcase_create("transactional_smokeTest");
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, transactional_smokeTest); /* tcase_add_test(tc, transactional_smokeTest);
/* tcase_add_test(tc, transactional_blobSmokeTest); */ tcase_add_test(tc, transactional_blobSmokeTest);
tcase_add_test(tc, transactional_nothreads_commit); tcase_add_test(tc, transactional_nothreads_commit);
tcase_add_test(tc, transactional_threads_commit); tcase_add_test(tc, transactional_threads_commit);
tcase_add_test(tc, transactional_nothreads_abort);
tcase_add_test(tc, transactional_threads_abort); */
tcase_add_test(tc, transactional_blobs_nothreads_abort);
tcase_add_test(tc, transactional_blobs_threads_abort);
/** @todo still need to make blobs reentrant! */ /** @todo still need to make blobs reentrant! */
/* --------------------------------------------- */ /* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown); tcase_add_checked_fixture(tc, setup, teardown);