Cleaned up logger API, added truncation support to inMemoryLog.c

This commit is contained in:
Sears Russell 2006-09-27 20:30:12 +00:00
parent fcf01f5d71
commit f01cff0d5b
6 changed files with 144 additions and 117 deletions

View file

@ -24,12 +24,11 @@ int writeLogEntry_InMemoryLog(LogEntry *e) {
int done = 0;
do{
readlock(globalOffset_lock,0);
writelock(globalOffset_lock,0);
bufferOffset = nextAvailableLSN - globalOffset;
if(bufferOffset > bufferLen) {
unlock(globalOffset_lock);
writelock(globalOffset_lock,0);
abort(); // really, need to extend log.
bufferLen *= 2;
buffer = realloc(buffer, bufferLen);
} else {
done = 1;
}
@ -38,8 +37,11 @@ int writeLogEntry_InMemoryLog(LogEntry *e) {
e->LSN = nextAvailableLSN;
LogEntry * cpy = malloc(sizeofLogEntry(e));
memcpy(cpy, e, sizeofLogEntry(e));
// printf ("lsn: %ld\n", e->LSN);
buffer[bufferOffset] = e;
buffer[bufferOffset] = cpy;
// printf("lsn: %ld type: %d\n", e->LSN, e->type);
nextAvailableLSN++;
@ -53,8 +55,34 @@ lsn_t flushedLSN_InMemoryLog() {
return nextAvailableLSN;
}
void syncLog_InMemoryLog() {
// no-op
}
lsn_t nextEntry_InMemoryLog(const LogEntry * e) {
return e->LSN + 1;
}
int truncateLog_InMemoryLog(lsn_t lsn) {
abort();
writelock(flushedLSN_lock,1);
writelock(globalOffset_lock,1);
assert(lsn <= nextAvailableLSN);
if(lsn > globalOffset) {
for(int i = globalOffset; i < lsn; i++) {
free(buffer[i - globalOffset]);
}
assert((lsn-globalOffset) + (nextAvailableLSN -lsn) < bufferLen);
memmove(&(buffer[0]), &(buffer[lsn - globalOffset]), sizeof(LogEntry*) * (nextAvailableLSN - lsn));
globalOffset = lsn;
}
writeunlock(globalOffset_lock);
writeunlock(flushedLSN_lock);
return 0;
}
lsn_t firstLogEntry_InMemoryLog() {
@ -78,15 +106,20 @@ void close_InMemoryLog() {
}
LogEntry * readLSNEntry_InMemoryLog(lsn_t LSN) {
// printf("lsn: %ld\n", LSN);
if(LSN >= nextAvailableLSN) { return 0; }
assert(LSN-globalOffset >= 0 && LSN-globalOffset< bufferLen);
LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn) {
// printf("lsn: %ld\n", lsn);
if(lsn >= nextAvailableLSN) { return 0; }
assert(lsn-globalOffset >= 0 && lsn-globalOffset< bufferLen);
readlock(globalOffset_lock, 0);
LogEntry * ptr = buffer[LSN - globalOffset];
LogEntry * ptr = buffer[lsn - globalOffset];
unlock(globalOffset_lock);
assert(ptr);
assert(ptr->LSN == LSN);
assert(ptr->LSN == lsn);
LogEntry * ret = malloc(sizeofLogEntry(ptr));
memcpy(ret, ptr, sizeofLogEntry(ptr));
//printf("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN);
return ptr;
return ret;
}

View file

@ -6,8 +6,10 @@
int open_InMemoryLog();
int writeLogEntry_InMemoryLog(LogEntry * e);
lsn_t flushedLSN_InMemoryLog();
void syncLog_InMemoryLog();
int truncateLog_InMemoryLog(lsn_t lsn);
lsn_t firstLogEntry_InMemoryLog();
void close_InMemoryLog();
LogEntry * readLSNEntry_InMemoryLog();
LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn);
lsn_t nextEntry_InMemoryLog(const LogEntry * e);
#endif

View file

@ -49,7 +49,7 @@ terms specified in this license.
That should probably be set before calling this function.
*/
static void set_offsets(LogHandle * h, const LogEntry * e, lsn_t lastRead);
static void set_offsets(LogHandle * h, const LogEntry * e);
/*-------------------------------------------------------*/
@ -76,7 +76,7 @@ LogHandle getGuardedHandle(lsn_t lsn, guard_fcn_t * guard, void * guard_state) {
const LogEntry * nextInLog(LogHandle * h) {
const LogEntry * ret = LogReadLSN(h->next_offset);
if(ret != NULL) {
set_offsets(h, ret, h->next_offset);
set_offsets(h, ret);
}
if(h->guard) {
@ -93,20 +93,14 @@ const LogEntry * nextInLog(LogHandle * h) {
const LogEntry * previousInTransaction(LogHandle * h) {
const LogEntry * ret = NULL;
if(h->prev_offset > 0) {
/* printf("A"); fflush(NULL); */
ret = LogReadLSN(h->prev_offset);
set_offsets(h, ret, h->prev_offset);
/*printf("B"); fflush(NULL); */
set_offsets(h, ret);
if(h->guard) {
/*printf("C"); fflush(NULL);*/
if(!h->guard(ret, h->guard_state)) {
FreeLogEntry(ret);
ret = NULL;
}
/*printf("D"); fflush(NULL);*/
}
}
@ -121,14 +115,8 @@ const LogEntry * previousInTransaction(LogHandle * h) {
logging implementation, not here. (One possibility is to have
readLSNEntry return it explicitly.)
*/
static void set_offsets(LogHandle * h, const LogEntry * e, lsn_t lastRead) {
if(loggerType == LOG_TO_FILE) {
h->next_offset = lastRead + sizeofLogEntry(e)+sizeof(lsn_t);
} else if(loggerType == LOG_TO_MEMORY) {
h->next_offset = lastRead + 1;
} else {
abort();
}
static void set_offsets(LogHandle * h, const LogEntry * e) {
h->next_offset = LogNextEntry(e);
h->prev_offset = (e->type==CLRLOG) ? e->contents.clr.undoNextLSN : e->prevLSN ;
}

View file

@ -74,7 +74,7 @@ static int roLogFD = 0;
int logWriter_isDurable = 1;
/**
@see flushedLSN()
@see flushedLSN_LogWriter()
*/
static lsn_t flushedLSN_val;
@ -125,8 +125,6 @@ static char * buffer;
/** The size of the in-memory log buffer. When the buffer is full,
the log is synchronously flushed to disk. */
#define BUFSIZE (1024 * 1024)
//#define BUFSIZE (1024*96)
//#define BUFSIZE (512)
int openLogWriter() {
@ -270,24 +268,8 @@ int writeLogEntry(LogEntry * e) {
pthread_mutex_lock(&log_write_mutex);
/* if(!nextAvailableLSN) {
LogHandle lh;
const LogEntry * le;
nextAvailableLSN = sizeof(lsn_t);
lh = getLSNHandle(nextAvailableLSN);
while((le = nextInLog(&lh))) {
nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);;
FreeLogEntry(le);
}
}*/
/* Set the log entry's LSN. */
e->LSN = nextAvailableLSN;
//printf ("\nLSN: %ld\n", e->LSN);
//fflush(stdout);
size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log);
if(nmemb != 1) {
@ -312,8 +294,6 @@ int writeLogEntry(LogEntry * e) {
return LLADD_IO_ERROR;
}
//fflush(log);
pthread_mutex_lock(&log_read_mutex);
nextAvailableLSN += (size + sizeof(lsn_t));
pthread_mutex_unlock(&log_read_mutex);
@ -324,11 +304,10 @@ int writeLogEntry(LogEntry * e) {
return 0;
}
void syncLog() {
void syncLog_LogWriter() {
lsn_t newFlushedLSN;
pthread_mutex_lock(&log_read_mutex);
// newFlushedLSN = ftell(log) + global_offset;
newFlushedLSN = nextAvailableLSN;
pthread_mutex_unlock(&log_read_mutex);
// Wait to set the static variable until after the flush returns.
@ -343,7 +322,7 @@ void syncLog() {
writeunlock(flushedLSN_lock);
}
lsn_t flushedLSN() {
lsn_t flushedLSN_LogWriter() {
readlock(flushedLSN_lock, 0);
lsn_t ret = flushedLSN_val;
readunlock(flushedLSN_lock);
@ -352,7 +331,7 @@ lsn_t flushedLSN() {
void closeLogWriter() {
/* Get the whole thing to the disk before closing it. */
syncLog();
syncLog_LogWriter();
fclose(log);
close(roLogFD);
@ -377,7 +356,7 @@ static LogEntry * readLogEntry() {
lsn_t entrySize;
ssize_t bytesRead = read(roLogFD, &size, sizeof(lsn_t));
if(bytesRead != sizeof(lsn_t)) {
if(bytesRead == 0) {
// fprintf(stderr, "eof on log entry size\n");
@ -399,7 +378,7 @@ static LogEntry * readLogEntry() {
fprintf(stderr, "\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize);
fflush(stderr);
abort(); // really abort here. This code should attempt to piece together short log reads...
abort(); // XXX really abort here. This code should attempt to piece together short log reads...
}
}
ret = malloc(size);
@ -443,16 +422,16 @@ static LogEntry * readLogEntry() {
return ret;
}
LogEntry * readLSNEntry(lsn_t LSN) {
LogEntry * readLSNEntry_LogWriter(lsn_t LSN) {
LogEntry * ret;
/** Because we use two file descriptors to access the log, we need
to flush the log write buffer before concluding we're at EOF. */
if(flushedLSN() <= LSN && LSN < nextAvailableLSN) {
// fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN(), LSN);
syncLog();
assert(flushedLSN() >= LSN);
// fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN(), LSN);
if(flushedLSN_LogWriter() <= LSN && LSN < nextAvailableLSN) {
// fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN_LogWriter(), LSN);
syncLog_LogWriter();
assert(flushedLSN_LogWriter() >= LSN);
// fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN_LogWriter(), LSN);
}
pthread_mutex_lock(&log_read_mutex);
@ -478,7 +457,11 @@ LogEntry * readLSNEntry(lsn_t LSN) {
}
int truncateLog(lsn_t LSN) {
lsn_t nextEntry_LogWriter(const LogEntry * e) {
return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t);
}
int truncateLog_LogWriter(lsn_t LSN) {
FILE *tmpLog;
const LogEntry * le;

View file

@ -93,7 +93,7 @@ int writeLogEntry(LogEntry * e);
/**
flush the entire log (tail) that is currently in memory to disk
*/
void syncLog();
void syncLog_LogWriter();
/**
Return the highest LSN that is known to be on disk. (Currently, we
@ -106,7 +106,7 @@ void syncLog();
and is less than the LSN of all log entries that might not have
been forced to disk.
*/
lsn_t flushedLSN();
lsn_t flushedLSN_LogWriter();
/**
Truncates the log file. In the single-threaded case, this works as
@ -134,7 +134,7 @@ lsn_t flushedLSN();
*/
int truncateLog(lsn_t);
int truncateLog_LogWriter(lsn_t);
/**
@ -163,7 +163,8 @@ void deleteLogWriter();
@param LSN the LSN of the entry that will be read.
*/
LogEntry * readLSNEntry(lsn_t LSN);
LogEntry * readLSNEntry_LogWriter(lsn_t LSN);
lsn_t nextEntry_LogWriter(const LogEntry * e);
extern int logWriter_isDurable;

View file

@ -40,32 +40,43 @@ permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
/**
@file Abstract log implementation. Provides access to methods that
directly read and write log entries, force the log to disk, etc.
@todo Switch logger2 to use function pointers?
*/
#include <stdio.h>
#include <assert.h>
#include <config.h>
#include <lladd/common.h>
#include <lladd/logger/logger2.h>
#include "logWriter.h"
#include "inMemoryLog.h"
#include "page.h"
/*#include <lladd/bufferManager.h>*/
#include <stdio.h>
#include <assert.h>
#include "page.h"
int loggerType = LOG_TO_FILE;
void genericLogWrite(LogEntry * e) {
void LogWrite(LogEntry * e) {
if(loggerType == LOG_TO_FILE) {
writeLogEntry(e);
return;
} else if (loggerType == LOG_TO_MEMORY) {
writeLogEntry_InMemoryLog(e);
return;
} else {
abort();
}
abort(); // we dont have an appropriate implementation, or weren't initialized...
}
int LogInit(int logType) {
assert(logType == loggerType);
loggerType = logType;
if(LOG_TO_FILE == logType) {
openLogWriter();
} else if(LOG_TO_MEMORY == logType) {
@ -73,38 +84,37 @@ int LogInit(int logType) {
} else {
return -1;
}
loggerType = logType;
return 0;
}
int LogDeinit() {
assert(loggerType != -1);
if(LOG_TO_FILE == loggerType) {
closeLogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
close_InMemoryLog();
} else {
abort();
}
return 0;
}
void LogForce(lsn_t lsn) {
if(LOG_TO_FILE == loggerType) {
if(flushedLSN() < lsn) {
syncLog();
if(LogFlushedLSN() < lsn) {
if(LOG_TO_FILE == loggerType) {
syncLog_LogWriter();
} else if (LOG_TO_MEMORY == loggerType) {
syncLog_InMemoryLog();
} else {
abort();
}
assert(flushedLSN() >= lsn);
return;
} else if(LOG_TO_MEMORY == loggerType) {
assert(flushedLSN_InMemoryLog() >= lsn);
return;
}
abort();
assert(LogFlushedLSN() >= lsn);
}
void LogTruncate(lsn_t lsn) {
if(LOG_TO_FILE == loggerType) {
truncateLog(lsn);
truncateLog_LogWriter(lsn);
} else if(LOG_TO_MEMORY == loggerType) {
abort();
truncateLog_InMemoryLog(lsn);
} else {
abort();
}
@ -113,7 +123,7 @@ void LogTruncate(lsn_t lsn) {
lsn_t LogFlushedLSN() {
if(LOG_TO_FILE == loggerType) {
return flushedLSN();
return flushedLSN_LogWriter();
} else if(LOG_TO_MEMORY == loggerType) {
return flushedLSN_InMemoryLog();
}
@ -125,29 +135,39 @@ lsn_t LogTruncationPoint() {
return firstLogEntry();
} else if(LOG_TO_MEMORY == loggerType) {
return firstLogEntry_InMemoryLog();
} else {
abort();
}
abort();
}
const LogEntry * LogReadLSN(lsn_t lsn) {
if(LOG_TO_FILE == loggerType) {
return readLSNEntry(lsn);
return readLSNEntry_LogWriter(lsn);
} else if(LOG_TO_MEMORY == loggerType) {
return readLSNEntry_InMemoryLog(lsn);
}
abort();
} else {
abort();
}
}
lsn_t LogNextEntry(const LogEntry * e) {
if(LOG_TO_FILE == loggerType) {
return nextEntry_LogWriter(e);
} else if(LOG_TO_MEMORY == loggerType) {
return nextEntry_InMemoryLog(e);
} else {
abort();
}
}
void FreeLogEntry(const LogEntry * e) {
if(LOG_TO_FILE == loggerType) {
free((LogEntry*)e);
return;
free((void*)e);
} else if(LOG_TO_MEMORY == loggerType) {
if(e->LSN == -1) {
free((LogEntry*)e);
}
return;
free((void*)e);
} else {
abort();
}
abort();
}
TransactionLog LogTransBegin(int xid) {
@ -164,7 +184,7 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) {
LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type);
lsn_t ret;
genericLogWrite(e);
LogWrite(e);
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
l->prevLSN = e->LSN;
@ -183,7 +203,7 @@ extern int numActiveXactions;
/**
@todo This belongs in logWriter.c and needs a new name.
*/
static lsn_t LogTransBundledCommit(TransactionLog * l) {
static lsn_t groupCommit(TransactionLog * l) {
static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER;
static int pendingCommits = 0;
@ -196,7 +216,7 @@ static lsn_t LogTransBundledCommit(TransactionLog * l) {
// int retcode;
pthread_mutex_lock(&check_commit);
if(flushedLSN() >= ret) {
if(LogFlushedLSN() >= ret) {
pthread_mutex_unlock(&check_commit);
return ret;
}
@ -215,7 +235,7 @@ static lsn_t LogTransBundledCommit(TransactionLog * l) {
if((numActiveXactions > 1 && pendingCommits < numActiveXactions) ||
(numActiveXactions > 20 && pendingCommits < (int)((double)numActiveXactions * 0.95))) {
while(ETIMEDOUT != (pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) {
if(flushedLSN() >= ret) {
if(LogFlushedLSN() >= ret) {
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return ret;
@ -225,32 +245,32 @@ static lsn_t LogTransBundledCommit(TransactionLog * l) {
} else {
// printf("Didn't wait %d < %d\n", (numActiveXactions / 2), pendingCommits);
}
if(flushedLSN() < ret) {
syncLog();
if(LogFlushedLSN() < ret) {
syncLog_LogWriter();
syncLogCount++;
// printf(" %d ", syncLogCount);
pthread_cond_broadcast(&tooFewXacts);
}
assert(flushedLSN() >= ret);
assert(LogFlushedLSN() >= ret);
pendingCommits--;
pthread_mutex_unlock(&check_commit);
return ret;
}
lsn_t LogTransCommit(TransactionLog * l) {
assert(loggerType != -1);
if(LOG_TO_FILE == loggerType) {
return LogTransBundledCommit(l);
} else if(LOG_TO_MEMORY == loggerType) {
return LogTransCommon(l, XCOMMIT);
}
abort();
return groupCommit(l);
}
lsn_t LogTransAbort(TransactionLog * l) {
return LogTransCommon(l, XABORT);
}
/**
@todo Does the handling of operation types / argument sizes belong
here? Shouldn't it be in logEntry.c, or perhaps with other code
that reasons about the various operation types?
*/
LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args) {
void * preImage = NULL;
long argSize = 0;
@ -283,7 +303,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage);
// writeLogEntry(e);
genericLogWrite(e);
LogWrite(e);
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid,
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize);
@ -298,7 +318,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation,
lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN) {
lsn_t ret;
LogEntry * e = allocCLRLogEntry(-1, xid, LSN, rid, prevLSN);
genericLogWrite(e);
LogWrite(e);
DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid,
e->LSN, LSN, prevLSN);