From 6b95cefc6238e5f3607d28ea19fdb4f5cb148361 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 14 Apr 2005 04:23:22 +0000 Subject: [PATCH] Fixed logWriter abort performance bug, also cleaned up the file, simplified locking, etc. --- src/lladd/logger/logWriter.c | 400 +++++++++++++++-------------------- 1 file changed, 174 insertions(+), 226 deletions(-) diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index c3670f2..70d0f6f 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -42,6 +42,7 @@ terms specified in this license. #include #include #include + /** For O_DIRECT. It's unclear that this is the correct thing to #define, but it works under linux. */ #define __USE_GNU #include @@ -64,11 +65,11 @@ terms specified in this license. #include -byte * logBuffer = 0; /** @todo Should the log file be global? */ -static FILE * log; +static FILE * log = 0; +static int roLogFD = 0; /** @see flushedLSN() @@ -84,49 +85,29 @@ static lsn_t flushedLSN_val; static rwl * flushedLSN_lock; /** - Before writeLogEntry is called, this value is 0. Once writeLogEntry is called, it is the next available LSN. @see writeLogEntry */ static lsn_t nextAvailableLSN = 0; -static lsn_t writtenLSN_val = 0; -static int bufferedSize = 0; -/** - Invariant: writeLogEntry() must be able to atomicly read - nextAvailableLSN, and then update it. (This lock does not have to - be held while we're waiting for fwrite() to return.) -*/ -static rwl * nextAvailableLSN_lock; /** The global offset for the current version of the log file. */ static lsn_t global_offset; /** - Invariant: Any thread reading from the file must call flockfile() - if it needs the file position to be preserved across calls. (For - example, when using fseek(); myFseek() does this, but only - internally, so if it is used to position the stream, it should be - guarded with flockfile(). Unfortunately, it appears as though we - cannot use flockfile() on some systems, because this sequence does - not behave correctly: - - flockfile(foo); - fclose(foo); - fopen(foo); - funlockfile(foo); - - Oh well. + This mutex makes sequences of calls to lseek() and read() atomic. + It is also used by truncateLog to block read requests while + rename() is called. */ -static rwl * log_read_lock; +pthread_mutex_t log_read_mutex; /** Invariant: Any thread writing to the file must hold this lock. The log truncation thread hold this lock from the point where it copies the tail of the old log to the new log, until after the rename call - returns. + returns. This mutex also protects nextAvailableLSN. */ pthread_mutex_t log_write_mutex; @@ -135,61 +116,66 @@ pthread_mutex_t log_write_mutex; */ pthread_mutex_t truncateLog_mutex; -/** If 1, flushLog needs to call fseek the next time it is called. */ -static int sought = 1; static char * buffer; -int openLogWriter() { - //#define BUFSIZE (1024*96) + +/** The size of the in-memory log buffer. When the buffer is full, + the log is synchronously flushed to disk. */ #define BUFSIZE (1024 * 1024) +//#define BUFSIZE (1024*96) //#define BUFSIZE (512) - //char * buffer ;/*= malloc(BUFSIZE);*/ - /* int ret = posix_memalign((void*)&(buffer), PAGE_SIZE, BUFSIZE); - assert(!ret); */ - char * buffer = malloc(BUFSIZE); + +int openLogWriter() { + + buffer = malloc(BUFSIZE); if(!buffer) { return LLADD_NO_MEM; } - int logFD = open (LOG_FILE, O_CREAT | O_RDWR | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO); + /* The file is opened twice for a reason. fseek() seems to call + fflush() under Linux, which normally would be a minor problem. + However, we open the log with O_SYNC, so the fflush() results in + synchronous disk writes. Therefore, all read accesses (and + therefore all seeks) run through the second descriptor. */ + + int logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO); if(logFD == -1) { - perror("Couldn't open log file (A)"); + perror("Couldn't open log file for append.\n"); abort(); } - log = fdopen(logFD, "a+"); - // log = fopen(LOG_FILE, "a+"); + log = fdopen(logFD, "a"); if (log==NULL) { perror("Couldn't open log file"); - // abort(); - /*there was an error opening this file */ - return LLADD_IO_ERROR; //FILE_WRITE_OPEN_ERROR; + abort(); + return LLADD_IO_ERROR; } setbuffer(log, buffer, BUFSIZE); + /* fread() doesn't notice when another handle writes to its file, + even if fflush() is used to push the changes out to disk. + Therefore, we use a file descriptor, and read() instead of a FILE + and fread(). */ + roLogFD = open (LOG_FILE, O_RDONLY, 0); + + if(roLogFD == -1) { + perror("Couldn't open log file for reads.\n"); + } /* Initialize locks. */ flushedLSN_lock = initlock(); - nextAvailableLSN_lock = initlock(); - log_read_lock = initlock(); + pthread_mutex_init(&log_read_mutex, NULL); pthread_mutex_init(&log_write_mutex, NULL); pthread_mutex_init(&truncateLog_mutex, NULL); + flushedLSN_val = 0; nextAvailableLSN = 0; - bufferedSize = 0; - writtenLSN_val= 0; - - /* maxLSNEncountered = sizeof(lsn_t); - writeLogEntryIsReady = 0; */ - - /* Note that the position of the file between calls to this library - does not matter, since none of the functions in logWriter.h - assume anything about the position of the stream before they are - called. - - However, we need to do this seek to check if the file is empty. + /* + Seek append only log to the end of the file. This is unnecessary, + since the file was opened in append only mode, but it returns the + length of the file. */ if (myFseek(log, 0, SEEK_END)==0) { @@ -203,16 +189,24 @@ int openLogWriter() { if(nmemb != 1) { perror("Couldn't start new log file!"); // assert(0); - return LLADD_IO_ERROR; //FILE_WRITE_OPEN_ERROR; + return LLADD_IO_ERROR; } global_offset = 0; } else { - int count; - myFseek(log, 0, SEEK_SET); - count = fread(&global_offset, sizeof(lsn_t), 1, log); - assert(count == 1); + + off_t newPosition = lseek(roLogFD, 0, SEEK_SET); + if(newPosition == -1) { + perror("Could not seek to head of log"); + } + + int bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t)); + + if(bytesRead != sizeof(lsn_t)) { + printf("Could not read log header."); + abort(); + } + } - sought =1; return 0; } @@ -233,34 +227,22 @@ int openLogWriter() { The first time writeLogEntry is called, we seek from the highest LSN encountered so far to the end of the log. + + @todo writeLogEntry implicitly ignores all log entries with xid = -1. + This is probably the wrong thing to do... */ -static int flushLog(); +//static int flushLog(); int writeLogEntry(LogEntry * e) { const long size = sizeofLogEntry(e); - if(e->type == UPDATELOG) { - /* addPendingEvent(e->contents.update.rid.page); */ - } - if(e->type == CLRLOG) { - /* addPendingEvent(e->contents.clr.rid.page); */ - } - - if(e->xid == -1) { /* Don't write log entries for recovery xacts. */ - e->LSN = -1; - return 0; - } - - /* Need to prevent other writers from messing with nextAvailableLSN. - The log_write_mutex only blocks log truncation and writeLogEntry, - so it's exactly what we want here. .*/ pthread_mutex_lock(&log_write_mutex); if(!nextAvailableLSN) { - /* if(!writeLogEntryIsReady) { */ + LogHandle lh; LogEntry * le; @@ -273,99 +255,46 @@ int writeLogEntry(LogEntry * e) { } } - writelock(log_read_lock, 100); - /* Set the log entry's LSN. */ - -#ifdef DEBUGGING - e->LSN = myFseek(log, 0, SEEK_END) + global_offset; - sought = 1; - if(nextAvailableLSN != e->LSN) { - assert(nextAvailableLSN <= e->LSN); - DEBUG("Detected log truncation: nextAvailableLSN = %ld, but log length is %ld.\n", (long)nextAvailableLSN, e->LSN); - } -#endif - e->LSN = nextAvailableLSN; - /* We have the write lock, so no one else can call fseek behind our back. */ - /* flockfile(log); */ /* Prevent other threads from calling fseek... */ - nextAvailableLSN += (size + sizeof(long)); - int oldBufferedSize = bufferedSize; - bufferedSize += (size + sizeof(long)); - - logBuffer = realloc(logBuffer, size + sizeof(long)); - if(! logBuffer) { - abort(); - } - memcpy(logBuffer + oldBufferedSize, &size, sizeof(long)); - memcpy(logBuffer + oldBufferedSize + sizeof(long), e, size); - - flushLog(); - pthread_mutex_unlock(&log_write_mutex); - writeunlock(log_read_lock); + int nmemb = fwrite(&size, sizeof(long), 1, log); + if(nmemb != 1) { + if(feof(log)) { abort(); /* feof makes no sense here */ } + if(ferror(log)) { + fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log)); + abort(); + } + return LLADD_IO_ERROR; + } - /* We're done. */ - return 0; -} -/** - Preliminary version of a function that will write multiple log - entries at once. It turns out that there are some nasty - interactions between write() calls and readLSN, and locking, so - this currently only writes one entry at a time. (If this function - weren't designed to bundle log entries together, it would not make - such heavy use of global variables...) - - This function should only be called when the calling thread holds - a write lock on log_read_lock. In theory, this function will not - block on I/O, although in the case of long running-transactions, - log's buffer may fill up. Since we have opened the file with - O_SYNC, this function may block on disk I/O. -*/ -static int flushLog() { - if (!logBuffer) { return 0;} - - if(sought) { - fseek(log, writtenLSN_val /*nextAvailableLSN*/ - global_offset, SEEK_SET); - sought = 0; - } - - int nmemb = fwrite(logBuffer, bufferedSize, 1, log); - writtenLSN_val += bufferedSize; - bufferedSize = 0; + nmemb = fwrite(e, size, 1, log); if(nmemb != 1) { - perror("writeLog couldn't write next log entry!"); - assert(0); - return LLADD_IO_ERROR; // FILE_WRITE_ERROR; + if(feof(log)) { abort(); /* feof makes no sense here */ } + if(ferror(log)) { + fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log)); + abort(); + } + return LLADD_IO_ERROR; } - return 0; + pthread_mutex_unlock(&log_write_mutex); + + return 0; } void syncLog() { lsn_t newFlushedLSN; - writelock(log_read_lock, 0); - if(sought) { - newFlushedLSN = myFseek(log, 0, SEEK_END); - sought = 1; - } else { - newFlushedLSN = ftell(log); - } - writeunlock(log_read_lock); - /* Wait to set the static variable until after the flush returns. */ + + newFlushedLSN = ftell(log); + // Wait to set the static variable until after the flush returns. + + // Since we opened the logfile with O_SYNC, fflush() is sufficient. fflush(log); - // Since we open the logfile with O_SYNC, fflush suffices. -#ifdef HAVE_FDATASYNC - /* Should be available in linux >= 2.4 */ - // fdatasync(fileno(log)); -#else - /* Slow - forces fs implementation to sync the file metadata to disk */ - // fsync(fileno(log)); -#endif writelock(flushedLSN_lock, 0); if(newFlushedLSN > flushedLSN_val) { @@ -375,9 +304,8 @@ void syncLog() { } lsn_t flushedLSN() { - lsn_t ret; readlock(flushedLSN_lock, 0); - ret = flushedLSN_val; + lsn_t ret = flushedLSN_val; readunlock(flushedLSN_lock); return ret; } @@ -385,16 +313,18 @@ lsn_t flushedLSN() { void closeLogWriter() { /* Get the whole thing to the disk before closing it. */ syncLog(); + fclose(log); + close(roLogFD); + log = NULL; /* Free locks. */ deletelock(flushedLSN_lock); - deletelock(nextAvailableLSN_lock); - deletelock(log_read_lock); + pthread_mutex_destroy(&log_read_mutex); pthread_mutex_destroy(&log_write_mutex); pthread_mutex_destroy(&truncateLog_mutex); - free (buffer); // breaks efence. :( + free(buffer); } void deleteLogWriter() { @@ -402,61 +332,49 @@ void deleteLogWriter() { } static LogEntry * readLogEntry() { - LogEntry * ret = NULL; + LogEntry * ret = 0; long size; - // assert(!posix_memalign((void*)&(size), 512, sizeof(long))); long entrySize; - int nmemb; - if(feof(log)) { - return NULL; - } + int bytesRead = read(roLogFD, &size, sizeof(long)); - nmemb = fread(&size, sizeof(long), 1, log); - - if(nmemb != 1) { - if(feof(log)) { + if(bytesRead != sizeof(long)) { + if(bytesRead == 0) { + // fprintf(stderr, "eof on log entry size\n"); + // fflush(stderr); return NULL; - } - if(ferror(log)) { - perror("Error reading log"); + } else if(bytesRead == -1) { + perror("error reading log"); + abort(); return (LogEntry*)LLADD_IO_ERROR; + } else { + fprintf(stderr, "short read from log. Expected %d bytes, got %d.\nFIXME: This is 'normal', but currently not handled", sizeof(long), bytesRead); + fflush(stderr); + abort(); // really abort here. This code should attempt to piece together short log reads... } } - - // assert(!posix_memalign(&ret, 512, (*size))); ret = malloc(size); - nmemb = fread(ret, size, 1, log); + bytesRead = read(roLogFD, ret, size); - if(nmemb != 1) { - /* Partial log entry. */ - if(feof(log)) { - free(ret); - return NULL; - } - if(ferror(log)) { - perror("Error reading log!"); - free(ret); + if(bytesRead != size) { + if(bytesRead == 0) { + // fprintf(stderr, "eof reading entry\n"); + // fflush(stderr); + return(NULL); + } else if(bytesRead == -1) { + perror("error reading log"); + abort(); return (LogEntry*)LLADD_IO_ERROR; - } - perror("Unknown error in readLogEntry"); - return (LogEntry*)LLADD_IO_ERROR; + } else { + printf("short read from log. Expected %ld bytes, got %d.\nFIXME: This is 'normal', but currently not handled", size, bytesRead); + fflush(stderr); + abort(); + return (LogEntry*)LLADD_IO_ERROR; + } } - entrySize = sizeofLogEntry(ret); - - // This sanity check makes no sense -- sizeOfLogEntry() has nothing - // to do with the number of bytes read. */ - - /** Sanity check -- Did we get the whole entry? */ - // if(size < entrySize) { - /* Read partial entry. */ - // free(ret); - // return 0; - // } - assert(size == entrySize); return ret; @@ -465,16 +383,27 @@ static LogEntry * readLogEntry() { LogEntry * readLSNEntry(lsn_t LSN) { LogEntry * ret; - /* Irritating overhead; two mutex acquires to do a read. */ - readlock(log_read_lock, 200); + /** Because we use two file descriptors to access the log, we need + to flush the log write buffer before concluding we're at EOF. */ + if(flushedLSN() <= LSN) { + // fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN(), LSN); + syncLog(); + // fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN(), LSN); + } + + pthread_mutex_lock(&log_read_mutex); + + off_t newPosition = lseek(roLogFD, LSN - global_offset, SEEK_SET); + if(newPosition == -1) { + perror("Could not seek for log read"); + } else { + // fprintf(stderr, "sought to %d\n", (int)newPosition); + // fflush(stderr); + } - flockfile(log); - fseek(log, LSN - global_offset, SEEK_SET); - sought = 1; ret = readLogEntry(); - funlockfile(log); - readunlock(log_read_lock); + pthread_mutex_unlock(&log_read_mutex); return ret; @@ -483,14 +412,11 @@ LogEntry * readLSNEntry(lsn_t LSN) { int truncateLog(lsn_t LSN) { FILE *tmpLog; - LogEntry * le; LogHandle lh; long size; - /* int count; */ - pthread_mutex_lock(&truncateLog_mutex); if(global_offset + 4 >= LSN) { @@ -503,9 +429,9 @@ int truncateLog(lsn_t LSN) { tmpLog = fopen(LOG_FILE_SCRATCH, "w+"); /* w+ = truncate, and open for writing. */ if (tmpLog==NULL) { - /*there was an error opening this file */ pthread_mutex_unlock(&truncateLog_mutex); perror("logTruncate() couldn't create scratch log file!"); + abort(); return LLADD_IO_ERROR; } @@ -521,15 +447,13 @@ int truncateLog(lsn_t LSN) { LSN += sizeof(lsn_t); /** - @todo We block writers too early. Instead, read until EOF, then + @todo truncateLog blocks writers too early. Instead, read until EOF, then lock, and then finish the truncate. */ pthread_mutex_lock(&log_write_mutex); - lh = getLSNHandle(LSN); - while((le = nextInLog(&lh))) { size = sizeofLogEntry(le); myFwrite(&size, sizeof(lsn_t), tmpLog); @@ -537,46 +461,69 @@ int truncateLog(lsn_t LSN) { free (le); } - writelock(log_read_lock, 300); - fflush(tmpLog); #ifdef HAVE_FDATASYNC - // fdatasync(fileno(tmpLog)); + fdatasync(fileno(tmpLog)); #else - // fsync(fileno(tmpLog)); + fsync(fileno(tmpLog)); #endif /** Time to shut out the readers */ - /* flockfile(log); --- Don't need this; we hold the writelock. */ + pthread_mutex_lock(&log_read_mutex); - fclose(log); /* closeLogWriter calls sync, but we don't need to. :) */ + /* closeLogWriter calls sync, and does some extra stuff that we don't want, so we + basicly re-implement closeLogWriter and openLogWriter here... + */ + fclose(log); + close(roLogFD); fclose(tmpLog); if(rename(LOG_FILE_SCRATCH, LOG_FILE)) { - writeunlock(log_read_lock); + pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); perror("Error replacing old log file with new log file"); return LLADD_IO_ERROR; } + + int logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO); + if(logFD == -1) { + perror("Couldn't open log file for append.\n"); + abort(); + } + log = fdopen(logFD, "a"); + + if (log==NULL) { + perror("Couldn't open log file"); + abort(); + return LLADD_IO_ERROR; + } + + setbuffer(log, buffer, BUFSIZE); + + roLogFD = open (LOG_FILE, O_RDONLY, 0); + if(roLogFD == -1) { + perror("Couldn't open log file for reads.\n"); + abort(); + return LLADD_IO_ERROR; + } - log = fopen(LOG_FILE, "a+"); if (log==NULL) { - - writeunlock(log_read_lock); + pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); perror("Couldn't reopen log after truncate"); + abort(); return LLADD_IO_ERROR; } global_offset = LSN - sizeof(lsn_t); - writeunlock(log_read_lock); + pthread_mutex_unlock(&log_read_mutex); pthread_mutex_unlock(&log_write_mutex); pthread_mutex_unlock(&truncateLog_mutex); @@ -585,5 +532,6 @@ int truncateLog(lsn_t LSN) { } lsn_t firstLogEntry() { + assert(log); return global_offset + sizeof(lsn_t); }