diff --git a/src/lladd/logger/inMemoryLog.c b/src/lladd/logger/inMemoryLog.c index 856cff9..27aa32f 100644 --- a/src/lladd/logger/inMemoryLog.c +++ b/src/lladd/logger/inMemoryLog.c @@ -24,12 +24,11 @@ int writeLogEntry_InMemoryLog(LogEntry *e) { int done = 0; do{ - readlock(globalOffset_lock,0); + writelock(globalOffset_lock,0); bufferOffset = nextAvailableLSN - globalOffset; if(bufferOffset > bufferLen) { - unlock(globalOffset_lock); - writelock(globalOffset_lock,0); - abort(); // really, need to extend log. + bufferLen *= 2; + buffer = realloc(buffer, bufferLen); } else { done = 1; } @@ -38,8 +37,11 @@ int writeLogEntry_InMemoryLog(LogEntry *e) { e->LSN = nextAvailableLSN; + LogEntry * cpy = malloc(sizeofLogEntry(e)); + memcpy(cpy, e, sizeofLogEntry(e)); + // printf ("lsn: %ld\n", e->LSN); - buffer[bufferOffset] = e; + buffer[bufferOffset] = cpy; // printf("lsn: %ld type: %d\n", e->LSN, e->type); nextAvailableLSN++; @@ -53,8 +55,34 @@ lsn_t flushedLSN_InMemoryLog() { return nextAvailableLSN; } +void syncLog_InMemoryLog() { + // no-op +} + +lsn_t nextEntry_InMemoryLog(const LogEntry * e) { + return e->LSN + 1; +} + int truncateLog_InMemoryLog(lsn_t lsn) { - abort(); + writelock(flushedLSN_lock,1); + writelock(globalOffset_lock,1); + + assert(lsn <= nextAvailableLSN); + + + if(lsn > globalOffset) { + for(int i = globalOffset; i < lsn; i++) { + free(buffer[i - globalOffset]); + } + assert((lsn-globalOffset) + (nextAvailableLSN -lsn) < bufferLen); + memmove(&(buffer[0]), &(buffer[lsn - globalOffset]), sizeof(LogEntry*) * (nextAvailableLSN - lsn)); + globalOffset = lsn; + } + + writeunlock(globalOffset_lock); + writeunlock(flushedLSN_lock); + + return 0; } lsn_t firstLogEntry_InMemoryLog() { @@ -78,15 +106,20 @@ void close_InMemoryLog() { } -LogEntry * readLSNEntry_InMemoryLog(lsn_t LSN) { - // printf("lsn: %ld\n", LSN); - if(LSN >= nextAvailableLSN) { return 0; } - assert(LSN-globalOffset >= 0 && LSN-globalOffset< bufferLen); +LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn) { + // printf("lsn: %ld\n", lsn); + if(lsn >= nextAvailableLSN) { return 0; } + assert(lsn-globalOffset >= 0 && lsn-globalOffset< bufferLen); readlock(globalOffset_lock, 0); - LogEntry * ptr = buffer[LSN - globalOffset]; + LogEntry * ptr = buffer[lsn - globalOffset]; unlock(globalOffset_lock); assert(ptr); - assert(ptr->LSN == LSN); + assert(ptr->LSN == lsn); + + LogEntry * ret = malloc(sizeofLogEntry(ptr)); + + memcpy(ret, ptr, sizeofLogEntry(ptr)); + //printf("lsn: %ld prevlsn: %ld\n", ptr->LSN, ptr->prevLSN); - return ptr; + return ret; } diff --git a/src/lladd/logger/inMemoryLog.h b/src/lladd/logger/inMemoryLog.h index a7d62c6..8c3044b 100644 --- a/src/lladd/logger/inMemoryLog.h +++ b/src/lladd/logger/inMemoryLog.h @@ -6,8 +6,10 @@ int open_InMemoryLog(); int writeLogEntry_InMemoryLog(LogEntry * e); lsn_t flushedLSN_InMemoryLog(); +void syncLog_InMemoryLog(); int truncateLog_InMemoryLog(lsn_t lsn); lsn_t firstLogEntry_InMemoryLog(); void close_InMemoryLog(); -LogEntry * readLSNEntry_InMemoryLog(); +LogEntry * readLSNEntry_InMemoryLog(lsn_t lsn); +lsn_t nextEntry_InMemoryLog(const LogEntry * e); #endif diff --git a/src/lladd/logger/logHandle.c b/src/lladd/logger/logHandle.c index 865677e..0ef5304 100644 --- a/src/lladd/logger/logHandle.c +++ b/src/lladd/logger/logHandle.c @@ -49,7 +49,7 @@ terms specified in this license. That should probably be set before calling this function. */ -static void set_offsets(LogHandle * h, const LogEntry * e, lsn_t lastRead); +static void set_offsets(LogHandle * h, const LogEntry * e); /*-------------------------------------------------------*/ @@ -76,7 +76,7 @@ LogHandle getGuardedHandle(lsn_t lsn, guard_fcn_t * guard, void * guard_state) { const LogEntry * nextInLog(LogHandle * h) { const LogEntry * ret = LogReadLSN(h->next_offset); if(ret != NULL) { - set_offsets(h, ret, h->next_offset); + set_offsets(h, ret); } if(h->guard) { @@ -93,20 +93,14 @@ const LogEntry * nextInLog(LogHandle * h) { const LogEntry * previousInTransaction(LogHandle * h) { const LogEntry * ret = NULL; if(h->prev_offset > 0) { - /* printf("A"); fflush(NULL); */ ret = LogReadLSN(h->prev_offset); - set_offsets(h, ret, h->prev_offset); - /*printf("B"); fflush(NULL); */ + set_offsets(h, ret); if(h->guard) { - /*printf("C"); fflush(NULL);*/ - if(!h->guard(ret, h->guard_state)) { FreeLogEntry(ret); ret = NULL; } - /*printf("D"); fflush(NULL);*/ - } } @@ -121,14 +115,8 @@ const LogEntry * previousInTransaction(LogHandle * h) { logging implementation, not here. (One possibility is to have readLSNEntry return it explicitly.) */ -static void set_offsets(LogHandle * h, const LogEntry * e, lsn_t lastRead) { - if(loggerType == LOG_TO_FILE) { - h->next_offset = lastRead + sizeofLogEntry(e)+sizeof(lsn_t); - } else if(loggerType == LOG_TO_MEMORY) { - h->next_offset = lastRead + 1; - } else { - abort(); - } +static void set_offsets(LogHandle * h, const LogEntry * e) { + h->next_offset = LogNextEntry(e); h->prev_offset = (e->type==CLRLOG) ? e->contents.clr.undoNextLSN : e->prevLSN ; } diff --git a/src/lladd/logger/logWriter.c b/src/lladd/logger/logWriter.c index b2db79e..265a8f5 100644 --- a/src/lladd/logger/logWriter.c +++ b/src/lladd/logger/logWriter.c @@ -74,7 +74,7 @@ static int roLogFD = 0; int logWriter_isDurable = 1; /** - @see flushedLSN() + @see flushedLSN_LogWriter() */ static lsn_t flushedLSN_val; @@ -125,8 +125,6 @@ static char * buffer; /** The size of the in-memory log buffer. When the buffer is full, the log is synchronously flushed to disk. */ #define BUFSIZE (1024 * 1024) -//#define BUFSIZE (1024*96) -//#define BUFSIZE (512) int openLogWriter() { @@ -270,24 +268,8 @@ int writeLogEntry(LogEntry * e) { pthread_mutex_lock(&log_write_mutex); - /* if(!nextAvailableLSN) { - - LogHandle lh; - const LogEntry * le; - - nextAvailableLSN = sizeof(lsn_t); - lh = getLSNHandle(nextAvailableLSN); - - while((le = nextInLog(&lh))) { - nextAvailableLSN = le->LSN + sizeofLogEntry(le) + sizeof(lsn_t);; - FreeLogEntry(le); - } - }*/ - /* Set the log entry's LSN. */ e->LSN = nextAvailableLSN; - //printf ("\nLSN: %ld\n", e->LSN); - //fflush(stdout); size_t nmemb = fwrite(&size, sizeof(lsn_t), 1, log); if(nmemb != 1) { @@ -312,8 +294,6 @@ int writeLogEntry(LogEntry * e) { return LLADD_IO_ERROR; } - //fflush(log); - pthread_mutex_lock(&log_read_mutex); nextAvailableLSN += (size + sizeof(lsn_t)); pthread_mutex_unlock(&log_read_mutex); @@ -324,11 +304,10 @@ int writeLogEntry(LogEntry * e) { return 0; } -void syncLog() { +void syncLog_LogWriter() { lsn_t newFlushedLSN; pthread_mutex_lock(&log_read_mutex); - // newFlushedLSN = ftell(log) + global_offset; newFlushedLSN = nextAvailableLSN; pthread_mutex_unlock(&log_read_mutex); // Wait to set the static variable until after the flush returns. @@ -343,7 +322,7 @@ void syncLog() { writeunlock(flushedLSN_lock); } -lsn_t flushedLSN() { +lsn_t flushedLSN_LogWriter() { readlock(flushedLSN_lock, 0); lsn_t ret = flushedLSN_val; readunlock(flushedLSN_lock); @@ -352,7 +331,7 @@ lsn_t flushedLSN() { void closeLogWriter() { /* Get the whole thing to the disk before closing it. */ - syncLog(); + syncLog_LogWriter(); fclose(log); close(roLogFD); @@ -377,7 +356,7 @@ static LogEntry * readLogEntry() { lsn_t entrySize; ssize_t bytesRead = read(roLogFD, &size, sizeof(lsn_t)); - + if(bytesRead != sizeof(lsn_t)) { if(bytesRead == 0) { // fprintf(stderr, "eof on log entry size\n"); @@ -399,7 +378,7 @@ static LogEntry * readLogEntry() { fprintf(stderr, "\nattempt to read again produced newBytesRead = %ld, newSize was %ld\n", newBytesRead, newSize); fflush(stderr); - abort(); // really abort here. This code should attempt to piece together short log reads... + abort(); // XXX really abort here. This code should attempt to piece together short log reads... } } ret = malloc(size); @@ -443,16 +422,16 @@ static LogEntry * readLogEntry() { return ret; } -LogEntry * readLSNEntry(lsn_t LSN) { +LogEntry * readLSNEntry_LogWriter(lsn_t LSN) { LogEntry * ret; /** Because we use two file descriptors to access the log, we need to flush the log write buffer before concluding we're at EOF. */ - if(flushedLSN() <= LSN && LSN < nextAvailableLSN) { - // fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN(), LSN); - syncLog(); - assert(flushedLSN() >= LSN); - // fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN(), LSN); + if(flushedLSN_LogWriter() <= LSN && LSN < nextAvailableLSN) { + // fprintf(stderr, "Syncing log flushed = %d, requested = %d\n", flushedLSN_LogWriter(), LSN); + syncLog_LogWriter(); + assert(flushedLSN_LogWriter() >= LSN); + // fprintf(stderr, "Synced log flushed = %d, requested = %d\n", flushedLSN_LogWriter(), LSN); } pthread_mutex_lock(&log_read_mutex); @@ -478,7 +457,11 @@ LogEntry * readLSNEntry(lsn_t LSN) { } -int truncateLog(lsn_t LSN) { +lsn_t nextEntry_LogWriter(const LogEntry * e) { + return e->LSN + sizeofLogEntry(e) + sizeof(lsn_t); +} + +int truncateLog_LogWriter(lsn_t LSN) { FILE *tmpLog; const LogEntry * le; diff --git a/src/lladd/logger/logWriter.h b/src/lladd/logger/logWriter.h index 12d4227..823721d 100644 --- a/src/lladd/logger/logWriter.h +++ b/src/lladd/logger/logWriter.h @@ -93,7 +93,7 @@ int writeLogEntry(LogEntry * e); /** flush the entire log (tail) that is currently in memory to disk */ -void syncLog(); +void syncLog_LogWriter(); /** Return the highest LSN that is known to be on disk. (Currently, we @@ -106,7 +106,7 @@ void syncLog(); and is less than the LSN of all log entries that might not have been forced to disk. */ -lsn_t flushedLSN(); +lsn_t flushedLSN_LogWriter(); /** Truncates the log file. In the single-threaded case, this works as @@ -134,7 +134,7 @@ lsn_t flushedLSN(); */ -int truncateLog(lsn_t); +int truncateLog_LogWriter(lsn_t); /** @@ -163,7 +163,8 @@ void deleteLogWriter(); @param LSN the LSN of the entry that will be read. */ -LogEntry * readLSNEntry(lsn_t LSN); +LogEntry * readLSNEntry_LogWriter(lsn_t LSN); +lsn_t nextEntry_LogWriter(const LogEntry * e); extern int logWriter_isDurable; diff --git a/src/lladd/logger/logger2.c b/src/lladd/logger/logger2.c index 8887bfa..244d34b 100644 --- a/src/lladd/logger/logger2.c +++ b/src/lladd/logger/logger2.c @@ -40,32 +40,43 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ +/** + @file Abstract log implementation. Provides access to methods that + directly read and write log entries, force the log to disk, etc. + + @todo Switch logger2 to use function pointers? +*/ + +#include +#include #include + #include #include #include "logWriter.h" #include "inMemoryLog.h" -#include "page.h" -/*#include */ -#include -#include + +#include "page.h" int loggerType = LOG_TO_FILE; -void genericLogWrite(LogEntry * e) { +void LogWrite(LogEntry * e) { if(loggerType == LOG_TO_FILE) { writeLogEntry(e); return; } else if (loggerType == LOG_TO_MEMORY) { writeLogEntry_InMemoryLog(e); return; + } else { + abort(); } - abort(); // we dont have an appropriate implementation, or weren't initialized... } int LogInit(int logType) { - assert(logType == loggerType); + + loggerType = logType; + if(LOG_TO_FILE == logType) { openLogWriter(); } else if(LOG_TO_MEMORY == logType) { @@ -73,38 +84,37 @@ int LogInit(int logType) { } else { return -1; } - loggerType = logType; return 0; } int LogDeinit() { - assert(loggerType != -1); if(LOG_TO_FILE == loggerType) { closeLogWriter(); } else if(LOG_TO_MEMORY == loggerType) { close_InMemoryLog(); + } else { + abort(); } return 0; } void LogForce(lsn_t lsn) { - if(LOG_TO_FILE == loggerType) { - if(flushedLSN() < lsn) { - syncLog(); + if(LogFlushedLSN() < lsn) { + if(LOG_TO_FILE == loggerType) { + syncLog_LogWriter(); + } else if (LOG_TO_MEMORY == loggerType) { + syncLog_InMemoryLog(); + } else { + abort(); } - assert(flushedLSN() >= lsn); - return; - } else if(LOG_TO_MEMORY == loggerType) { - assert(flushedLSN_InMemoryLog() >= lsn); - return; } - abort(); + assert(LogFlushedLSN() >= lsn); } void LogTruncate(lsn_t lsn) { if(LOG_TO_FILE == loggerType) { - truncateLog(lsn); + truncateLog_LogWriter(lsn); } else if(LOG_TO_MEMORY == loggerType) { - abort(); + truncateLog_InMemoryLog(lsn); } else { abort(); } @@ -113,7 +123,7 @@ void LogTruncate(lsn_t lsn) { lsn_t LogFlushedLSN() { if(LOG_TO_FILE == loggerType) { - return flushedLSN(); + return flushedLSN_LogWriter(); } else if(LOG_TO_MEMORY == loggerType) { return flushedLSN_InMemoryLog(); } @@ -125,29 +135,39 @@ lsn_t LogTruncationPoint() { return firstLogEntry(); } else if(LOG_TO_MEMORY == loggerType) { return firstLogEntry_InMemoryLog(); + } else { + abort(); } - abort(); } const LogEntry * LogReadLSN(lsn_t lsn) { if(LOG_TO_FILE == loggerType) { - return readLSNEntry(lsn); + return readLSNEntry_LogWriter(lsn); } else if(LOG_TO_MEMORY == loggerType) { return readLSNEntry_InMemoryLog(lsn); - } - abort(); + } else { + abort(); + } +} + +lsn_t LogNextEntry(const LogEntry * e) { + if(LOG_TO_FILE == loggerType) { + return nextEntry_LogWriter(e); + } else if(LOG_TO_MEMORY == loggerType) { + return nextEntry_InMemoryLog(e); + } else { + abort(); + } } void FreeLogEntry(const LogEntry * e) { if(LOG_TO_FILE == loggerType) { - free((LogEntry*)e); - return; + free((void*)e); } else if(LOG_TO_MEMORY == loggerType) { - if(e->LSN == -1) { - free((LogEntry*)e); - } - return; + free((void*)e); + } else { + abort(); } - abort(); + } TransactionLog LogTransBegin(int xid) { @@ -164,7 +184,7 @@ static lsn_t LogTransCommon(TransactionLog * l, int type) { LogEntry * e = allocCommonLogEntry(l->prevLSN, l->xid, type); lsn_t ret; - genericLogWrite(e); + LogWrite(e); if(l->prevLSN == -1) { l->recLSN = e->LSN; } l->prevLSN = e->LSN; @@ -183,7 +203,7 @@ extern int numActiveXactions; /** @todo This belongs in logWriter.c and needs a new name. */ -static lsn_t LogTransBundledCommit(TransactionLog * l) { +static lsn_t groupCommit(TransactionLog * l) { static pthread_mutex_t check_commit = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t tooFewXacts = PTHREAD_COND_INITIALIZER; static int pendingCommits = 0; @@ -196,7 +216,7 @@ static lsn_t LogTransBundledCommit(TransactionLog * l) { // int retcode; pthread_mutex_lock(&check_commit); - if(flushedLSN() >= ret) { + if(LogFlushedLSN() >= ret) { pthread_mutex_unlock(&check_commit); return ret; } @@ -215,7 +235,7 @@ static lsn_t LogTransBundledCommit(TransactionLog * l) { if((numActiveXactions > 1 && pendingCommits < numActiveXactions) || (numActiveXactions > 20 && pendingCommits < (int)((double)numActiveXactions * 0.95))) { while(ETIMEDOUT != (pthread_cond_timedwait(&tooFewXacts, &check_commit, &timeout))) { - if(flushedLSN() >= ret) { + if(LogFlushedLSN() >= ret) { pendingCommits--; pthread_mutex_unlock(&check_commit); return ret; @@ -225,32 +245,32 @@ static lsn_t LogTransBundledCommit(TransactionLog * l) { } else { // printf("Didn't wait %d < %d\n", (numActiveXactions / 2), pendingCommits); } - if(flushedLSN() < ret) { - syncLog(); + if(LogFlushedLSN() < ret) { + syncLog_LogWriter(); syncLogCount++; // printf(" %d ", syncLogCount); pthread_cond_broadcast(&tooFewXacts); } - assert(flushedLSN() >= ret); + assert(LogFlushedLSN() >= ret); pendingCommits--; pthread_mutex_unlock(&check_commit); return ret; } lsn_t LogTransCommit(TransactionLog * l) { - assert(loggerType != -1); - if(LOG_TO_FILE == loggerType) { - return LogTransBundledCommit(l); - } else if(LOG_TO_MEMORY == loggerType) { - return LogTransCommon(l, XCOMMIT); - } - abort(); + return groupCommit(l); } lsn_t LogTransAbort(TransactionLog * l) { return LogTransCommon(l, XABORT); } + +/** + @todo Does the handling of operation types / argument sizes belong + here? Shouldn't it be in logEntry.c, or perhaps with other code + that reasons about the various operation types? +*/ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, const byte * args) { void * preImage = NULL; long argSize = 0; @@ -283,7 +303,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, e = allocUpdateLogEntry(l->prevLSN, l->xid, operation, rid, args, argSize, preImage); // writeLogEntry(e); - genericLogWrite(e); + LogWrite(e); DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld) (argSize %ld)\n", e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) argSize); @@ -298,7 +318,7 @@ LogEntry * LogUpdate(TransactionLog * l, Page * p, recordid rid, int operation, lsn_t LogCLR(int xid, lsn_t LSN, recordid rid, lsn_t prevLSN) { lsn_t ret; LogEntry * e = allocCLRLogEntry(-1, xid, LSN, rid, prevLSN); - genericLogWrite(e); + LogWrite(e); DEBUG("Log CLR %d, LSN: %ld (undoing: %ld, next to undo: %ld)\n", xid, e->LSN, LSN, prevLSN);