diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am index 2dfcc8d..9caacdd 100644 --- a/benchmarks/Makefile.am +++ b/benchmarks/Makefile.am @@ -1,7 +1,7 @@ LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a \ $(top_builddir)/src/libdfa/librw.a -bin_PROGRAMS=naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \ +bin_PROGRAMS=lhtableThreaded naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \ arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \ linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy AM_CFLAGS= -g -Wall -pedantic -std=gnu99 diff --git a/lladd/truncation.h b/lladd/truncation.h index 1e1e577..2e938b6 100644 --- a/lladd/truncation.h +++ b/lladd/truncation.h @@ -58,6 +58,7 @@ void dirtyPagesDeinit(); void dirtyPages_add(Page * p); void dirtyPages_remove(Page * p); +int dirtyPages_isDirty(Page * p); void truncationInit(); void truncationDeinit(); diff --git a/src/lladd/bufferManager.c b/src/lladd/bufferManager.c index 6644774..6b5d599 100644 --- a/src/lladd/bufferManager.c +++ b/src/lladd/bufferManager.c @@ -107,7 +107,6 @@ void bufDeinit() { pblHtRemove( activePages, 0, 0 ); DEBUG("+"); pageWrite(p); - // dirtyPages_remove(p); } @@ -233,7 +232,6 @@ static Page * getPage(int pageid, int locktype) { assert(ret != dummy_page); if(ret->id != -1) { pageWrite(ret); - // dirtyPages_remove(ret); } pageFree(ret, pageid); diff --git a/src/lladd/lhtable.c b/src/lladd/lhtable.c index 3e8778c..19f3ac7 100644 --- a/src/lladd/lhtable.c +++ b/src/lladd/lhtable.c @@ -5,7 +5,7 @@ #include #include #include - +#include /** @todo Look up the balls + bins stuff, and pick FILL_FACTOR in a principled way... @@ -27,12 +27,18 @@ */ +#define NAIVE_LOCKING + + struct LH_ENTRY(table) { struct LH_ENTRY(pair_t) * bucketList; unsigned int bucketListLength; unsigned char bucketListBits; unsigned int bucketListNextExtension; unsigned int occupancy; +#ifdef NAIVE_LOCKING + pthread_mutex_t lock; +#endif }; @@ -204,12 +210,18 @@ struct LH_ENTRY(table) * LH_ENTRY(create)(int initialSize) { ret->bucketListLength = initialSize; ret->occupancy = 0; // printf("Table: {size = %d, bits = %d, ext = %d\n", ret->bucketListLength, ret->bucketListBits, ret->bucketListNextExtension); +#ifdef NAIVE_LOCKING + pthread_mutex_init(&(ret->lock), 0); +#endif return ret; } LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table, const LH_ENTRY(key_t) * key, int len, LH_ENTRY(value_t) * value) { +#ifdef NAIVE_LOCKING + pthread_mutex_lock(&(table->lock)); +#endif // @todo 32 vs. 64 bit.. long bucket = hash(key, len, table->bucketListBits, table->bucketListNextExtension); @@ -266,20 +278,34 @@ LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table, )) { extendHashTable(table); } +#ifdef NAIVE_LOCKING + pthread_mutex_unlock(&(table->lock)); +#endif return ret; } LH_ENTRY(value_t) * LH_ENTRY(remove) (struct LH_ENTRY(table) * table, const LH_ENTRY(key_t) * key, int len) { +#ifdef NAIVE_LOCKING + pthread_mutex_lock(&(table->lock)); +#endif // @todo 32 vs. 64 bit.. long bucket = hash(key, len, table->bucketListBits, table->bucketListNextExtension); - return removeFromLinkedList(table, bucket, key, len); + + LH_ENTRY(value_t) * ret = removeFromLinkedList(table, bucket, key, len); +#ifdef NAIVE_LOCKING + pthread_mutex_unlock(&(table->lock)); +#endif + return ret; } LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table, const LH_ENTRY(key_t) * key, int len) { +#ifdef NAIVE_LOCKING + pthread_mutex_lock(&(table->lock)); +#endif // @todo 32 vs. 64 bit.. int bucket = hash(key, len, table->bucketListBits, table->bucketListNextExtension); @@ -288,6 +314,10 @@ LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table, thePair = findInLinkedList(key, len, &(table->bucketList[bucket]), &predecessor); +#ifdef NAIVE_LOCKING + pthread_mutex_unlock(&(table->lock)); +#endif + if(!thePair) { return 0; } else { @@ -297,14 +327,23 @@ LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table, void LH_ENTRY(openlist)(const struct LH_ENTRY(table) * table, struct LH_ENTRY(list) * list) { +#ifdef NAIVE_LOCKING + pthread_mutex_lock(&(((struct LH_ENTRY(table)*)table)->lock)); +#endif list->table = table; list->currentPair = 0; list->nextPair = 0; list->currentBucket = -1; +#ifdef NAIVE_LOCKING + pthread_mutex_unlock(&(((struct LH_ENTRY(table)*)table)->lock)); +#endif } const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list) { +#ifdef NAIVE_LOCKING + pthread_mutex_lock(&(((struct LH_ENTRY(table)*)(list->table))->lock)); +#endif assert(list->currentBucket != -2); while(!list->nextPair) { list->currentBucket++; @@ -319,12 +358,23 @@ const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list) if(list->currentPair) { list->nextPair = list->currentPair->next; } - return list->currentPair; + // XXX is it even meaningful to return a pair object on an unlocked hashtable? + const struct LH_ENTRY(pair_t)* ret = list->currentPair; +#ifdef NAIVE_LOCKING + pthread_mutex_unlock(&(((struct LH_ENTRY(table)*)(list->table))->lock)); +#endif + return ret; } void LH_ENTRY(closelist)(struct LH_ENTRY(list) * list) { +#ifdef NAIVE_LOCKING + pthread_mutex_lock(&(((struct LH_ENTRY(table)*)(list->table))->lock)); +#endif assert(list->currentBucket != -2); list->currentBucket = -2; +#ifdef NAIVE_LOCKING + pthread_mutex_unlock(&(((struct LH_ENTRY(table)*)(list->table))->lock)); +#endif } void LH_ENTRY(destroy) (struct LH_ENTRY(table) * t) { @@ -342,6 +392,9 @@ void LH_ENTRY(destroy) (struct LH_ENTRY(table) * t) { } LH_ENTRY(closelist)(&l); free(t->bucketList); +#ifdef NAIVE_LOCKING + pthread_mutex_destroy(&(t->lock)); +#endif free(t); } diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index 6b4f4db..62e2984 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -77,8 +77,7 @@ static int roLogFD = 0; static lsn_t flushedLSN_val; /** - Invariant: No thread is writing to flushedLSN. (This lock is not - needed if doubles are set atomically by the processeor.) Since + Invariant: No thread is writing to flushedLSN. Since flushedLSN is monotonically increasing, readers can immmediately release their locks after checking the value of flushedLSN. */ @@ -90,12 +89,15 @@ static rwl * flushedLSN_lock; @see writeLogEntry */ -static lsn_t nextAvailableLSN = 0; +static lsn_t nextAvailableLSN; /** The global offset for the current version of the log file. */ static lsn_t global_offset; + +// Lock order: truncateLog_mutex, log_write_mutex, log_read_mutex + /** This mutex makes sequences of calls to lseek() and read() atomic. It is also used by truncateLog to block read requests while @@ -170,7 +172,9 @@ int openLogWriter() { flushedLSN_val = 0; - nextAvailableLSN = 0; + + + /* Seek append only log to the end of the file. This is unnecessary, @@ -207,6 +211,20 @@ int openLogWriter() { } } + + // Initialize nextAvailableLSN. + + LogHandle lh; + const LogEntry * le; + + nextAvailableLSN = sizeof(lsn_t); + lh = getLSNHandle(nextAvailableLSN); + + while((le = nextInLog(&lh))) { + nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);; + FreeLogEntry(le); + } + return 0; } @@ -239,7 +257,7 @@ int writeLogEntry(LogEntry * e) { pthread_mutex_lock(&log_write_mutex); - if(!nextAvailableLSN) { + /* if(!nextAvailableLSN) { LogHandle lh; const LogEntry * le; @@ -251,15 +269,13 @@ int writeLogEntry(LogEntry * e) { nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);; FreeLogEntry(le); } - } + }*/ /* Set the log entry's LSN. */ e->LSN = nextAvailableLSN; //printf ("\nLSN: %ld\n", e->LSN); //fflush(stdout); - nextAvailableLSN += (size + sizeof(lsn_t)); - size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log); if(nmemb != 1) { if(feof(log)) { abort(); /* feof makes no sense here */ } @@ -267,6 +283,7 @@ int writeLogEntry(LogEntry * e) { fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log)); abort(); } + // XXX nextAvailableLSN not set... return LLADD_IO_ERROR; } @@ -278,10 +295,16 @@ int writeLogEntry(LogEntry * e) { fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log)); abort(); } + // XXX nextAvailableLSN not set... return LLADD_IO_ERROR; } //fflush(log); + + pthread_mutex_lock(&log_read_mutex); + nextAvailableLSN += (size + sizeof(lsn_t)); + pthread_mutex_unlock(&log_read_mutex); + pthread_mutex_unlock(&log_write_mutex); @@ -292,7 +315,8 @@ void syncLog() { lsn_t newFlushedLSN; pthread_mutex_lock(&log_read_mutex); - newFlushedLSN = ftell(log) + global_offset; + // newFlushedLSN = ftell(log) + global_offset; + newFlushedLSN = nextAvailableLSN -1; pthread_mutex_unlock(&log_read_mutex); // Wait to set the static variable until after the flush returns. @@ -365,19 +389,20 @@ static LogEntry * readLogEntry() { if(bytesRead != size) { if(bytesRead == 0) { - // fprintf(stderr, "eof reading entry\n"); - // fflush(stderr); - return(NULL); + fprintf(stderr, "eof reading entry\n"); + fflush(stderr); + abort(); + // return(NULL); } else if(bytesRead == -1) { perror("error reading log"); abort(); return (LogEntry*)LLADD_IO_ERROR; } else { - printf("short read from log w/ lsn %ld. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", debug_lsn, size, bytesRead); + fprintf(stderr, "short read from log w/ lsn %ld. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", debug_lsn, size, bytesRead); fflush(stderr); lsn_t newSize = size - bytesRead; lsn_t newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize); - printf("\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize); + fprintf(stderr, "\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize); fflush(stderr); abort(); return (LogEntry*)LLADD_IO_ERROR; diff --git a/src/lladd/page/raw.c b/src/lladd/page/raw.c index 7587960..a52de38 100644 --- a/src/lladd/page/raw.c +++ b/src/lladd/page/raw.c @@ -16,7 +16,6 @@ byte* rawPageGetData(int xid, Page * p) { void rawPageSetData(int xid, lsn_t lsn, Page * p) { writelock(p->rwlatch, 255); rawPageWriteLSN(xid, p, lsn); - // p->dirty = 1; dirtyPages_add(p); unlock(p->rwlatch); return; diff --git a/src/lladd/pageFile.c b/src/lladd/pageFile.c index 29c6f2b..1c69162 100644 --- a/src/lladd/pageFile.c +++ b/src/lladd/pageFile.c @@ -76,7 +76,7 @@ void pageRead(Page *ret) { dirty page table can be kept up to date. */ void pageWrite(Page * ret) { /** If the page is clean, there's no reason to write it out. */ - if(!ret->dirty) { + if(!dirtyPages_isDirty(ret)) { DEBUG(" =^)~ "); return; } @@ -118,7 +118,6 @@ void pageWrite(Page * ret) { } } - ret->dirty = 0; dirtyPages_remove(ret); pthread_mutex_unlock(&stable_mutex); diff --git a/src/lladd/truncation.c b/src/lladd/truncation.c index f219387..a40e38b 100644 --- a/src/lladd/truncation.c +++ b/src/lladd/truncation.c @@ -36,6 +36,7 @@ void dirtyPages_remove(Page * p) { // printf("Removing page %d\n", p->id); //assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int))); // printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages)); + p->dirty = 0; pblHtRemove(dirtyPages, &(p->id), sizeof(int)); //assert(!ret); <--- Due to a bug in the PBL compatibility mode, //there is no way to tell whether the value didn't exist, or if it @@ -43,6 +44,14 @@ void dirtyPages_remove(Page * p) { pthread_mutex_unlock(&dirtyPages_mutex); } +int dirtyPages_isDirty(Page * p) { + int ret; + pthread_mutex_lock(&dirtyPages_mutex); + ret = p->dirty; + pthread_mutex_unlock(&dirtyPages_mutex); + return ret; +} + static lsn_t dirtyPages_minRecLSN() { lsn_t lsn = LogFlushedLSN (); int* pageid; @@ -81,10 +90,7 @@ static void dirtyPages_flush() { for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) { p = loadPage(-1, staleDirtyPages[i]); - //if(p->dirty) { pageWrite(p); - // dirtyPages_remove(p); - //} releasePage(p); } free(staleDirtyPages);