Fixed logWriter abort performance bug, also cleaned up the file, simplified locking, etc.

This commit is contained in:
Sears Russell 2005-04-14 04:23:22 +00:00
parent a053b19757
commit 6b95cefc62

View file

@ -42,6 +42,7 @@ terms specified in this license.
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
/** For O_DIRECT. It's unclear that this is the correct thing to #define, but it works under linux. */
#define __USE_GNU
#include <fcntl.h>
@ -64,11 +65,11 @@ terms specified in this license.
#include <lladd/bufferManager.h>
byte * logBuffer = 0;
/**
@todo Should the log file be global?
*/
static FILE * log;
static FILE * log = 0;
static int roLogFD = 0;
/**
@see flushedLSN()
@ -84,49 +85,29 @@ static lsn_t flushedLSN_val;
static rwl * flushedLSN_lock;
/**
Before writeLogEntry is called, this value is 0. Once writeLogEntry
is called, it is the next available LSN.
@see writeLogEntry
*/
static lsn_t nextAvailableLSN = 0;
static lsn_t writtenLSN_val = 0;
static int bufferedSize = 0;
/**
Invariant: writeLogEntry() must be able to atomicly read
nextAvailableLSN, and then update it. (This lock does not have to
be held while we're waiting for fwrite() to return.)
*/
static rwl * nextAvailableLSN_lock;
/**
The global offset for the current version of the log file.
*/
static lsn_t global_offset;
/**
Invariant: Any thread reading from the file must call flockfile()
if it needs the file position to be preserved across calls. (For
example, when using fseek(); myFseek() does this, but only
internally, so if it is used to position the stream, it should be
guarded with flockfile(). Unfortunately, it appears as though we
cannot use flockfile() on some systems, because this sequence does
not behave correctly:
flockfile(foo);
fclose(foo);
fopen(foo);
funlockfile(foo);
Oh well.
This mutex makes sequences of calls to lseek() and read() atomic.
It is also used by truncateLog to block read requests while
rename() is called.
*/
static rwl * log_read_lock;
pthread_mutex_t log_read_mutex;
/**
Invariant: Any thread writing to the file must hold this lock. The
log truncation thread hold this lock from the point where it copies
the tail of the old log to the new log, until after the rename call
returns.
returns. This mutex also protects nextAvailableLSN.
*/
pthread_mutex_t log_write_mutex;
@ -135,61 +116,66 @@ pthread_mutex_t log_write_mutex;
*/
pthread_mutex_t truncateLog_mutex;
/** If 1, flushLog needs to call fseek the next time it is called. */
static int sought = 1;
static char * buffer;
int openLogWriter() {
//#define BUFSIZE (1024*96)
/** The size of the in-memory log buffer. When the buffer is full,
the log is synchronously flushed to disk. */
#define BUFSIZE (1024 * 1024)
//#define BUFSIZE (1024*96)
//#define BUFSIZE (512)
//char * buffer ;/*= malloc(BUFSIZE);*/
/* int ret = posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE);
assert(!ret); */
char * buffer = malloc(BUFSIZE);
int openLogWriter() {
buffer = malloc(BUFSIZE);
if(!buffer) { return LLADD_NO_MEM; }
int logFD = open (LOG_FILE, O_CREAT | O_RDWR | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO);
/* The file is opened twice for a reason. fseek() seems to call
fflush() under Linux, which normally would be a minor problem.
However, we open the log with O_SYNC, so the fflush() results in
synchronous disk writes. Therefore, all read accesses (and
therefore all seeks) run through the second descriptor. */
int logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO);
if(logFD == -1) {
perror("Couldn't open log file (A)");
perror("Couldn't open log file for append.\n");
abort();
}
log = fdopen(logFD, "a+");
// log = fopen(LOG_FILE, "a+");
log = fdopen(logFD, "a");
if (log==NULL) {
perror("Couldn't open log file");
// abort();
/*there was an error opening this file */
return LLADD_IO_ERROR; //FILE_WRITE_OPEN_ERROR;
abort();
return LLADD_IO_ERROR;
}
setbuffer(log, buffer, BUFSIZE);
/* fread() doesn't notice when another handle writes to its file,
even if fflush() is used to push the changes out to disk.
Therefore, we use a file descriptor, and read() instead of a FILE
and fread(). */
roLogFD = open (LOG_FILE, O_RDONLY, 0);
if(roLogFD == -1) {
perror("Couldn't open log file for reads.\n");
}
/* Initialize locks. */
flushedLSN_lock = initlock();
nextAvailableLSN_lock = initlock();
log_read_lock = initlock();
pthread_mutex_init(&log_read_mutex, NULL);
pthread_mutex_init(&log_write_mutex, NULL);
pthread_mutex_init(&truncateLog_mutex, NULL);
flushedLSN_val = 0;
nextAvailableLSN = 0;
bufferedSize = 0;
writtenLSN_val= 0;
/* maxLSNEncountered = sizeof(lsn_t);
writeLogEntryIsReady = 0; */
/* Note that the position of the file between calls to this library
does not matter, since none of the functions in logWriter.h
assume anything about the position of the stream before they are
called.
However, we need to do this seek to check if the file is empty.
/*
Seek append only log to the end of the file. This is unnecessary,
since the file was opened in append only mode, but it returns the
length of the file.
*/
if (myFseek(log, 0, SEEK_END)==0) {
@ -203,16 +189,24 @@ int openLogWriter() {
if(nmemb != 1) {
perror("Couldn't start new log file!");
// assert(0);
return LLADD_IO_ERROR; //FILE_WRITE_OPEN_ERROR;
return LLADD_IO_ERROR;
}
global_offset = 0;
} else {
int count;
myFseek(log, 0, SEEK_SET);
count = fread(&global_offset, sizeof(lsn_t), 1, log);
assert(count == 1);
off_t newPosition = lseek(roLogFD, 0, SEEK_SET);
if(newPosition == -1) {
perror("Could not seek to head of log");
}
int bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t));
if(bytesRead != sizeof(lsn_t)) {
printf("Could not read log header.");
abort();
}
}
sought =1;
return 0;
}
@ -234,33 +228,21 @@ int openLogWriter() {
The first time writeLogEntry is called, we seek from the highest
LSN encountered so far to the end of the log.
@todo writeLogEntry implicitly ignores all log entries with xid = -1.
This is probably the wrong thing to do...
*/
static int flushLog();
//static int flushLog();
int writeLogEntry(LogEntry * e) {
const long size = sizeofLogEntry(e);
if(e->type == UPDATELOG) {
/* addPendingEvent(e->contents.update.rid.page); */
}
if(e->type == CLRLOG) {
/* addPendingEvent(e->contents.clr.rid.page); */
}
if(e->xid == -1) { /* Don't write log entries for recovery xacts. */
e->LSN = -1;
return 0;
}
/* Need to prevent other writers from messing with nextAvailableLSN.
The log_write_mutex only blocks log truncation and writeLogEntry,
so it's exactly what we want here. .*/
pthread_mutex_lock(&log_write_mutex);
if(!nextAvailableLSN) {
/* if(!writeLogEntryIsReady) { */
LogHandle lh;
LogEntry * le;
@ -273,99 +255,46 @@ int writeLogEntry(LogEntry * e) {
}
}
writelock(log_read_lock, 100);
/* Set the log entry's LSN. */
#ifdef DEBUGGING
e->LSN = myFseek(log, 0, SEEK_END) + global_offset;
sought = 1;
if(nextAvailableLSN != e->LSN) {
assert(nextAvailableLSN <= e->LSN);
DEBUG("Detected log truncation: nextAvailableLSN = %ld, but log length is %ld.\n", (long)nextAvailableLSN, e->LSN);
}
#endif
e->LSN = nextAvailableLSN;
/* We have the write lock, so no one else can call fseek behind our back. */
/* flockfile(log); */ /* Prevent other threads from calling fseek... */
nextAvailableLSN += (size + sizeof(long));
int oldBufferedSize = bufferedSize;
bufferedSize += (size + sizeof(long));
logBuffer = realloc(logBuffer, size + sizeof(long));
if(! logBuffer) {
abort();
}
memcpy(logBuffer + oldBufferedSize, &size, sizeof(long));
memcpy(logBuffer + oldBufferedSize + sizeof(long), e, size);
flushLog();
pthread_mutex_unlock(&log_write_mutex);
writeunlock(log_read_lock);
/* We're done. */
return 0;
}
/**
Preliminary version of a function that will write multiple log
entries at once. It turns out that there are some nasty
interactions between write() calls and readLSN, and locking, so
this currently only writes one entry at a time. (If this function
weren't designed to bundle log entries together, it would not make
such heavy use of global variables...)
This function should only be called when the calling thread holds
a write lock on log_read_lock. In theory, this function will not
block on I/O, although in the case of long running-transactions,
log's buffer may fill up. Since we have opened the file with
O_SYNC, this function may block on disk I/O.
*/
static int flushLog() {
if (!logBuffer) { return 0;}
if(sought) {
fseek(log, writtenLSN_val /*nextAvailableLSN*/ - global_offset, SEEK_SET);
sought = 0;
int nmemb = fwrite(&size, sizeof(long), 1, log);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
if(ferror(log)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
abort();
}
return LLADD_IO_ERROR;
}
int nmemb = fwrite(logBuffer, bufferedSize, 1, log);
writtenLSN_val += bufferedSize;
bufferedSize = 0;
nmemb = fwrite(e, size, 1, log);
if(nmemb != 1) {
perror("writeLog couldn't write next log entry!");
assert(0);
return LLADD_IO_ERROR; // FILE_WRITE_ERROR;
if(feof(log)) { abort(); /* feof makes no sense here */ }
if(ferror(log)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
abort();
}
return LLADD_IO_ERROR;
}
return 0;
pthread_mutex_unlock(&log_write_mutex);
return 0;
}
void syncLog() {
lsn_t newFlushedLSN;
writelock(log_read_lock, 0);
if(sought) {
newFlushedLSN = myFseek(log, 0, SEEK_END);
sought = 1;
} else {
newFlushedLSN = ftell(log);
}
writeunlock(log_read_lock);
/* Wait to set the static variable until after the flush returns. */
newFlushedLSN = ftell(log);
// Wait to set the static variable until after the flush returns.
// Since we opened the logfile with O_SYNC, fflush() is sufficient.
fflush(log);
// Since we open the logfile with O_SYNC, fflush suffices.
#ifdef HAVE_FDATASYNC
/* Should be available in linux >= 2.4 */
// fdatasync(fileno(log));
#else
/* Slow - forces fs implementation to sync the file metadata to disk */
// fsync(fileno(log));
#endif
writelock(flushedLSN_lock, 0);
if(newFlushedLSN > flushedLSN_val) {
@ -375,9 +304,8 @@ void syncLog() {
}
lsn_t flushedLSN() {
lsn_t ret;
readlock(flushedLSN_lock, 0);
ret = flushedLSN_val;
lsn_t ret = flushedLSN_val;
readunlock(flushedLSN_lock);
return ret;
}
@ -385,16 +313,18 @@ lsn_t flushedLSN() {
void closeLogWriter() {
/* Get the whole thing to the disk before closing it. */
syncLog();
fclose(log);
close(roLogFD);
log = NULL;
/* Free locks. */
deletelock(flushedLSN_lock);
deletelock(nextAvailableLSN_lock);
deletelock(log_read_lock);
pthread_mutex_destroy(&log_read_mutex);
pthread_mutex_destroy(&log_write_mutex);
pthread_mutex_destroy(&truncateLog_mutex);
free (buffer); // breaks efence. :(
free(buffer);
}
void deleteLogWriter() {
@ -402,61 +332,49 @@ void deleteLogWriter() {
}
static LogEntry * readLogEntry() {
LogEntry * ret = NULL;
LogEntry * ret = 0;
long size;
// assert(!posix_memalign((void*)&(size), 512, sizeof(long)));
long entrySize;
int nmemb;
if(feof(log)) {
return NULL;
}
int bytesRead = read(roLogFD, &size, sizeof(long));
nmemb = fread(&size, sizeof(long), 1, log);
if(nmemb != 1) {
if(feof(log)) {
if(bytesRead != sizeof(long)) {
if(bytesRead == 0) {
// fprintf(stderr, "eof on log entry size\n");
// fflush(stderr);
return NULL;
}
if(ferror(log)) {
perror("Error reading log");
} else if(bytesRead == -1) {
perror("error reading log");
abort();
return (LogEntry*)LLADD_IO_ERROR;
} else {
fprintf(stderr, "short read from log. Expected %d bytes, got %d.\nFIXME: This is 'normal', but currently not handled", sizeof(long), bytesRead);
fflush(stderr);
abort(); // really abort here. This code should attempt to piece together short log reads...
}
}
// assert(!posix_memalign(&ret, 512, (*size)));
ret = malloc(size);
nmemb = fread(ret, size, 1, log);
bytesRead = read(roLogFD, ret, size);
if(nmemb != 1) {
/* Partial log entry. */
if(feof(log)) {
free(ret);
return NULL;
}
if(ferror(log)) {
perror("Error reading log!");
free(ret);
if(bytesRead != size) {
if(bytesRead == 0) {
// fprintf(stderr, "eof reading entry\n");
// fflush(stderr);
return(NULL);
} else if(bytesRead == -1) {
perror("error reading log");
abort();
return (LogEntry*)LLADD_IO_ERROR;
} else {
printf("short read from log. Expected %ld bytes, got %d.\nFIXME: This is 'normal', but currently not handled", size, bytesRead);
fflush(stderr);
abort();
return (LogEntry*)LLADD_IO_ERROR;
}
perror("Unknown error in readLogEntry");
return (LogEntry*)LLADD_IO_ERROR;
}
entrySize = sizeofLogEntry(ret);
// This sanity check makes no sense -- sizeOfLogEntry() has nothing
// to do with the number of bytes read. */
/** Sanity check -- Did we get the whole entry? */
// if(size < entrySize) {
/* Read partial entry. */
// free(ret);
// return 0;
// }
assert(size == entrySize);
return ret;
@ -465,16 +383,27 @@ static LogEntry * readLogEntry() {
LogEntry * readLSNEntry(lsn_t LSN) {
LogEntry * ret;
/* Irritating overhead; two mutex acquires to do a read. */
readlock(log_read_lock, 200);
/** Because we use two file descriptors to access the log, we need
to flush the log write buffer before concluding we're at EOF. */
if(flushedLSN() <= LSN) {
// fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN(), LSN);
syncLog();
// fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN(), LSN);
}
pthread_mutex_lock(&log_read_mutex);
off_t newPosition = lseek(roLogFD, LSN - global_offset, SEEK_SET);
if(newPosition == -1) {
perror("Could not seek for log read");
} else {
// fprintf(stderr, "sought to %d\n", (int)newPosition);
// fflush(stderr);
}
flockfile(log);
fseek(log, LSN - global_offset, SEEK_SET);
sought = 1;
ret = readLogEntry();
funlockfile(log);
readunlock(log_read_lock);
pthread_mutex_unlock(&log_read_mutex);
return ret;
@ -483,14 +412,11 @@ LogEntry * readLSNEntry(lsn_t LSN) {
int truncateLog(lsn_t LSN) {
FILE *tmpLog;
LogEntry * le;
LogHandle lh;
long size;
/* int count; */
pthread_mutex_lock(&truncateLog_mutex);
if(global_offset + 4 >= LSN) {
@ -503,9 +429,9 @@ int truncateLog(lsn_t LSN) {
tmpLog = fopen(LOG_FILE_SCRATCH, "w+"); /* w+ = truncate, and open for writing. */
if (tmpLog==NULL) {
/*there was an error opening this file */
pthread_mutex_unlock(&truncateLog_mutex);
perror("logTruncate() couldn't create scratch log file!");
abort();
return LLADD_IO_ERROR;
}
@ -521,15 +447,13 @@ int truncateLog(lsn_t LSN) {
LSN += sizeof(lsn_t);
/**
@todo We block writers too early. Instead, read until EOF, then
@todo truncateLog blocks writers too early. Instead, read until EOF, then
lock, and then finish the truncate.
*/
pthread_mutex_lock(&log_write_mutex);
lh = getLSNHandle(LSN);
while((le = nextInLog(&lh))) {
size = sizeofLogEntry(le);
myFwrite(&size, sizeof(lsn_t), tmpLog);
@ -537,24 +461,26 @@ int truncateLog(lsn_t LSN) {
free (le);
}
writelock(log_read_lock, 300);
fflush(tmpLog);
#ifdef HAVE_FDATASYNC
// fdatasync(fileno(tmpLog));
fdatasync(fileno(tmpLog));
#else
// fsync(fileno(tmpLog));
fsync(fileno(tmpLog));
#endif
/** Time to shut out the readers */
/* flockfile(log); --- Don't need this; we hold the writelock. */
pthread_mutex_lock(&log_read_mutex);
fclose(log); /* closeLogWriter calls sync, but we don't need to. :) */
/* closeLogWriter calls sync, and does some extra stuff that we don't want, so we
basicly re-implement closeLogWriter and openLogWriter here...
*/
fclose(log);
close(roLogFD);
fclose(tmpLog);
if(rename(LOG_FILE_SCRATCH, LOG_FILE)) {
writeunlock(log_read_lock);
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
@ -562,21 +488,42 @@ int truncateLog(lsn_t LSN) {
return LLADD_IO_ERROR;
}
int logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO);
if(logFD == -1) {
perror("Couldn't open log file for append.\n");
abort();
}
log = fdopen(logFD, "a");
log = fopen(LOG_FILE, "a+");
if (log==NULL) {
perror("Couldn't open log file");
abort();
return LLADD_IO_ERROR;
}
writeunlock(log_read_lock);
setbuffer(log, buffer, BUFSIZE);
roLogFD = open (LOG_FILE, O_RDONLY, 0);
if(roLogFD == -1) {
perror("Couldn't open log file for reads.\n");
abort();
return LLADD_IO_ERROR;
}
if (log==NULL) {
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
perror("Couldn't reopen log after truncate");
abort();
return LLADD_IO_ERROR;
}
global_offset = LSN - sizeof(lsn_t);
writeunlock(log_read_lock);
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
pthread_mutex_unlock(&truncateLog_mutex);
@ -585,5 +532,6 @@ int truncateLog(lsn_t LSN) {
}
lsn_t firstLogEntry() {
assert(log);
return global_offset + sizeof(lsn_t);
}