Fixed races on flushedLSN and page->dirty.

This commit is contained in:
Sears Russell 2006-06-12 20:05:28 +00:00
parent 35a5e80871
commit 27f7df6f09
8 changed files with 107 additions and 26 deletions

View file

@ -1,7 +1,7 @@
LDADD=$(top_builddir)/src/2pc/lib2pc.a $(top_builddir)/src/libdfa/libdfa.a \
$(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a \
$(top_builddir)/src/libdfa/librw.a
bin_PROGRAMS=naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \
bin_PROGRAMS=lhtableThreaded naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \
arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \
linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -58,6 +58,7 @@ void dirtyPagesDeinit();
void dirtyPages_add(Page * p);
void dirtyPages_remove(Page * p);
int dirtyPages_isDirty(Page * p);
void truncationInit();
void truncationDeinit();

View file

@ -107,7 +107,6 @@ void bufDeinit() {
pblHtRemove( activePages, 0, 0 );
DEBUG("+");
pageWrite(p);
// dirtyPages_remove(p);
}
@ -233,7 +232,6 @@ static Page * getPage(int pageid, int locktype) {
assert(ret != dummy_page);
if(ret->id != -1) {
pageWrite(ret);
// dirtyPages_remove(ret);
}
pageFree(ret, pageid);

View file

@ -5,7 +5,7 @@
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
/**
@todo Look up the balls + bins stuff, and pick FILL_FACTOR in a
principled way...
@ -27,12 +27,18 @@
*/
#define NAIVE_LOCKING
struct LH_ENTRY(table) {
struct LH_ENTRY(pair_t) * bucketList;
unsigned int bucketListLength;
unsigned char bucketListBits;
unsigned int bucketListNextExtension;
unsigned int occupancy;
#ifdef NAIVE_LOCKING
pthread_mutex_t lock;
#endif
};
@ -204,12 +210,18 @@ struct LH_ENTRY(table) * LH_ENTRY(create)(int initialSize) {
ret->bucketListLength = initialSize;
ret->occupancy = 0;
// printf("Table: {size = %d, bits = %d, ext = %d\n", ret->bucketListLength, ret->bucketListBits, ret->bucketListNextExtension);
#ifdef NAIVE_LOCKING
pthread_mutex_init(&(ret->lock), 0);
#endif
return ret;
}
LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table,
const LH_ENTRY(key_t) * key, int len,
LH_ENTRY(value_t) * value) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(table->lock));
#endif
// @todo 32 vs. 64 bit..
long bucket = hash(key, len,
table->bucketListBits, table->bucketListNextExtension);
@ -266,20 +278,34 @@ LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table,
)) {
extendHashTable(table);
}
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(table->lock));
#endif
return ret;
}
LH_ENTRY(value_t) * LH_ENTRY(remove) (struct LH_ENTRY(table) * table,
const LH_ENTRY(key_t) * key, int len) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(table->lock));
#endif
// @todo 32 vs. 64 bit..
long bucket = hash(key, len,
table->bucketListBits, table->bucketListNextExtension);
return removeFromLinkedList(table, bucket, key, len);
LH_ENTRY(value_t) * ret = removeFromLinkedList(table, bucket, key, len);
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(table->lock));
#endif
return ret;
}
LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table,
const LH_ENTRY(key_t) * key, int len) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(table->lock));
#endif
// @todo 32 vs. 64 bit..
int bucket = hash(key, len,
table->bucketListBits, table->bucketListNextExtension);
@ -288,6 +314,10 @@ LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table,
thePair = findInLinkedList(key, len,
&(table->bucketList[bucket]),
&predecessor);
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(table->lock));
#endif
if(!thePair) {
return 0;
} else {
@ -297,14 +327,23 @@ LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table,
void LH_ENTRY(openlist)(const struct LH_ENTRY(table) * table,
struct LH_ENTRY(list) * list) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(((struct LH_ENTRY(table)*)table)->lock));
#endif
list->table = table;
list->currentPair = 0;
list->nextPair = 0;
list->currentBucket = -1;
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(((struct LH_ENTRY(table)*)table)->lock));
#endif
}
const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(((struct LH_ENTRY(table)*)(list->table))->lock));
#endif
assert(list->currentBucket != -2);
while(!list->nextPair) {
list->currentBucket++;
@ -319,12 +358,23 @@ const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list)
if(list->currentPair) {
list->nextPair = list->currentPair->next;
}
return list->currentPair;
// XXX is it even meaningful to return a pair object on an unlocked hashtable?
const struct LH_ENTRY(pair_t)* ret = list->currentPair;
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(((struct LH_ENTRY(table)*)(list->table))->lock));
#endif
return ret;
}
void LH_ENTRY(closelist)(struct LH_ENTRY(list) * list) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(((struct LH_ENTRY(table)*)(list->table))->lock));
#endif
assert(list->currentBucket != -2);
list->currentBucket = -2;
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(((struct LH_ENTRY(table)*)(list->table))->lock));
#endif
}
void LH_ENTRY(destroy) (struct LH_ENTRY(table) * t) {
@ -342,6 +392,9 @@ void LH_ENTRY(destroy) (struct LH_ENTRY(table) * t) {
}
LH_ENTRY(closelist)(&l);
free(t->bucketList);
#ifdef NAIVE_LOCKING
pthread_mutex_destroy(&(t->lock));
#endif
free(t);
}

View file

@ -77,8 +77,7 @@ static int roLogFD = 0;
static lsn_t flushedLSN_val;
/**
Invariant: No thread is writing to flushedLSN. (This lock is not
needed if doubles are set atomically by the processeor.) Since
Invariant: No thread is writing to flushedLSN. Since
flushedLSN is monotonically increasing, readers can immmediately
release their locks after checking the value of flushedLSN.
*/
@ -90,12 +89,15 @@ static rwl * flushedLSN_lock;
@see writeLogEntry
*/
static lsn_t nextAvailableLSN = 0;
static lsn_t nextAvailableLSN;
/**
The global offset for the current version of the log file.
*/
static lsn_t global_offset;
// Lock order: truncateLog_mutex, log_write_mutex, log_read_mutex
/**
This mutex makes sequences of calls to lseek() and read() atomic.
It is also used by truncateLog to block read requests while
@ -170,7 +172,9 @@ int openLogWriter() {
flushedLSN_val = 0;
nextAvailableLSN = 0;
/*
Seek append only log to the end of the file. This is unnecessary,
@ -207,6 +211,20 @@ int openLogWriter() {
}
}
// Initialize nextAvailableLSN.
LogHandle lh;
const LogEntry * le;
nextAvailableLSN = sizeof(lsn_t);
lh = getLSNHandle(nextAvailableLSN);
while((le = nextInLog(&lh))) {
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);;
FreeLogEntry(le);
}
return 0;
}
@ -239,7 +257,7 @@ int writeLogEntry(LogEntry * e) {
pthread_mutex_lock(&log_write_mutex);
if(!nextAvailableLSN) {
/* if(!nextAvailableLSN) {
LogHandle lh;
const LogEntry * le;
@ -251,15 +269,13 @@ int writeLogEntry(LogEntry * e) {
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);;
FreeLogEntry(le);
}
}
}*/
/* Set the log entry's LSN. */
e->LSN = nextAvailableLSN;
//printf ("\nLSN: %ld\n", e->LSN);
//fflush(stdout);
nextAvailableLSN += (size + sizeof(lsn_t));
size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
@ -267,6 +283,7 @@ int writeLogEntry(LogEntry * e) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
abort();
}
// XXX nextAvailableLSN not set...
return LLADD_IO_ERROR;
}
@ -278,10 +295,16 @@ int writeLogEntry(LogEntry * e) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
abort();
}
// XXX nextAvailableLSN not set...
return LLADD_IO_ERROR;
}
//fflush(log);
pthread_mutex_lock(&log_read_mutex);
nextAvailableLSN += (size + sizeof(lsn_t));
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
@ -292,7 +315,8 @@ void syncLog() {
lsn_t newFlushedLSN;
pthread_mutex_lock(&log_read_mutex);
newFlushedLSN = ftell(log) + global_offset;
// newFlushedLSN = ftell(log) + global_offset;
newFlushedLSN = nextAvailableLSN -1;
pthread_mutex_unlock(&log_read_mutex);
// Wait to set the static variable until after the flush returns.
@ -365,19 +389,20 @@ static LogEntry * readLogEntry() {
if(bytesRead != size) {
if(bytesRead == 0) {
// fprintf(stderr, "eof reading entry\n");
// fflush(stderr);
return(NULL);
fprintf(stderr, "eof reading entry\n");
fflush(stderr);
abort();
// return(NULL);
} else if(bytesRead == -1) {
perror("error reading log");
abort();
return (LogEntry*)LLADD_IO_ERROR;
} else {
printf("short read from log w/ lsn %ld. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", debug_lsn, size, bytesRead);
fprintf(stderr, "short read from log w/ lsn %ld. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", debug_lsn, size, bytesRead);
fflush(stderr);
lsn_t newSize = size - bytesRead;
lsn_t newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize);
printf("\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize);
fprintf(stderr, "\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize);
fflush(stderr);
abort();
return (LogEntry*)LLADD_IO_ERROR;

View file

@ -16,7 +16,6 @@ byte* rawPageGetData(int xid, Page * p) {
void rawPageSetData(int xid, lsn_t lsn, Page * p) {
writelock(p->rwlatch, 255);
rawPageWriteLSN(xid, p, lsn);
// p->dirty = 1;
dirtyPages_add(p);
unlock(p->rwlatch);
return;

View file

@ -76,7 +76,7 @@ void pageRead(Page *ret) {
dirty page table can be kept up to date. */
void pageWrite(Page * ret) {
/** If the page is clean, there's no reason to write it out. */
if(!ret->dirty) {
if(!dirtyPages_isDirty(ret)) {
DEBUG(" =^)~ ");
return;
}
@ -118,7 +118,6 @@ void pageWrite(Page * ret) {
}
}
ret->dirty = 0;
dirtyPages_remove(ret);
pthread_mutex_unlock(&stable_mutex);

View file

@ -36,6 +36,7 @@ void dirtyPages_remove(Page * p) {
// printf("Removing page %d\n", p->id);
//assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int)));
// printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages));
p->dirty = 0;
pblHtRemove(dirtyPages, &(p->id), sizeof(int));
//assert(!ret); <--- Due to a bug in the PBL compatibility mode,
//there is no way to tell whether the value didn't exist, or if it
@ -43,6 +44,14 @@ void dirtyPages_remove(Page * p) {
pthread_mutex_unlock(&dirtyPages_mutex);
}
int dirtyPages_isDirty(Page * p) {
int ret;
pthread_mutex_lock(&dirtyPages_mutex);
ret = p->dirty;
pthread_mutex_unlock(&dirtyPages_mutex);
return ret;
}
static lsn_t dirtyPages_minRecLSN() {
lsn_t lsn = LogFlushedLSN ();
int* pageid;
@ -81,10 +90,7 @@ static void dirtyPages_flush() {
for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
p = loadPage(-1, staleDirtyPages[i]);
//if(p->dirty) {
pageWrite(p);
// dirtyPages_remove(p);
//}
releasePage(p);
}
free(staleDirtyPages);