Added CRC's to log format.

This commit is contained in:
Sears Russell 2006-10-04 04:37:03 +00:00
parent 6e222e6259
commit fd7f4074af
2 changed files with 281 additions and 92 deletions

View file

@ -42,6 +42,7 @@ terms specified in this license.
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <string.h>
/** For O_DIRECT. It's unclear that this is the correct thing to #define, but it works under linux. */
#define __USE_GNU
@ -56,12 +57,15 @@ terms specified in this license.
#include <lladd/common.h>
#include <lladd/transactional.h>
#include <lladd/crc32.h>
#include "logWriter.h"
#include "logHandle.h"
#include "latches.h"
#include "io.h"
#include <assert.h>
#define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)
#include <lladd/bufferManager.h>
@ -76,8 +80,8 @@ int logWriter_isDurable = 1;
/**
@see flushedLSN_LogWriter()
*/
static lsn_t flushedLSN_val;
static lsn_t flushedLSN_stable;
static lsn_t flushedLSN_internal;
/**
Invariant: No thread is writing to flushedLSN. Since
flushedLSN is monotonically increasing, readers can immmediately
@ -111,10 +115,17 @@ pthread_mutex_t log_read_mutex;
Invariant: Any thread writing to the file must hold this lock. The
log truncation thread hold this lock from the point where it copies
the tail of the old log to the new log, until after the rename call
returns. This mutex also protects nextAvailableLSN.
returns.
*/
pthread_mutex_t log_write_mutex;
/**
This mutex protects nextAvailableLSN, which has its own mutex
because the routines for reading and writing log entries both need
to acquire it, but only for a relatively short time.
*/
pthread_mutex_t nextAvailableLSN_mutex;
/**
Invariant: We only want one thread in truncateLog at a time.
*/
@ -122,10 +133,24 @@ pthread_mutex_t truncateLog_mutex;
static char * buffer;
/**
CRC of the log between last CRC log entry, and the current end of
the log. The CRC includes everything except for the CRC log entry
and the size fields preceeding each log entry. This value is
reset to zero each time a CRC entry is generated..
*/
static unsigned int log_crc;
/** The size of the in-memory log buffer. When the buffer is full,
the log is synchronously flushed to disk. */
#define BUFSIZE (1024 * 1024)
static inline void update_log_crc(const LogEntry * le, unsigned int * crc) {
*crc = crc32(le, sizeofLogEntry(le), *crc);
}
static LogEntry * readLogEntry();
static void syncLogInternal();
int openLogWriter() {
buffer = malloc(BUFSIZE);
@ -138,35 +163,35 @@ int openLogWriter() {
synchronous disk writes. Therefore, all read accesses (and
therefore all seeks) run through the second descriptor. */
// int logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO);
int logFD;
if(logWriter_isDurable) {
logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO);
logFD = open(LOG_FILE, O_CREAT | O_RDWR | O_SYNC, FILE_PERM); //, S_IRWXU | S_IRWXG | S_IRWXO);
} else {
fprintf(stderr, "\n**********\n");
fprintf (stderr, "logWriter.c: logWriter_isDurable==0; the logger will not force writes to disk.\n");
fprintf (stderr, " Transactions will not be durable if the system crashes.\n**********\n");
logFD = open (LOG_FILE, O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);
logFD = open(LOG_FILE, O_CREAT | O_RDWR, FILE_PERM);// S_IRWXU | S_IRWXG | S_IRWXO);
}
if(logFD == -1) {
perror("Couldn't open log file for append.\n");
abort();
}
log = fdopen(logFD, "a");
log = fdopen(logFD, "w+");
if (log==NULL) {
perror("Couldn't open log file");
abort();
return LLADD_IO_ERROR;
}
/* Increase the length of log's buffer, since it's in O_SYNC mode. */
setbuffer(log, buffer, BUFSIZE);
/* fread() doesn't notice when another handle writes to its file,
even if fflush() is used to push the changes out to disk.
Therefore, we use a file descriptor, and read() instead of a FILE
and fread(). */
roLogFD = open (LOG_FILE, O_RDONLY, 0);
roLogFD = open(LOG_FILE, O_RDONLY, 0);
if(roLogFD == -1) {
perror("Couldn't open log file for reads.\n");
@ -177,14 +202,11 @@ int openLogWriter() {
flushedLSN_lock = initlock();
pthread_mutex_init(&log_read_mutex, NULL);
pthread_mutex_init(&log_write_mutex, NULL);
pthread_mutex_init(&nextAvailableLSN_mutex, NULL);
pthread_mutex_init(&truncateLog_mutex, NULL);
flushedLSN_val = 0;
flushedLSN_stable = 0;
flushedLSN_internal = 0;
/*
Seek append only log to the end of the file. This is unnecessary,
since the file was opened in append only mode, but it returns the
@ -194,47 +216,81 @@ int openLogWriter() {
if (myFseek(log, 0, SEEK_END)==0) {
/*if file is empty, write an LSN at the 0th position. LSN 0 is
invalid, and this prevents us from using it. Also, the LSN at
this position can be used after log truncation to define a
global offset for the truncated log. (Not implemented yet)
this position is used after log truncation to store the
global offset for the truncated log.
*/
lsn_t zero = 0;
size_t nmemb = fwrite(&zero, sizeof(lsn_t), 1, log);
global_offset = 0;
size_t nmemb = fwrite(&global_offset, sizeof(lsn_t), 1, log);
if(nmemb != 1) {
perror("Couldn't start new log file!");
// assert(0);
return LLADD_IO_ERROR;
}
global_offset = 0;
} else {
off_t newPosition = lseek(roLogFD, 0, SEEK_SET);
if(newPosition == -1) {
perror("Could not seek to head of log");
return LLADD_IO_ERROR;
}
ssize_t bytesRead = read(roLogFD, &global_offset, sizeof(lsn_t));
if(bytesRead != sizeof(lsn_t)) {
printf("Could not read log header.");
abort();
return LLADD_IO_ERROR;
}
}
// Initialize nextAvailableLSN.
LogHandle lh;
const LogEntry * le;
nextAvailableLSN = sizeof(lsn_t);
lh = getLSNHandle(nextAvailableLSN);
nextAvailableLSN = sizeof(lsn_t) + global_offset;
flushedLSN_stable = nextAvailableLSN;
flushedLSN_internal = nextAvailableLSN;
unsigned int crc = 0;
while((le = nextInLog(&lh))) {
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);;
if(lseek(roLogFD, sizeof(lsn_t), SEEK_SET) != sizeof(lsn_t)) {
perror("Couldn't seek to first log entry!");
}
// Using readLogEntry() bypasses checks to see if we're past the end
// of the log.
while((le = readLogEntry())) {
if(le->type == INTERNALLOG) {
if (!(le->prevLSN) || (crc == (unsigned int) le->prevLSN)) {
nextAvailableLSN = nextEntry_LogWriter(le); //le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);
crc = 0;
} else {
printf("Log corruption: %x != %x (lsn = %ld)\n", (unsigned int) le->prevLSN, crc, le->LSN);
// The log wasn't successfully forced to this point; discard
// everything after the last CRC.
FreeLogEntry(le);
break;
}
} else {
update_log_crc(le, &crc);
}
FreeLogEntry(le);
}
// If there was a short write at the end of the log, just reuse its lsn, and overwrite it.
fseek(log, nextAvailableLSN, SEEK_SET);
if(ftruncate(fileno(log), nextAvailableLSN-global_offset) == -1) {
perror("Couldn't discard junk at end of log");
}
// If there was trailing garbage at the end of the log, overwrite
// it.
if(myFseek(log, nextAvailableLSN-global_offset, SEEK_SET) != nextAvailableLSN-global_offset) {
perror("Error repositioning log");
abort();
}
// Reset log_crc to zero (nextAvailableLSN immediately follows a crc
// entry).
log_crc = 0;
return 0;
}
@ -262,27 +318,46 @@ int openLogWriter() {
*/
int writeLogEntry(LogEntry * e) {
static int writeLogEntryUnlocked(LogEntry * e) {
const lsn_t size = sizeofLogEntry(e);
pthread_mutex_lock(&log_write_mutex);
/* Set the log entry's LSN. */
pthread_mutex_lock(&nextAvailableLSN_mutex);
e->LSN = nextAvailableLSN;
pthread_mutex_unlock(&nextAvailableLSN_mutex);
update_log_crc(e, &log_crc);
//printf("Writing Log entry type = %d lsn = %ld, size = %ld\n", e->type, e->LSN, size);
// off_t current_offset = ftell(log);
// assert(e->LSN == (current_offset + global_offset));
// off_t oldOffset = ftell(log);
size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
if(ferror(log)) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
abort();
}
abort();
// XXX nextAvailableLSN not set...
return LLADD_IO_ERROR;
}
// off_t newOffset = ftell(log);
// assert(nmemb == 1);
// assert(oldOffset + sizeof(lsn_t) == newOffset);
// current_offset = ftell(log);
// assert(e->LSN == (current_offset + global_offset - sizeof(lsn_t)));
nmemb = fwrite(e, size, 1, log);
// current_offset = ftell(log);
// assert(e->LSN == current_offset + global_offset - sizeof(lsn_t) - size);
if(nmemb != 1) {
if(feof(log)) { abort(); /* feof makes no sense here */ }
@ -290,41 +365,88 @@ int writeLogEntry(LogEntry * e) {
fprintf(stderr, "writeLog couldn't write next log entry: %d\n", ferror(log));
abort();
}
abort();
// XXX nextAvailableLSN not set...
return LLADD_IO_ERROR;
}
pthread_mutex_lock(&log_read_mutex);
nextAvailableLSN += (size + sizeof(lsn_t));
pthread_mutex_lock(&nextAvailableLSN_mutex);
assert(nextAvailableLSN == e->LSN);
nextAvailableLSN = nextEntry_LogWriter(e);
pthread_mutex_unlock(&nextAvailableLSN_mutex);
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);
return 0;
}
int writeLogEntry(LogEntry * e) {
pthread_mutex_lock(&log_write_mutex);
int ret = writeLogEntryUnlocked(e);
pthread_mutex_unlock(&log_write_mutex);
return ret;
}
long sizeofInternalLogEntry_LogWriter(const LogEntry * e) {
return sizeof(struct __raw_log_entry);
}
static void syncLogInternal() {
lsn_t newFlushedLSN;
pthread_mutex_lock(&nextAvailableLSN_mutex);
newFlushedLSN = nextAvailableLSN;
pthread_mutex_unlock(&nextAvailableLSN_mutex);
fflush(log);
writelock(flushedLSN_lock, 0);
if(newFlushedLSN > flushedLSN_internal) {
flushedLSN_internal = newFlushedLSN;
}
unlock(flushedLSN_lock);
}
void syncLog_LogWriter() {
lsn_t newFlushedLSN;
pthread_mutex_lock(&log_read_mutex);
newFlushedLSN = nextAvailableLSN;
pthread_mutex_unlock(&log_read_mutex);
// Wait to set the static variable until after the flush returns.
pthread_mutex_lock(&log_write_mutex);
pthread_mutex_lock(&nextAvailableLSN_mutex);
newFlushedLSN = nextAvailableLSN;
pthread_mutex_unlock(&nextAvailableLSN_mutex);
LogEntry * crc_entry = allocCommonLogEntry(log_crc, -1, INTERNALLOG);
writeLogEntryUnlocked(crc_entry);
// Reset log_crc to zero each time a crc entry is written.
log_crc = 0;
pthread_mutex_unlock(&log_write_mutex);
// Since we opened the logfile with O_SYNC, fflush() is sufficient.
fflush(log);
// update flushedLSN after fflush returns.
writelock(flushedLSN_lock, 0);
if(newFlushedLSN > flushedLSN_val) {
flushedLSN_val = newFlushedLSN;
if(newFlushedLSN > flushedLSN_stable) {
flushedLSN_stable = newFlushedLSN;
}
if(newFlushedLSN > flushedLSN_internal) {
flushedLSN_internal = newFlushedLSN;
}
writeunlock(flushedLSN_lock);
}
lsn_t flushedLSN_LogWriter() {
readlock(flushedLSN_lock, 0);
lsn_t ret = flushedLSN_val;
lsn_t ret = flushedLSN_stable;
readunlock(flushedLSN_lock);
return ret;
}
static lsn_t flushedLSNInternal() {
readlock(flushedLSN_lock, 0);
lsn_t ret = flushedLSN_internal;
readunlock(flushedLSN_lock);
return ret;
}
@ -336,14 +458,23 @@ void closeLogWriter() {
fclose(log);
close(roLogFD);
log = NULL;
roLogFD = 0;
flushedLSN_stable = 0;
flushedLSN_internal = 0;
nextAvailableLSN = 0;
global_offset = 0;
/* Free locks. */
deletelock(flushedLSN_lock);
pthread_mutex_destroy(&log_read_mutex);
pthread_mutex_destroy(&log_write_mutex);
pthread_mutex_destroy(&nextAvailableLSN_mutex);
pthread_mutex_destroy(&truncateLog_mutex);
free(buffer);
buffer = 0;
log_crc = 0;
}
void deleteLogWriter() {
@ -356,11 +487,9 @@ static LogEntry * readLogEntry() {
lsn_t entrySize;
ssize_t bytesRead = read(roLogFD, &size, sizeof(lsn_t));
if(bytesRead != sizeof(lsn_t)) {
if(bytesRead == 0) {
// fprintf(stderr, "eof on log entry size\n");
// fflush(stderr);
return NULL;
} else if(bytesRead == -1) {
perror("error reading log");
@ -370,7 +499,10 @@ static LogEntry * readLogEntry() {
lsn_t newSize = size - bytesRead;
lsn_t newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize);
fprintf(stdout, "Trying to piece together short read.\n"); fflush(stderr);
if(newBytesRead == 0) {
abort();
return NULL;
}
fprintf(stderr, "short read from log. Expected %ld bytes, got %ld.\nFIXME: This is 'normal', but currently not handled", sizeof(lsn_t), bytesRead);
@ -381,11 +513,12 @@ static LogEntry * readLogEntry() {
abort(); // XXX really abort here. This code should attempt to piece together short log reads...
}
}
if(!size) {
return NULL;
}
ret = malloc(size);
//printf("Log entry is %ld bytes long.\n", size);
//fflush(stdout);
bytesRead = read(roLogFD, ret, size);
if(bytesRead != size) {
@ -402,7 +535,10 @@ static LogEntry * readLogEntry() {
lsn_t newSize = size - bytesRead;
lsn_t newBytesRead = read (roLogFD, ((byte*)ret)+bytesRead, newSize);
fprintf(stdout, "Trying to piece together short log entry.\n"); fflush(stderr);
if(newBytesRead == 0) {
abort();
return NULL;
}
@ -422,16 +558,23 @@ static LogEntry * readLogEntry() {
return ret;
}
//static lsn_t lastPosition_readLSNEntry = -1;
LogEntry * readLSNEntry_LogWriter(lsn_t LSN) {
LogEntry * ret;
pthread_mutex_lock(&nextAvailableLSN_mutex);
if(LSN >= nextAvailableLSN) {
pthread_mutex_unlock(&nextAvailableLSN_mutex);
return 0;
}
pthread_mutex_unlock(&nextAvailableLSN_mutex);
/** Because we use two file descriptors to access the log, we need
to flush the log write buffer before concluding we're at EOF. */
if(flushedLSN_LogWriter() <= LSN && LSN < nextAvailableLSN) {
// fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN_LogWriter(), LSN);
syncLog_LogWriter();
assert(flushedLSN_LogWriter() >= LSN);
// fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN_LogWriter(), LSN);
if(flushedLSNInternal() <= LSN) { // && LSN < nextAvailableLSN) {
syncLogInternal();
assert(flushedLSNInternal() > LSN);
}
pthread_mutex_lock(&log_read_mutex);
@ -439,17 +582,19 @@ LogEntry * readLSNEntry_LogWriter(lsn_t LSN) {
assert(global_offset <= LSN);
debug_lsn = LSN;
off_t newPosition = lseek(roLogFD, LSN - global_offset, SEEK_SET);
off_t newPosition = LSN - global_offset;
newPosition = lseek(roLogFD, newPosition, SEEK_SET);
if(newPosition == -1) {
perror("Could not seek for log read");
abort();
} else {
// fprintf(stderr, "sought to %d\n", (int)newPosition);
// fflush(stderr);
}
assert(newPosition == LSN-global_offset);
ret = readLogEntry();
assert(ret || LSN >= nextAvailableLSN);
assert(ret);
pthread_mutex_unlock(&log_read_mutex);
@ -457,10 +602,6 @@ LogEntry * readLSNEntry_LogWriter(lsn_t LSN) {
}
lsn_t nextEntry_LogWriter(const LogEntry * e) {
return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t);
}
int truncateLog_LogWriter(lsn_t LSN) {
FILE *tmpLog;
@ -474,12 +615,12 @@ int truncateLog_LogWriter(lsn_t LSN) {
if(global_offset + sizeof(lsn_t) >= LSN) {
/* Another thread beat us to it...the log is already truncated
past the point requested, so just return. */
printf("Skipping truncate. global_offset = %ld, requested LSN = %ld\n", global_offset, LSN);
pthread_mutex_unlock(&truncateLog_mutex);
return 0;
}
tmpLog = fopen(LOG_FILE_SCRATCH, "w+"); /* w+ = truncate, and open for writing. */
/* w+ = truncate, and open for writing. */
tmpLog = fopen(LOG_FILE_SCRATCH, "w+");
if (tmpLog==NULL) {
pthread_mutex_unlock(&truncateLog_mutex);
@ -493,8 +634,6 @@ int truncateLog_LogWriter(lsn_t LSN) {
4, so the file offset is 6. */
LSN -= sizeof(lsn_t);
DEBUG("Truncating log to %ld\n", LSN + sizeof(lsn_t));
myFwrite(&LSN, sizeof(lsn_t), tmpLog);
LSN += sizeof(lsn_t);
@ -509,14 +648,53 @@ int truncateLog_LogWriter(lsn_t LSN) {
lh = getLSNHandle(LSN);
lsn_t lengthOfCopiedLog = 0;
int firstInternalEntry = 1;
while((le = nextInLog(&lh))) {
size = sizeofLogEntry(le);
if(firstInternalEntry && le->type == INTERNALLOG) {
LogEntry * firstCRC = malloc(size);
memcpy(firstCRC, le, size);
FreeLogEntry(le);
firstCRC->prevLSN = 0;
le = firstCRC;
}
lengthOfCopiedLog += (size + sizeof(lsn_t));
myFwrite(&size, sizeof(lsn_t), tmpLog);
myFwrite(le, size, tmpLog);
FreeLogEntry(le);
if(firstInternalEntry && le->type == INTERNALLOG) {
free((void*)le); // remove const qualifier + free
firstInternalEntry = 0;
} else {
FreeLogEntry(le);
}
}
LogEntry * crc_entry = allocCommonLogEntry(0, -1, INTERNALLOG);
assert(crc_entry->prevLSN == 0);
pthread_mutex_lock(&nextAvailableLSN_mutex);
crc_entry->LSN = nextAvailableLSN;
// printf("Crc entry: lsn = %ld, crc = %x\n", crc_entry->LSN, crc_entry->prevLSN);
assert(nextAvailableLSN == LSN + lengthOfCopiedLog);
size = sizeofLogEntry(crc_entry);
nextAvailableLSN = nextEntry_LogWriter(crc_entry);
log_crc = 0;
pthread_mutex_unlock(&nextAvailableLSN_mutex);
myFwrite(&size, sizeof(lsn_t), tmpLog);
myFwrite(crc_entry, size, tmpLog);
lengthOfCopiedLog += (size + sizeof(lsn_t));
assert(nextAvailableLSN == (LSN + lengthOfCopiedLog));
free(crc_entry);
fflush(tmpLog);
#ifdef HAVE_FDATASYNC
fdatasync(fileno(tmpLog));
@ -526,6 +704,7 @@ int truncateLog_LogWriter(lsn_t LSN) {
/** Time to shut out the readers */
pthread_mutex_lock(&log_read_mutex);
/* closeLogWriter calls sync, and does some extra stuff that we don't want, so we
@ -543,32 +722,16 @@ int truncateLog_LogWriter(lsn_t LSN) {
perror("Error replacing old log file with new log file");
return LLADD_IO_ERROR;
} else {
printf("Truncation complete.\n");
// printf("Truncation complete.\n");
fflush(stdout);
}
int logFD = open (LOG_FILE, O_CREAT | O_WRONLY | O_APPEND | O_SYNC, S_IRWXU | S_IRWXG | S_IRWXO);
int logFD = open(LOG_FILE, O_CREAT | O_RDWR | O_SYNC, FILE_PERM); //S_IRWXU | S_IRWXG | S_IRWXO);
if(logFD == -1) {
perror("Couldn't open log file for append.\n");
abort();
}
log = fdopen(logFD, "a");
if (log==NULL) {
perror("Couldn't open log file");
abort();
return LLADD_IO_ERROR;
}
setbuffer(log, buffer, BUFSIZE);
roLogFD = open (LOG_FILE, O_RDONLY, 0);
if(roLogFD == -1) {
perror("Couldn't open log file for reads.\n");
abort();
return LLADD_IO_ERROR;
}
log = fdopen(logFD, "w+");
if (log==NULL) {
pthread_mutex_unlock(&log_read_mutex);
@ -579,9 +742,31 @@ int truncateLog_LogWriter(lsn_t LSN) {
abort();
return LLADD_IO_ERROR;
}
setbuffer(log, buffer, BUFSIZE);
global_offset = LSN - sizeof(lsn_t);
lsn_t logPos;
if((logPos = myFseek(log, 0, SEEK_END)) != nextAvailableLSN - global_offset) {
if(logPos == -1) {
perror("Truncation couldn't seek");
} else {
printf("logfile was wrong length after truncation. Expected %ld, found %ld\n", nextAvailableLSN - global_offset, logPos);
fflush(stdout);
abort();
}
}
roLogFD = open(LOG_FILE, O_RDONLY, 0);
if(roLogFD == -1) {
perror("Couldn't open log file for reads.\n");
abort();
return LLADD_IO_ERROR;
}
pthread_mutex_unlock(&log_read_mutex);
pthread_mutex_unlock(&log_write_mutex);

View file

@ -164,10 +164,14 @@ void deleteLogWriter();
@param LSN the LSN of the entry that will be read.
*/
LogEntry * readLSNEntry_LogWriter(lsn_t LSN);
lsn_t nextEntry_LogWriter(const LogEntry * e);
static inline lsn_t nextEntry_LogWriter(const LogEntry * e) {
return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t);
}
extern int logWriter_isDurable;
long sizeofInternalLogEntry_LogWriter(const LogEntry * e);
END_C_DECLS
#endif /* __LLADD_LOGGER_LOGWRITER_H */